1use fidl::endpoints::RequestStream as _;
8use fuchsia_hash::Hash;
9use futures::{Future, StreamExt as _, TryStreamExt as _};
10use std::cmp::min;
11use std::collections::HashSet;
12use std::convert::TryInto as _;
13use zx::{self as zx, AsHandleRef as _, HandleBased as _, Status};
14use {fidl_fuchsia_io as fio, fuchsia_async as fasync};
15
/// Test double for a directory server, driven by the `fuchsia.io` directory request stream it
/// owns.
///
/// Requests are only pulled off the stream when one of the `expect_*` methods is awaited.
pub struct Mock {
    /// Incoming directory requests that the `expect_*` methods check and answer.
    pub(super) stream: fio::DirectoryRequestStream,
}
22
23impl Mock {
24 pub async fn expect_open_blob(&mut self, merkle: Hash) -> Blob {
31 match self.stream.next().await {
32 Some(Ok(fio::DirectoryRequest::Open {
33 path,
34 flags,
35 options: _,
36 object,
37 control_handle: _,
38 })) => {
39 assert_eq!(path, merkle.to_string());
40 assert!(flags.contains(fio::PERM_READABLE));
41 assert!(
42 !flags.intersects(fio::Flags::PERM_WRITE_BYTES | fio::Flags::FLAG_MAYBE_CREATE)
43 );
44
45 let stream =
46 fio::NodeRequestStream::from_channel(fasync::Channel::from_channel(object))
47 .cast_stream();
48 Blob { stream }
49 }
50 other => panic!("unexpected request: {other:?}"),
51 }
52 }
53
54 pub async fn expect_create_blob(&mut self, merkle: Hash) -> Blob {
61 match self.stream.next().await {
62 Some(Ok(fio::DirectoryRequest::Open {
63 path,
64 flags,
65 options: _,
66 object,
67 control_handle: _,
68 })) => {
69 assert!(flags.contains(fio::PERM_WRITABLE | fio::Flags::FLAG_MAYBE_CREATE));
70 assert_eq!(path, delivery_blob::delivery_blob_path(merkle));
71 let stream =
72 fio::NodeRequestStream::from_channel(fasync::Channel::from_channel(object))
73 .cast_stream();
74 Blob { stream }
75 }
76 other => panic!("unexpected request: {other:?}"),
77 }
78 }
79
80 async fn handle_rewind(&mut self) {
81 match self.stream.next().await {
82 Some(Ok(fio::DirectoryRequest::Rewind { responder })) => {
83 responder.send(Status::OK.into_raw()).unwrap();
84 }
85 other => panic!("unexpected request: {other:?}"),
86 }
87 }
88
89 pub async fn expect_readdir(&mut self, entries: impl Iterator<Item = Hash>) {
96 self.handle_rewind().await;
98
99 const NAME_LEN: usize = 64;
100 #[repr(C, packed)]
101 struct Dirent {
102 ino: u64,
103 size: u8,
104 kind: u8,
105 name: [u8; NAME_LEN],
106 }
107
108 impl Dirent {
109 fn as_bytes(&self) -> &'_ [u8] {
110 let start = self as *const Self as *const u8;
111 unsafe { std::slice::from_raw_parts(start, std::mem::size_of::<Self>()) }
114 }
115 }
116
117 let mut entries_iter = entries.map(|hash| Dirent {
118 ino: fio::INO_UNKNOWN,
119 size: NAME_LEN as u8,
120 kind: fio::DirentType::File.into_primitive(),
121 name: hash.to_string().as_bytes().try_into().unwrap(),
122 });
123
124 loop {
125 match self.stream.try_next().await.unwrap() {
126 Some(fio::DirectoryRequest::ReadDirents { max_bytes, responder }) => {
127 let max_bytes = max_bytes as usize;
128 assert!(max_bytes >= std::mem::size_of::<Dirent>());
129
130 let mut buf = vec![];
131 while buf.len() + std::mem::size_of::<Dirent>() <= max_bytes {
132 match entries_iter.next() {
133 Some(need) => {
134 buf.extend(need.as_bytes());
135 }
136 None => break,
137 }
138 }
139
140 responder.send(Status::OK.into_raw(), &buf).unwrap();
141
142 if buf.is_empty() {
144 break;
145 }
146 }
147 Some(other) => panic!("unexpected request: {other:?}"),
148 None => panic!("unexpected stream termination"),
149 }
150 }
151 }
152
153 pub async fn expect_readable_missing_checks(&mut self, readable: &[Hash], missing: &[Hash]) {
161 let mut readable = readable.iter().copied().collect::<HashSet<_>>();
162 let mut missing = missing.iter().copied().collect::<HashSet<_>>();
163
164 while !(readable.is_empty() && missing.is_empty()) {
165 match self.stream.next().await {
166 Some(Ok(fio::DirectoryRequest::Open {
167 path,
168 flags,
169 options: _,
170 object,
171 control_handle: _,
172 })) => {
173 assert!(flags.contains(fio::PERM_READABLE));
174 assert!(!flags
175 .intersects(fio::Flags::PERM_WRITE_BYTES | fio::Flags::FLAG_MAYBE_CREATE));
176 let path: Hash = path.parse().unwrap();
177
178 let stream =
179 fio::NodeRequestStream::from_channel(fasync::Channel::from_channel(object))
180 .cast_stream();
181 let blob = Blob { stream };
182 if readable.remove(&path) {
183 blob.succeed_open_with_blob_readable().await;
184 } else if missing.remove(&path) {
185 blob.fail_open_with_not_found();
186 } else {
187 panic!("Unexpected blob existance check for {path}");
188 }
189 }
190 other => panic!("unexpected request: {other:?}"),
191 }
192 }
193 }
194
    /// Answers the per-blob open checks for the hashes in `readable` and `missing`.
    ///
    /// This only forwards to `expect_readable_missing_checks`, which also defines when it
    /// panics.
    // NOTE(review): the name suggests this pairs with a client-side "filter to missing blobs"
    // call; that caller is not visible here, so confirm before relying on it.
    pub async fn expect_filter_to_missing_blobs_with_readable_missing_ids(
        &mut self,
        readable: &[Hash],
        missing: &[Hash],
    ) {
        self.expect_readable_missing_checks(readable, missing).await;
    }
210
211 pub async fn expect_done(mut self) {
217 match self.stream.next().await {
218 None => {}
219 Some(request) => panic!("unexpected request: {request:?}"),
220 }
221 }
222}
223
/// Test double for one open file connection, driven by the `fuchsia.io` file request stream it
/// owns.
///
/// Values are produced by the `Mock::expect_*` methods that accept an `Open` request.
pub struct Blob {
    /// Incoming file requests for this connection.
    stream: fio::FileRequestStream,
}
230
231impl Blob {
232 fn send_on_open_with_file_signals(&mut self, status: Status, signals: zx::Signals) {
233 let event = fidl::Event::create();
234 event.signal_handle(zx::Signals::NONE, signals).unwrap();
235
236 let info =
237 fio::NodeInfoDeprecated::File(fio::FileObject { event: Some(event), stream: None });
238 let () = self.stream.control_handle().send_on_open_(status.into_raw(), Some(info)).unwrap();
239 }
240
    /// Sends the `OnOpen` event with `status` and no signals raised on the file event handle.
    fn send_on_open(&mut self, status: Status) {
        self.send_on_open_with_file_signals(status, zx::Signals::NONE);
    }
244
    /// Sends the `OnOpen` event with `status` and `USER_0` raised on the file event handle.
    fn send_on_open_with_readable(&mut self, status: Status) {
        // USER_0 is the signal this mock uses to mark the blob as readable.
        self.send_on_open_with_file_signals(status, zx::Signals::USER_0);
    }
249
    /// Consumes the blob and reports `status` as the result of the open.
    ///
    /// The connection is dropped when this returns, because `self` is taken by value.
    ///
    /// # Panics
    ///
    /// Panics if `status` is `Status::OK` or if the event cannot be sent.
    fn fail_open_with_error(mut self, status: Status) {
        assert_ne!(status, Status::OK);
        self.send_on_open(status);
    }
254
    /// Fails the open with `Status::ACCESS_DENIED`.
    ///
    /// # Panics
    ///
    /// Panics if the event cannot be sent.
    // NOTE(review): the method name says "already exists" but the status sent is ACCESS_DENIED;
    // presumably that is what the real server reports for an existing blob. Confirm.
    pub fn fail_open_with_already_exists(self) {
        self.fail_open_with_error(Status::ACCESS_DENIED);
    }
263
    /// Fails the open with `Status::NOT_FOUND`.
    ///
    /// # Panics
    ///
    /// Panics if the event cannot be sent.
    pub fn fail_open_with_not_found(self) {
        self.fail_open_with_error(Status::NOT_FOUND);
    }
272
    /// Fails the open with `Status::IO`.
    ///
    /// # Panics
    ///
    /// Panics if the event cannot be sent.
    pub fn fail_open_with_io_error(self) {
        self.fail_open_with_error(Status::IO);
    }
281
    /// Succeeds the open without raising the readable signal, then checks that the client
    /// closes the connection without sending any request.
    ///
    /// # Panics
    ///
    /// Panics if the event cannot be sent or if the stream yields anything before it ends.
    pub async fn fail_open_with_not_readable(mut self) {
        // Status is OK, but no signal is raised on the file event handle.
        self.send_on_open(Status::OK);
        self.expect_done().await;
    }
293
    /// Succeeds the open with the readable signal raised, then checks that the client closes
    /// the connection without sending any request.
    ///
    /// # Panics
    ///
    /// Panics if the event cannot be sent or if the stream yields anything before it ends.
    pub async fn succeed_open_with_blob_readable(mut self) {
        self.send_on_open_with_readable(Status::OK);
        self.expect_done().await;
    }
304
305 pub async fn expect_close(mut self) {
312 self.send_on_open_with_readable(Status::OK);
313
314 match self.stream.next().await {
315 None => {}
316 Some(Ok(fio::FileRequest::Close { responder })) => {
317 let _ = responder.send(Ok(()));
318 self.expect_done().await;
319 }
320 Some(other) => panic!("unexpected request: {other:?}"),
321 }
322 }
323
324 pub async fn expect_done(mut self) {
330 match self.stream.next().await {
331 None => {}
332 Some(request) => panic!("unexpected request: {request:?}"),
333 }
334 }
335
336 async fn handle_read(&mut self, data: &[u8]) -> usize {
337 match self.stream.next().await {
338 Some(Ok(fio::FileRequest::Read { count, responder })) => {
339 let count = min(count.try_into().unwrap(), data.len());
340 responder.send(Ok(&data[..count])).unwrap();
341 count
342 }
343 other => panic!("unexpected request: {other:?}"),
344 }
345 }
346
347 pub async fn expect_read(mut self, blob: &[u8]) {
353 self.send_on_open_with_readable(Status::OK);
354
355 let mut rest = blob;
356 while !rest.is_empty() {
357 let count = self.handle_read(rest).await;
358 rest = &rest[count..];
359 }
360
361 self.handle_read(rest).await;
363
364 match self.stream.next().await {
365 None => {}
366 Some(Ok(fio::FileRequest::Close { responder })) => {
367 let _ = responder.send(Ok(()));
368 }
369 Some(other) => panic!("unexpected request: {other:?}"),
370 }
371 }
372
373 pub async fn serve_contents(mut self, data: &[u8]) {
380 self.send_on_open_with_readable(Status::OK);
381
382 let mut pos: usize = 0;
383
384 loop {
385 match self.stream.next().await {
386 Some(Ok(fio::FileRequest::Read { count, responder })) => {
387 let avail = data.len() - pos;
388 let count = min(count.try_into().unwrap(), avail);
389 responder.send(Ok(&data[pos..pos + count])).unwrap();
390 pos += count;
391 }
392 Some(Ok(fio::FileRequest::ReadAt { count, offset, responder })) => {
393 let pos: usize = offset.try_into().unwrap();
394 let avail = data.len() - pos;
395 let count = min(count.try_into().unwrap(), avail);
396 responder.send(Ok(&data[pos..pos + count])).unwrap();
397 }
398 Some(Ok(fio::FileRequest::GetAttr { responder })) => {
399 let mut attr = fio::NodeAttributes {
400 mode: 0,
401 id: 0,
402 content_size: 0,
403 storage_size: 0,
404 link_count: 0,
405 creation_time: 0,
406 modification_time: 0,
407 };
408 attr.content_size = data.len().try_into().unwrap();
409 responder.send(Status::OK.into_raw(), &attr).unwrap();
410 }
411 Some(Ok(fio::FileRequest::Close { responder })) => {
412 let _ = responder.send(Ok(()));
413 return;
414 }
415 Some(Ok(fio::FileRequest::GetBackingMemory { flags, responder })) => {
416 assert!(flags.contains(fio::VmoFlags::READ));
417 assert!(!flags.contains(fio::VmoFlags::WRITE));
418 assert!(!flags.contains(fio::VmoFlags::EXECUTE));
419 let vmo = zx::Vmo::create(data.len() as u64).unwrap();
420 vmo.write(data, 0).unwrap();
421 let vmo = vmo
422 .replace_handle(
423 zx::Rights::READ
424 | zx::Rights::BASIC
425 | zx::Rights::MAP
426 | zx::Rights::GET_PROPERTY,
427 )
428 .unwrap();
429 responder.send(Ok(vmo)).unwrap();
430 }
431 None => {
432 return;
433 }
434 other => panic!("unexpected request: {other:?}"),
435 }
436 }
437 }
438
439 async fn handle_truncate(&mut self, status: Status) -> u64 {
440 match self.stream.next().await {
441 Some(Ok(fio::FileRequest::Resize { length, responder })) => {
442 responder
443 .send(if status == Status::OK { Ok(()) } else { Err(status.into_raw()) })
444 .unwrap();
445
446 length
447 }
448 other => panic!("unexpected request: {other:?}"),
449 }
450 }
451
    /// Expects a `Resize` request, answers it successfully, and returns the requested length.
    ///
    /// # Panics
    ///
    /// Panics under the same conditions as `handle_truncate`.
    async fn expect_truncate(&mut self) -> u64 {
        self.handle_truncate(Status::OK).await
    }
455
456 async fn handle_write(&mut self, status: Status) -> Vec<u8> {
457 match self.stream.next().await {
458 Some(Ok(fio::FileRequest::Write { data, responder })) => {
459 responder
460 .send(if status == Status::OK {
461 Ok(data.len() as u64)
462 } else {
463 Err(status.into_raw())
464 })
465 .unwrap();
466
467 data
468 }
469 other => panic!("unexpected request: {other:?}"),
470 }
471 }
472
473 async fn fail_write_with_status(mut self, status: Status) {
474 self.send_on_open(Status::OK);
475
476 let length = self.expect_truncate().await;
477 let expected_write_calls = length.div_ceil(fio::MAX_BUF);
479 for _ in 0..(expected_write_calls - 1) {
480 self.handle_write(Status::OK).await;
481 }
482 self.handle_write(status).await;
483 }
484
    /// Succeeds the open and the resize, accepts all but the last expected write, and fails the
    /// last one with `Status::IO_DATA_INTEGRITY`.
    ///
    /// # Panics
    ///
    /// Panics under the same conditions as `fail_write_with_status`.
    pub async fn fail_write_with_corrupt(self) {
        self.fail_write_with_status(Status::IO_DATA_INTEGRITY).await
    }
494
495 pub fn expect_payload(mut self, expected: &[u8]) -> impl Future<Output = ()> + '_ {
502 self.send_on_open(Status::OK);
503
504 async move {
505 assert_eq!(self.expect_truncate().await, expected.len() as u64);
506
507 let mut rest = expected;
508 while !rest.is_empty() {
509 let expected_chunk = if rest.len() > fio::MAX_BUF as usize {
510 &rest[..fio::MAX_BUF as usize]
511 } else {
512 rest
513 };
514 assert_eq!(self.handle_write(Status::OK).await, expected_chunk);
515 rest = &rest[expected_chunk.len()..];
516 }
517
518 match self.stream.next().await {
519 Some(Ok(fio::FileRequest::Close { responder })) => {
520 responder.send(Ok(())).unwrap();
521 }
522 other => panic!("unexpected request: {other:?}"),
523 }
524
525 self.expect_done().await;
526 }
527 }
528}