blobfs/
mock.rs

1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5//! Mock implementation of blobfs for blobfs::Client.
6
7use fidl::endpoints::RequestStream as _;
8use fuchsia_hash::Hash;
9use futures::{Future, StreamExt as _, TryStreamExt as _};
10use std::cmp::min;
11use std::collections::HashSet;
12use std::convert::TryInto as _;
13use zx::{self as zx, AsHandleRef as _, HandleBased as _, Status};
14use {fidl_fuchsia_io as fio, fuchsia_async as fasync};
15
16/// A testing server implementation of /blob.
17///
18/// Mock does not handle requests until instructed to do so.
19pub struct Mock {
20    pub(super) stream: fio::DirectoryRequestStream,
21}
22
23impl Mock {
24    /// Consume the next directory request, verifying it is intended to read the blob identified
25    /// by `merkle`.  Returns a `Blob` representing the open blob file.
26    ///
27    /// # Panics
28    ///
29    /// Panics on error or assertion violation (unexpected requests or a mismatched open call)
30    pub async fn expect_open_blob(&mut self, merkle: Hash) -> Blob {
31        match self.stream.next().await {
32            Some(Ok(fio::DirectoryRequest::Open {
33                path,
34                flags,
35                options: _,
36                object,
37                control_handle: _,
38            })) => {
39                assert_eq!(path, merkle.to_string());
40                assert!(flags.contains(fio::PERM_READABLE));
41                assert!(!flags.intersects(fio::Flags::PERM_WRITE | fio::Flags::FLAG_MAYBE_CREATE));
42
43                let stream =
44                    fio::NodeRequestStream::from_channel(fasync::Channel::from_channel(object))
45                        .cast_stream();
46                Blob { stream }
47            }
48            other => panic!("unexpected request: {other:?}"),
49        }
50    }
51
52    /// Consume the next directory request, verifying it is intended to create the blob identified
53    /// by `merkle`.  Returns a `Blob` representing the open blob file.
54    ///
55    /// # Panics
56    ///
57    /// Panics on error or assertion violation (unexpected requests or a mismatched open call)
58    pub async fn expect_create_blob(&mut self, merkle: Hash) -> Blob {
59        match self.stream.next().await {
60            Some(Ok(fio::DirectoryRequest::Open {
61                path,
62                flags,
63                options: _,
64                object,
65                control_handle: _,
66            })) => {
67                assert!(flags.contains(fio::PERM_WRITABLE | fio::Flags::FLAG_MAYBE_CREATE));
68                assert_eq!(path, delivery_blob::delivery_blob_path(merkle));
69                let stream =
70                    fio::NodeRequestStream::from_channel(fasync::Channel::from_channel(object))
71                        .cast_stream();
72                Blob { stream }
73            }
74            other => panic!("unexpected request: {other:?}"),
75        }
76    }
77
78    async fn handle_rewind(&mut self) {
79        match self.stream.next().await {
80            Some(Ok(fio::DirectoryRequest::Rewind { responder })) => {
81                responder.send(Status::OK.into_raw()).unwrap();
82            }
83            other => panic!("unexpected request: {other:?}"),
84        }
85    }
86
87    /// Consume directory requests, verifying they are requests to read directory entries.  Respond
88    /// with dirents constructed from the given entries.
89    ///
90    /// # Panics
91    ///
92    /// Panics on error or assertion violation (unexpected requests or not all entries are read)
93    pub async fn expect_readdir(&mut self, entries: impl Iterator<Item = Hash>) {
94        // fuchsia_fs::directory starts by resetting the directory channel's readdir position.
95        self.handle_rewind().await;
96
97        const NAME_LEN: usize = 64;
98        #[repr(C, packed)]
99        struct Dirent {
100            ino: u64,
101            size: u8,
102            kind: u8,
103            name: [u8; NAME_LEN],
104        }
105
106        impl Dirent {
107            fn as_bytes(&self) -> &'_ [u8] {
108                let start = self as *const Self as *const u8;
109                // Safe because the FIDL wire format for directory entries is
110                // defined to be the C packed struct representation used here.
111                unsafe { std::slice::from_raw_parts(start, std::mem::size_of::<Self>()) }
112            }
113        }
114
115        let mut entries_iter = entries.map(|hash| Dirent {
116            ino: fio::INO_UNKNOWN,
117            size: NAME_LEN as u8,
118            kind: fio::DirentType::File.into_primitive(),
119            name: hash.to_string().as_bytes().try_into().unwrap(),
120        });
121
122        loop {
123            match self.stream.try_next().await.unwrap() {
124                Some(fio::DirectoryRequest::ReadDirents { max_bytes, responder }) => {
125                    let max_bytes = max_bytes as usize;
126                    assert!(max_bytes >= std::mem::size_of::<Dirent>());
127
128                    let mut buf = vec![];
129                    while buf.len() + std::mem::size_of::<Dirent>() <= max_bytes {
130                        match entries_iter.next() {
131                            Some(need) => {
132                                buf.extend(need.as_bytes());
133                            }
134                            None => break,
135                        }
136                    }
137
138                    responder.send(Status::OK.into_raw(), &buf).unwrap();
139
140                    // Finish after providing an empty chunk.
141                    if buf.is_empty() {
142                        break;
143                    }
144                }
145                Some(other) => panic!("unexpected request: {other:?}"),
146                None => panic!("unexpected stream termination"),
147            }
148        }
149    }
150
151    /// Consume N directory requests, verifying they are intended to determine whether the blobs
152    /// specified `readable` and `missing` are readable or not, responding to the check based on
153    /// which collection the hash is in.
154    ///
155    /// # Panics
156    ///
157    /// Panics on error or assertion violation (unexpected requests, request for unspecified blob)
158    pub async fn expect_readable_missing_checks(&mut self, readable: &[Hash], missing: &[Hash]) {
159        let mut readable = readable.iter().copied().collect::<HashSet<_>>();
160        let mut missing = missing.iter().copied().collect::<HashSet<_>>();
161
162        while !(readable.is_empty() && missing.is_empty()) {
163            match self.stream.next().await {
164                Some(Ok(fio::DirectoryRequest::Open {
165                    path,
166                    flags,
167                    options: _,
168                    object,
169                    control_handle: _,
170                })) => {
171                    assert!(flags.contains(fio::PERM_READABLE));
172                    assert!(
173                        !flags.intersects(fio::Flags::PERM_WRITE | fio::Flags::FLAG_MAYBE_CREATE)
174                    );
175                    let path: Hash = path.parse().unwrap();
176
177                    let stream =
178                        fio::NodeRequestStream::from_channel(fasync::Channel::from_channel(object))
179                            .cast_stream();
180                    let blob = Blob { stream };
181                    if readable.remove(&path) {
182                        blob.succeed_open_with_blob_readable().await;
183                    } else if missing.remove(&path) {
184                        blob.fail_open_with_not_found();
185                    } else {
186                        panic!("Unexpected blob existance check for {path}");
187                    }
188                }
189                other => panic!("unexpected request: {other:?}"),
190            }
191        }
192    }
193
194    /// Expects and handles a call to [`Client::filter_to_missing_blobs`].
195    /// Verifies the call intends to determine whether the blobs specified in `readable` and
196    /// `missing` are readable or not, responding to the check based on which collection the hash is
197    /// in.
198    ///
199    /// # Panics
200    ///
201    /// Panics on error or assertion violation (unexpected requests, request for unspecified blob)
202    pub async fn expect_filter_to_missing_blobs_with_readable_missing_ids(
203        &mut self,
204        readable: &[Hash],
205        missing: &[Hash],
206    ) {
207        self.expect_readable_missing_checks(readable, missing).await;
208    }
209
210    /// Asserts that the request stream closes without any further requests.
211    ///
212    /// # Panics
213    ///
214    /// Panics on error
215    pub async fn expect_done(mut self) {
216        match self.stream.next().await {
217            None => {}
218            Some(request) => panic!("unexpected request: {request:?}"),
219        }
220    }
221}
222
223/// A testing server implementation of an open /blob/<merkle> file.
224///
225/// Blob does not send the OnOpen event or handle requests until instructed to do so.
226pub struct Blob {
227    stream: fio::FileRequestStream,
228}
229
230impl Blob {
231    fn send_on_open_with_file_signals(&mut self, status: Status, signals: zx::Signals) {
232        let event = fidl::Event::create();
233        event.signal_handle(zx::Signals::NONE, signals).unwrap();
234
235        let info =
236            fio::NodeInfoDeprecated::File(fio::FileObject { event: Some(event), stream: None });
237        let () = self.stream.control_handle().send_on_open_(status.into_raw(), Some(info)).unwrap();
238    }
239
240    fn send_on_open(&mut self, status: Status) {
241        self.send_on_open_with_file_signals(status, zx::Signals::NONE);
242    }
243
244    fn send_on_open_with_readable(&mut self, status: Status) {
245        // Send USER_0 signal to indicate that the blob is available.
246        self.send_on_open_with_file_signals(status, zx::Signals::USER_0);
247    }
248
249    fn fail_open_with_error(mut self, status: Status) {
250        assert_ne!(status, Status::OK);
251        self.send_on_open(status);
252    }
253
254    /// Fail the open request with an error indicating the blob already exists.
255    ///
256    /// # Panics
257    ///
258    /// Panics on error
259    pub fn fail_open_with_already_exists(self) {
260        self.fail_open_with_error(Status::ACCESS_DENIED);
261    }
262
263    /// Fail the open request with an error indicating the blob does not exist.
264    ///
265    /// # Panics
266    ///
267    /// Panics on error
268    pub fn fail_open_with_not_found(self) {
269        self.fail_open_with_error(Status::NOT_FOUND);
270    }
271
272    /// Fail the open request with a generic IO error.
273    ///
274    /// # Panics
275    ///
276    /// Panics on error
277    pub fn fail_open_with_io_error(self) {
278        self.fail_open_with_error(Status::IO);
279    }
280
281    /// Succeeds the open request, but indicate the blob is not yet readable by not asserting the
282    /// USER_0 signal on the file event handle, then asserts that the connection to the blob is
283    /// closed.
284    ///
285    /// # Panics
286    ///
287    /// Panics on error
288    pub async fn fail_open_with_not_readable(mut self) {
289        self.send_on_open(Status::OK);
290        self.expect_done().await;
291    }
292
293    /// Succeeds the open request, indicating that the blob is readable, then asserts that the
294    /// connection to the blob is closed.
295    ///
296    /// # Panics
297    ///
298    /// Panics on error
299    pub async fn succeed_open_with_blob_readable(mut self) {
300        self.send_on_open_with_readable(Status::OK);
301        self.expect_done().await;
302    }
303
304    /// Succeeds the open request, then verifies the blob is immediately closed (possibly after
305    /// handling a single Close request).
306    ///
307    /// # Panics
308    ///
309    /// Panics on error
310    pub async fn expect_close(mut self) {
311        self.send_on_open_with_readable(Status::OK);
312
313        match self.stream.next().await {
314            None => {}
315            Some(Ok(fio::FileRequest::Close { responder })) => {
316                let _ = responder.send(Ok(()));
317                self.expect_done().await;
318            }
319            Some(other) => panic!("unexpected request: {other:?}"),
320        }
321    }
322
323    /// Asserts that the request stream closes without any further requests.
324    ///
325    /// # Panics
326    ///
327    /// Panics on error
328    pub async fn expect_done(mut self) {
329        match self.stream.next().await {
330            None => {}
331            Some(request) => panic!("unexpected request: {request:?}"),
332        }
333    }
334
335    async fn handle_read(&mut self, data: &[u8]) -> usize {
336        match self.stream.next().await {
337            Some(Ok(fio::FileRequest::Read { count, responder })) => {
338                let count = min(count.try_into().unwrap(), data.len());
339                responder.send(Ok(&data[..count])).unwrap();
340                count
341            }
342            other => panic!("unexpected request: {other:?}"),
343        }
344    }
345
346    /// Succeeds the open request, then handle read request with the given blob data.
347    ///
348    /// # Panics
349    ///
350    /// Panics on error
351    pub async fn expect_read(mut self, blob: &[u8]) {
352        self.send_on_open_with_readable(Status::OK);
353
354        let mut rest = blob;
355        while !rest.is_empty() {
356            let count = self.handle_read(rest).await;
357            rest = &rest[count..];
358        }
359
360        // Handle one extra request with empty buffer to signal EOF.
361        self.handle_read(rest).await;
362
363        match self.stream.next().await {
364            None => {}
365            Some(Ok(fio::FileRequest::Close { responder })) => {
366                let _ = responder.send(Ok(()));
367            }
368            Some(other) => panic!("unexpected request: {other:?}"),
369        }
370    }
371
372    /// Succeeds the open request. Then handles get_attr, read, read_at, and possibly a final close
373    /// requests with the given blob data.
374    ///
375    /// # Panics
376    ///
377    /// Panics on error
378    pub async fn serve_contents(mut self, data: &[u8]) {
379        self.send_on_open_with_readable(Status::OK);
380
381        let mut pos: usize = 0;
382
383        loop {
384            match self.stream.next().await {
385                Some(Ok(fio::FileRequest::Read { count, responder })) => {
386                    let avail = data.len() - pos;
387                    let count = min(count.try_into().unwrap(), avail);
388                    responder.send(Ok(&data[pos..pos + count])).unwrap();
389                    pos += count;
390                }
391                Some(Ok(fio::FileRequest::ReadAt { count, offset, responder })) => {
392                    let pos: usize = offset.try_into().unwrap();
393                    let avail = data.len() - pos;
394                    let count = min(count.try_into().unwrap(), avail);
395                    responder.send(Ok(&data[pos..pos + count])).unwrap();
396                }
397                Some(Ok(fio::FileRequest::GetAttr { responder })) => {
398                    let mut attr = fio::NodeAttributes {
399                        mode: 0,
400                        id: 0,
401                        content_size: 0,
402                        storage_size: 0,
403                        link_count: 0,
404                        creation_time: 0,
405                        modification_time: 0,
406                    };
407                    attr.content_size = data.len().try_into().unwrap();
408                    responder.send(Status::OK.into_raw(), &attr).unwrap();
409                }
410                Some(Ok(fio::FileRequest::Close { responder })) => {
411                    let _ = responder.send(Ok(()));
412                    return;
413                }
414                Some(Ok(fio::FileRequest::GetBackingMemory { flags, responder })) => {
415                    assert!(flags.contains(fio::VmoFlags::READ));
416                    assert!(!flags.contains(fio::VmoFlags::WRITE));
417                    assert!(!flags.contains(fio::VmoFlags::EXECUTE));
418                    let vmo = zx::Vmo::create(data.len() as u64).unwrap();
419                    vmo.write(data, 0).unwrap();
420                    let vmo = vmo
421                        .replace_handle(
422                            zx::Rights::READ
423                                | zx::Rights::BASIC
424                                | zx::Rights::MAP
425                                | zx::Rights::GET_PROPERTY,
426                        )
427                        .unwrap();
428                    responder.send(Ok(vmo)).unwrap();
429                }
430                None => {
431                    return;
432                }
433                other => panic!("unexpected request: {other:?}"),
434            }
435        }
436    }
437
438    async fn handle_truncate(&mut self, status: Status) -> u64 {
439        match self.stream.next().await {
440            Some(Ok(fio::FileRequest::Resize { length, responder })) => {
441                responder
442                    .send(if status == Status::OK { Ok(()) } else { Err(status.into_raw()) })
443                    .unwrap();
444
445                length
446            }
447            other => panic!("unexpected request: {other:?}"),
448        }
449    }
450
451    async fn expect_truncate(&mut self) -> u64 {
452        self.handle_truncate(Status::OK).await
453    }
454
455    async fn handle_write(&mut self, status: Status) -> Vec<u8> {
456        match self.stream.next().await {
457            Some(Ok(fio::FileRequest::Write { data, responder })) => {
458                responder
459                    .send(if status == Status::OK {
460                        Ok(data.len() as u64)
461                    } else {
462                        Err(status.into_raw())
463                    })
464                    .unwrap();
465
466                data
467            }
468            other => panic!("unexpected request: {other:?}"),
469        }
470    }
471
472    async fn fail_write_with_status(mut self, status: Status) {
473        self.send_on_open(Status::OK);
474
475        let length = self.expect_truncate().await;
476        // divide rounding up
477        let expected_write_calls = length.div_ceil(fio::MAX_BUF);
478        for _ in 0..(expected_write_calls - 1) {
479            self.handle_write(Status::OK).await;
480        }
481        self.handle_write(status).await;
482    }
483
484    /// Succeeds the open request, consumes the truncate request, the initial write calls, then
485    /// fails the final write indicating the written data was corrupt.
486    ///
487    /// # Panics
488    ///
489    /// Panics on error
490    pub async fn fail_write_with_corrupt(self) {
491        self.fail_write_with_status(Status::IO_DATA_INTEGRITY).await
492    }
493
494    /// Succeeds the open request, then returns a future that, when awaited, verifies the blob is
495    /// truncated, written, and closed with the given `expected` payload.
496    ///
497    /// # Panics
498    ///
499    /// Panics on error
500    pub fn expect_payload(mut self, expected: &[u8]) -> impl Future<Output = ()> + '_ {
501        self.send_on_open(Status::OK);
502
503        async move {
504            assert_eq!(self.expect_truncate().await, expected.len() as u64);
505
506            let mut rest = expected;
507            while !rest.is_empty() {
508                let expected_chunk = if rest.len() > fio::MAX_BUF as usize {
509                    &rest[..fio::MAX_BUF as usize]
510                } else {
511                    rest
512                };
513                assert_eq!(self.handle_write(Status::OK).await, expected_chunk);
514                rest = &rest[expected_chunk.len()..];
515            }
516
517            match self.stream.next().await {
518                Some(Ok(fio::FileRequest::Close { responder })) => {
519                    responder.send(Ok(())).unwrap();
520                }
521                other => panic!("unexpected request: {other:?}"),
522            }
523
524            self.expect_done().await;
525        }
526    }
527}