blobfs/
mock.rs

1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5//! Mock implementation of blobfs for blobfs::Client.
6
7use fidl::endpoints::RequestStream as _;
8use fuchsia_hash::Hash;
9use futures::{Future, StreamExt as _, TryStreamExt as _};
10use std::cmp::min;
11use std::collections::HashSet;
12use std::convert::TryInto as _;
13use vfs::attributes;
14use zx::{self as zx, AsHandleRef as _, HandleBased as _, Status};
15use {fidl_fuchsia_io as fio, fuchsia_async as fasync};
16
17/// A testing server implementation of /blob.
18///
19/// Mock does not handle requests until instructed to do so.
20pub struct Mock {
21    pub(super) stream: fio::DirectoryRequestStream,
22}
23
24impl Mock {
25    /// Consume the next directory request, verifying it is intended to read the blob identified
26    /// by `merkle`.  Returns a `Blob` representing the open blob file.
27    ///
28    /// # Panics
29    ///
30    /// Panics on error or assertion violation (unexpected requests or a mismatched open call)
31    pub async fn expect_open_blob(&mut self, merkle: Hash) -> Blob {
32        match self.stream.next().await {
33            Some(Ok(fio::DirectoryRequest::Open {
34                path,
35                flags,
36                options: _,
37                object,
38                control_handle: _,
39            })) => {
40                assert_eq!(path, merkle.to_string());
41                assert!(flags.contains(fio::PERM_READABLE));
42                assert!(
43                    !flags.intersects(fio::Flags::PERM_WRITE_BYTES | fio::Flags::FLAG_MAYBE_CREATE)
44                );
45
46                let stream =
47                    fio::NodeRequestStream::from_channel(fasync::Channel::from_channel(object))
48                        .cast_stream();
49                Blob { stream }
50            }
51            other => panic!("unexpected request: {other:?}"),
52        }
53    }
54
55    /// Consume the next directory request, verifying it is intended to create the blob identified
56    /// by `merkle`.  Returns a `Blob` representing the open blob file.
57    ///
58    /// # Panics
59    ///
60    /// Panics on error or assertion violation (unexpected requests or a mismatched open call)
61    pub async fn expect_create_blob(&mut self, merkle: Hash) -> Blob {
62        match self.stream.next().await {
63            Some(Ok(fio::DirectoryRequest::Open {
64                path,
65                flags,
66                options: _,
67                object,
68                control_handle: _,
69            })) => {
70                assert!(flags.contains(fio::PERM_WRITABLE | fio::Flags::FLAG_MAYBE_CREATE));
71                assert_eq!(path, delivery_blob::delivery_blob_path(merkle));
72                let stream =
73                    fio::NodeRequestStream::from_channel(fasync::Channel::from_channel(object))
74                        .cast_stream();
75                Blob { stream }
76            }
77            other => panic!("unexpected request: {other:?}"),
78        }
79    }
80
81    async fn handle_rewind(&mut self) {
82        match self.stream.next().await {
83            Some(Ok(fio::DirectoryRequest::Rewind { responder })) => {
84                responder.send(Status::OK.into_raw()).unwrap();
85            }
86            other => panic!("unexpected request: {other:?}"),
87        }
88    }
89
90    /// Consume directory requests, verifying they are requests to read directory entries.  Respond
91    /// with dirents constructed from the given entries.
92    ///
93    /// # Panics
94    ///
95    /// Panics on error or assertion violation (unexpected requests or not all entries are read)
96    pub async fn expect_readdir(&mut self, entries: impl Iterator<Item = Hash>) {
97        // fuchsia_fs::directory starts by resetting the directory channel's readdir position.
98        self.handle_rewind().await;
99
100        const NAME_LEN: usize = 64;
101        #[repr(C, packed)]
102        struct Dirent {
103            ino: u64,
104            size: u8,
105            kind: u8,
106            name: [u8; NAME_LEN],
107        }
108
109        impl Dirent {
110            fn as_bytes(&self) -> &'_ [u8] {
111                let start = self as *const Self as *const u8;
112                // Safe because the FIDL wire format for directory entries is
113                // defined to be the C packed struct representation used here.
114                unsafe { std::slice::from_raw_parts(start, std::mem::size_of::<Self>()) }
115            }
116        }
117
118        let mut entries_iter = entries.map(|hash| Dirent {
119            ino: fio::INO_UNKNOWN,
120            size: NAME_LEN as u8,
121            kind: fio::DirentType::File.into_primitive(),
122            name: hash.to_string().as_bytes().try_into().unwrap(),
123        });
124
125        loop {
126            match self.stream.try_next().await.unwrap() {
127                Some(fio::DirectoryRequest::ReadDirents { max_bytes, responder }) => {
128                    let max_bytes = max_bytes as usize;
129                    assert!(max_bytes >= std::mem::size_of::<Dirent>());
130
131                    let mut buf = vec![];
132                    while buf.len() + std::mem::size_of::<Dirent>() <= max_bytes {
133                        match entries_iter.next() {
134                            Some(need) => {
135                                buf.extend(need.as_bytes());
136                            }
137                            None => break,
138                        }
139                    }
140
141                    responder.send(Status::OK.into_raw(), &buf).unwrap();
142
143                    // Finish after providing an empty chunk.
144                    if buf.is_empty() {
145                        break;
146                    }
147                }
148                Some(other) => panic!("unexpected request: {other:?}"),
149                None => panic!("unexpected stream termination"),
150            }
151        }
152    }
153
154    /// Consume N directory requests, verifying they are intended to determine whether the blobs
155    /// specified `readable` and `missing` are readable or not, responding to the check based on
156    /// which collection the hash is in.
157    ///
158    /// # Panics
159    ///
160    /// Panics on error or assertion violation (unexpected requests, request for unspecified blob)
161    pub async fn expect_readable_missing_checks(&mut self, readable: &[Hash], missing: &[Hash]) {
162        let mut readable = readable.iter().copied().collect::<HashSet<_>>();
163        let mut missing = missing.iter().copied().collect::<HashSet<_>>();
164
165        while !(readable.is_empty() && missing.is_empty()) {
166            match self.stream.next().await {
167                Some(Ok(fio::DirectoryRequest::Open {
168                    path,
169                    flags,
170                    options: _,
171                    object,
172                    control_handle: _,
173                })) => {
174                    assert!(flags.contains(fio::PERM_READABLE));
175                    assert!(!flags
176                        .intersects(fio::Flags::PERM_WRITE_BYTES | fio::Flags::FLAG_MAYBE_CREATE));
177                    let path: Hash = path.parse().unwrap();
178
179                    let stream =
180                        fio::NodeRequestStream::from_channel(fasync::Channel::from_channel(object))
181                            .cast_stream();
182                    let blob = Blob { stream };
183                    if readable.remove(&path) {
184                        blob.succeed_open_with_blob_readable().await;
185                    } else if missing.remove(&path) {
186                        blob.fail_open_with_not_found();
187                    } else {
188                        panic!("Unexpected blob existance check for {path}");
189                    }
190                }
191                other => panic!("unexpected request: {other:?}"),
192            }
193        }
194    }
195
196    /// Expects and handles a call to [`Client::filter_to_missing_blobs`].
197    /// Verifies the call intends to determine whether the blobs specified in `readable` and
198    /// `missing` are readable or not, responding to the check based on which collection the hash is
199    /// in.
200    ///
201    /// # Panics
202    ///
203    /// Panics on error or assertion violation (unexpected requests, request for unspecified blob)
204    pub async fn expect_filter_to_missing_blobs_with_readable_missing_ids(
205        &mut self,
206        readable: &[Hash],
207        missing: &[Hash],
208    ) {
209        self.expect_readable_missing_checks(readable, missing).await;
210    }
211
212    /// Asserts that the request stream closes without any further requests.
213    ///
214    /// # Panics
215    ///
216    /// Panics on error
217    pub async fn expect_done(mut self) {
218        match self.stream.next().await {
219            None => {}
220            Some(request) => panic!("unexpected request: {request:?}"),
221        }
222    }
223}
224
225/// A testing server implementation of an open /blob/<merkle> file.
226///
227/// Blob does not send the OnOpen event or handle requests until instructed to do so.
228pub struct Blob {
229    stream: fio::FileRequestStream,
230}
231
232impl Blob {
233    fn send_on_open_with_file_signals(&mut self, status: Status, signals: zx::Signals) {
234        let event = fidl::Event::create();
235        event.signal_handle(zx::Signals::NONE, signals).unwrap();
236
237        let info =
238            fio::NodeInfoDeprecated::File(fio::FileObject { event: Some(event), stream: None });
239        let () = self.stream.control_handle().send_on_open_(status.into_raw(), Some(info)).unwrap();
240    }
241
242    fn send_on_open(&mut self, status: Status) {
243        self.send_on_open_with_file_signals(status, zx::Signals::NONE);
244    }
245
246    fn send_on_open_with_readable(&mut self, status: Status) {
247        // Send USER_0 signal to indicate that the blob is available.
248        self.send_on_open_with_file_signals(status, zx::Signals::USER_0);
249    }
250
251    fn fail_open_with_error(mut self, status: Status) {
252        assert_ne!(status, Status::OK);
253        self.send_on_open(status);
254    }
255
256    /// Fail the open request with an error indicating the blob already exists.
257    ///
258    /// # Panics
259    ///
260    /// Panics on error
261    pub fn fail_open_with_already_exists(self) {
262        self.fail_open_with_error(Status::ACCESS_DENIED);
263    }
264
265    /// Fail the open request with an error indicating the blob does not exist.
266    ///
267    /// # Panics
268    ///
269    /// Panics on error
270    pub fn fail_open_with_not_found(self) {
271        self.fail_open_with_error(Status::NOT_FOUND);
272    }
273
274    /// Fail the open request with a generic IO error.
275    ///
276    /// # Panics
277    ///
278    /// Panics on error
279    pub fn fail_open_with_io_error(self) {
280        self.fail_open_with_error(Status::IO);
281    }
282
283    /// Succeeds the open request, but indicate the blob is not yet readable by not asserting the
284    /// USER_0 signal on the file event handle, then asserts that the connection to the blob is
285    /// closed.
286    ///
287    /// # Panics
288    ///
289    /// Panics on error
290    pub async fn fail_open_with_not_readable(mut self) {
291        self.send_on_open(Status::OK);
292        self.expect_done().await;
293    }
294
295    /// Succeeds the open request, indicating that the blob is readable, then asserts that the
296    /// connection to the blob is closed.
297    ///
298    /// # Panics
299    ///
300    /// Panics on error
301    pub async fn succeed_open_with_blob_readable(mut self) {
302        self.send_on_open_with_readable(Status::OK);
303        self.expect_done().await;
304    }
305
306    /// Succeeds the open request, then verifies the blob is immediately closed (possibly after
307    /// handling a single Close request).
308    ///
309    /// # Panics
310    ///
311    /// Panics on error
312    pub async fn expect_close(mut self) {
313        self.send_on_open_with_readable(Status::OK);
314
315        match self.stream.next().await {
316            None => {}
317            Some(Ok(fio::FileRequest::Close { responder })) => {
318                let _ = responder.send(Ok(()));
319                self.expect_done().await;
320            }
321            Some(other) => panic!("unexpected request: {other:?}"),
322        }
323    }
324
325    /// Asserts that the request stream closes without any further requests.
326    ///
327    /// # Panics
328    ///
329    /// Panics on error
330    pub async fn expect_done(mut self) {
331        match self.stream.next().await {
332            None => {}
333            Some(request) => panic!("unexpected request: {request:?}"),
334        }
335    }
336
337    async fn handle_read(&mut self, data: &[u8]) -> usize {
338        match self.stream.next().await {
339            Some(Ok(fio::FileRequest::Read { count, responder })) => {
340                let count = min(count.try_into().unwrap(), data.len());
341                responder.send(Ok(&data[..count])).unwrap();
342                count
343            }
344            other => panic!("unexpected request: {other:?}"),
345        }
346    }
347
348    /// Succeeds the open request, then handle read request with the given blob data.
349    ///
350    /// # Panics
351    ///
352    /// Panics on error
353    pub async fn expect_read(mut self, blob: &[u8]) {
354        self.send_on_open_with_readable(Status::OK);
355
356        let mut rest = blob;
357        while !rest.is_empty() {
358            let count = self.handle_read(rest).await;
359            rest = &rest[count..];
360        }
361
362        // Handle one extra request with empty buffer to signal EOF.
363        self.handle_read(rest).await;
364
365        match self.stream.next().await {
366            None => {}
367            Some(Ok(fio::FileRequest::Close { responder })) => {
368                let _ = responder.send(Ok(()));
369            }
370            Some(other) => panic!("unexpected request: {other:?}"),
371        }
372    }
373
374    /// Succeeds the open request. Then handles get_attr, read, read_at, and possibly a final close
375    /// requests with the given blob data.
376    ///
377    /// # Panics
378    ///
379    /// Panics on error
380    pub async fn serve_contents(mut self, data: &[u8]) {
381        self.send_on_open_with_readable(Status::OK);
382
383        let mut pos: usize = 0;
384
385        loop {
386            match self.stream.next().await {
387                Some(Ok(fio::FileRequest::Read { count, responder })) => {
388                    let avail = data.len() - pos;
389                    let count = min(count.try_into().unwrap(), avail);
390                    responder.send(Ok(&data[pos..pos + count])).unwrap();
391                    pos += count;
392                }
393                Some(Ok(fio::FileRequest::ReadAt { count, offset, responder })) => {
394                    let pos: usize = offset.try_into().unwrap();
395                    let avail = data.len() - pos;
396                    let count = min(count.try_into().unwrap(), avail);
397                    responder.send(Ok(&data[pos..pos + count])).unwrap();
398                }
399                Some(Ok(fio::FileRequest::GetAttributes { query, responder })) => {
400                    let attrs = attributes!(
401                        query,
402                        Mutable { creation_time: 0, modification_time: 0, mode: 0 },
403                        Immutable {
404                            protocols: fio::NodeProtocolKinds::FILE,
405                            content_size: data.len() as u64,
406                            storage_size: 0,
407                            link_count: 0,
408                            id: 0,
409                        }
410                    );
411                    responder
412                        .send(Ok((&attrs.mutable_attributes, &attrs.immutable_attributes)))
413                        .unwrap();
414                }
415                Some(Ok(fio::FileRequest::Close { responder })) => {
416                    let _ = responder.send(Ok(()));
417                    return;
418                }
419                Some(Ok(fio::FileRequest::GetBackingMemory { flags, responder })) => {
420                    assert!(flags.contains(fio::VmoFlags::READ));
421                    assert!(!flags.contains(fio::VmoFlags::WRITE));
422                    assert!(!flags.contains(fio::VmoFlags::EXECUTE));
423                    let vmo = zx::Vmo::create(data.len() as u64).unwrap();
424                    vmo.write(data, 0).unwrap();
425                    let vmo = vmo
426                        .replace_handle(
427                            zx::Rights::READ
428                                | zx::Rights::BASIC
429                                | zx::Rights::MAP
430                                | zx::Rights::GET_PROPERTY,
431                        )
432                        .unwrap();
433                    responder.send(Ok(vmo)).unwrap();
434                }
435                None => {
436                    return;
437                }
438                other => panic!("unexpected request: {other:?}"),
439            }
440        }
441    }
442
443    async fn handle_truncate(&mut self, status: Status) -> u64 {
444        match self.stream.next().await {
445            Some(Ok(fio::FileRequest::Resize { length, responder })) => {
446                responder
447                    .send(if status == Status::OK { Ok(()) } else { Err(status.into_raw()) })
448                    .unwrap();
449
450                length
451            }
452            other => panic!("unexpected request: {other:?}"),
453        }
454    }
455
456    async fn expect_truncate(&mut self) -> u64 {
457        self.handle_truncate(Status::OK).await
458    }
459
460    async fn handle_write(&mut self, status: Status) -> Vec<u8> {
461        match self.stream.next().await {
462            Some(Ok(fio::FileRequest::Write { data, responder })) => {
463                responder
464                    .send(if status == Status::OK {
465                        Ok(data.len() as u64)
466                    } else {
467                        Err(status.into_raw())
468                    })
469                    .unwrap();
470
471                data
472            }
473            other => panic!("unexpected request: {other:?}"),
474        }
475    }
476
477    async fn fail_write_with_status(mut self, status: Status) {
478        self.send_on_open(Status::OK);
479
480        let length = self.expect_truncate().await;
481        // divide rounding up
482        let expected_write_calls = length.div_ceil(fio::MAX_BUF);
483        for _ in 0..(expected_write_calls - 1) {
484            self.handle_write(Status::OK).await;
485        }
486        self.handle_write(status).await;
487    }
488
489    /// Succeeds the open request, consumes the truncate request, the initial write calls, then
490    /// fails the final write indicating the written data was corrupt.
491    ///
492    /// # Panics
493    ///
494    /// Panics on error
495    pub async fn fail_write_with_corrupt(self) {
496        self.fail_write_with_status(Status::IO_DATA_INTEGRITY).await
497    }
498
499    /// Succeeds the open request, then returns a future that, when awaited, verifies the blob is
500    /// truncated, written, and closed with the given `expected` payload.
501    ///
502    /// # Panics
503    ///
504    /// Panics on error
505    pub fn expect_payload(mut self, expected: &[u8]) -> impl Future<Output = ()> + '_ {
506        self.send_on_open(Status::OK);
507
508        async move {
509            assert_eq!(self.expect_truncate().await, expected.len() as u64);
510
511            let mut rest = expected;
512            while !rest.is_empty() {
513                let expected_chunk = if rest.len() > fio::MAX_BUF as usize {
514                    &rest[..fio::MAX_BUF as usize]
515                } else {
516                    rest
517                };
518                assert_eq!(self.handle_write(Status::OK).await, expected_chunk);
519                rest = &rest[expected_chunk.len()..];
520            }
521
522            match self.stream.next().await {
523                Some(Ok(fio::FileRequest::Close { responder })) => {
524                    responder.send(Ok(())).unwrap();
525                }
526                other => panic!("unexpected request: {other:?}"),
527            }
528
529            self.expect_done().await;
530        }
531    }
532}