1use fidl::endpoints::RequestStream as _;
8use fuchsia_hash::Hash;
9use futures::{Future, StreamExt as _, TryStreamExt as _};
10use std::cmp::min;
11use std::collections::HashSet;
12use std::convert::TryInto as _;
13use zx::{self as zx, AsHandleRef as _, HandleBased as _, Status};
14use {fidl_fuchsia_io as fio, fuchsia_async as fasync};
15
16pub struct Mock {
20 pub(super) stream: fio::DirectoryRequestStream,
21}
22
23impl Mock {
24 pub async fn expect_open_blob(&mut self, merkle: Hash) -> Blob {
31 match self.stream.next().await {
32 Some(Ok(fio::DirectoryRequest::Open {
33 path,
34 flags,
35 options: _,
36 object,
37 control_handle: _,
38 })) => {
39 assert_eq!(path, merkle.to_string());
40 assert!(flags.contains(fio::PERM_READABLE));
41 assert!(!flags.intersects(fio::Flags::PERM_WRITE | fio::Flags::FLAG_MAYBE_CREATE));
42
43 let stream =
44 fio::NodeRequestStream::from_channel(fasync::Channel::from_channel(object))
45 .cast_stream();
46 Blob { stream }
47 }
48 other => panic!("unexpected request: {other:?}"),
49 }
50 }
51
52 pub async fn expect_create_blob(&mut self, merkle: Hash) -> Blob {
59 match self.stream.next().await {
60 Some(Ok(fio::DirectoryRequest::Open {
61 path,
62 flags,
63 options: _,
64 object,
65 control_handle: _,
66 })) => {
67 assert!(flags.contains(fio::PERM_WRITABLE | fio::Flags::FLAG_MAYBE_CREATE));
68 assert_eq!(path, delivery_blob::delivery_blob_path(merkle));
69 let stream =
70 fio::NodeRequestStream::from_channel(fasync::Channel::from_channel(object))
71 .cast_stream();
72 Blob { stream }
73 }
74 other => panic!("unexpected request: {other:?}"),
75 }
76 }
77
78 async fn handle_rewind(&mut self) {
79 match self.stream.next().await {
80 Some(Ok(fio::DirectoryRequest::Rewind { responder })) => {
81 responder.send(Status::OK.into_raw()).unwrap();
82 }
83 other => panic!("unexpected request: {other:?}"),
84 }
85 }
86
87 pub async fn expect_readdir(&mut self, entries: impl Iterator<Item = Hash>) {
94 self.handle_rewind().await;
96
97 const NAME_LEN: usize = 64;
98 #[repr(C, packed)]
99 struct Dirent {
100 ino: u64,
101 size: u8,
102 kind: u8,
103 name: [u8; NAME_LEN],
104 }
105
106 impl Dirent {
107 fn as_bytes(&self) -> &'_ [u8] {
108 let start = self as *const Self as *const u8;
109 unsafe { std::slice::from_raw_parts(start, std::mem::size_of::<Self>()) }
112 }
113 }
114
115 let mut entries_iter = entries.map(|hash| Dirent {
116 ino: fio::INO_UNKNOWN,
117 size: NAME_LEN as u8,
118 kind: fio::DirentType::File.into_primitive(),
119 name: hash.to_string().as_bytes().try_into().unwrap(),
120 });
121
122 loop {
123 match self.stream.try_next().await.unwrap() {
124 Some(fio::DirectoryRequest::ReadDirents { max_bytes, responder }) => {
125 let max_bytes = max_bytes as usize;
126 assert!(max_bytes >= std::mem::size_of::<Dirent>());
127
128 let mut buf = vec![];
129 while buf.len() + std::mem::size_of::<Dirent>() <= max_bytes {
130 match entries_iter.next() {
131 Some(need) => {
132 buf.extend(need.as_bytes());
133 }
134 None => break,
135 }
136 }
137
138 responder.send(Status::OK.into_raw(), &buf).unwrap();
139
140 if buf.is_empty() {
142 break;
143 }
144 }
145 Some(other) => panic!("unexpected request: {other:?}"),
146 None => panic!("unexpected stream termination"),
147 }
148 }
149 }
150
151 pub async fn expect_readable_missing_checks(&mut self, readable: &[Hash], missing: &[Hash]) {
159 let mut readable = readable.iter().copied().collect::<HashSet<_>>();
160 let mut missing = missing.iter().copied().collect::<HashSet<_>>();
161
162 while !(readable.is_empty() && missing.is_empty()) {
163 match self.stream.next().await {
164 Some(Ok(fio::DirectoryRequest::Open {
165 path,
166 flags,
167 options: _,
168 object,
169 control_handle: _,
170 })) => {
171 assert!(flags.contains(fio::PERM_READABLE));
172 assert!(
173 !flags.intersects(fio::Flags::PERM_WRITE | fio::Flags::FLAG_MAYBE_CREATE)
174 );
175 let path: Hash = path.parse().unwrap();
176
177 let stream =
178 fio::NodeRequestStream::from_channel(fasync::Channel::from_channel(object))
179 .cast_stream();
180 let blob = Blob { stream };
181 if readable.remove(&path) {
182 blob.succeed_open_with_blob_readable().await;
183 } else if missing.remove(&path) {
184 blob.fail_open_with_not_found();
185 } else {
186 panic!("Unexpected blob existance check for {path}");
187 }
188 }
189 other => panic!("unexpected request: {other:?}"),
190 }
191 }
192 }
193
194 pub async fn expect_filter_to_missing_blobs_with_readable_missing_ids(
203 &mut self,
204 readable: &[Hash],
205 missing: &[Hash],
206 ) {
207 self.expect_readable_missing_checks(readable, missing).await;
208 }
209
210 pub async fn expect_done(mut self) {
216 match self.stream.next().await {
217 None => {}
218 Some(request) => panic!("unexpected request: {request:?}"),
219 }
220 }
221}
222
223pub struct Blob {
227 stream: fio::FileRequestStream,
228}
229
230impl Blob {
231 fn send_on_open_with_file_signals(&mut self, status: Status, signals: zx::Signals) {
232 let event = fidl::Event::create();
233 event.signal_handle(zx::Signals::NONE, signals).unwrap();
234
235 let info =
236 fio::NodeInfoDeprecated::File(fio::FileObject { event: Some(event), stream: None });
237 let () = self.stream.control_handle().send_on_open_(status.into_raw(), Some(info)).unwrap();
238 }
239
240 fn send_on_open(&mut self, status: Status) {
241 self.send_on_open_with_file_signals(status, zx::Signals::NONE);
242 }
243
244 fn send_on_open_with_readable(&mut self, status: Status) {
245 self.send_on_open_with_file_signals(status, zx::Signals::USER_0);
247 }
248
249 fn fail_open_with_error(mut self, status: Status) {
250 assert_ne!(status, Status::OK);
251 self.send_on_open(status);
252 }
253
254 pub fn fail_open_with_already_exists(self) {
260 self.fail_open_with_error(Status::ACCESS_DENIED);
261 }
262
263 pub fn fail_open_with_not_found(self) {
269 self.fail_open_with_error(Status::NOT_FOUND);
270 }
271
272 pub fn fail_open_with_io_error(self) {
278 self.fail_open_with_error(Status::IO);
279 }
280
281 pub async fn fail_open_with_not_readable(mut self) {
289 self.send_on_open(Status::OK);
290 self.expect_done().await;
291 }
292
293 pub async fn succeed_open_with_blob_readable(mut self) {
300 self.send_on_open_with_readable(Status::OK);
301 self.expect_done().await;
302 }
303
304 pub async fn expect_close(mut self) {
311 self.send_on_open_with_readable(Status::OK);
312
313 match self.stream.next().await {
314 None => {}
315 Some(Ok(fio::FileRequest::Close { responder })) => {
316 let _ = responder.send(Ok(()));
317 self.expect_done().await;
318 }
319 Some(other) => panic!("unexpected request: {other:?}"),
320 }
321 }
322
323 pub async fn expect_done(mut self) {
329 match self.stream.next().await {
330 None => {}
331 Some(request) => panic!("unexpected request: {request:?}"),
332 }
333 }
334
335 async fn handle_read(&mut self, data: &[u8]) -> usize {
336 match self.stream.next().await {
337 Some(Ok(fio::FileRequest::Read { count, responder })) => {
338 let count = min(count.try_into().unwrap(), data.len());
339 responder.send(Ok(&data[..count])).unwrap();
340 count
341 }
342 other => panic!("unexpected request: {other:?}"),
343 }
344 }
345
346 pub async fn expect_read(mut self, blob: &[u8]) {
352 self.send_on_open_with_readable(Status::OK);
353
354 let mut rest = blob;
355 while !rest.is_empty() {
356 let count = self.handle_read(rest).await;
357 rest = &rest[count..];
358 }
359
360 self.handle_read(rest).await;
362
363 match self.stream.next().await {
364 None => {}
365 Some(Ok(fio::FileRequest::Close { responder })) => {
366 let _ = responder.send(Ok(()));
367 }
368 Some(other) => panic!("unexpected request: {other:?}"),
369 }
370 }
371
372 pub async fn serve_contents(mut self, data: &[u8]) {
379 self.send_on_open_with_readable(Status::OK);
380
381 let mut pos: usize = 0;
382
383 loop {
384 match self.stream.next().await {
385 Some(Ok(fio::FileRequest::Read { count, responder })) => {
386 let avail = data.len() - pos;
387 let count = min(count.try_into().unwrap(), avail);
388 responder.send(Ok(&data[pos..pos + count])).unwrap();
389 pos += count;
390 }
391 Some(Ok(fio::FileRequest::ReadAt { count, offset, responder })) => {
392 let pos: usize = offset.try_into().unwrap();
393 let avail = data.len() - pos;
394 let count = min(count.try_into().unwrap(), avail);
395 responder.send(Ok(&data[pos..pos + count])).unwrap();
396 }
397 Some(Ok(fio::FileRequest::GetAttr { responder })) => {
398 let mut attr = fio::NodeAttributes {
399 mode: 0,
400 id: 0,
401 content_size: 0,
402 storage_size: 0,
403 link_count: 0,
404 creation_time: 0,
405 modification_time: 0,
406 };
407 attr.content_size = data.len().try_into().unwrap();
408 responder.send(Status::OK.into_raw(), &attr).unwrap();
409 }
410 Some(Ok(fio::FileRequest::Close { responder })) => {
411 let _ = responder.send(Ok(()));
412 return;
413 }
414 Some(Ok(fio::FileRequest::GetBackingMemory { flags, responder })) => {
415 assert!(flags.contains(fio::VmoFlags::READ));
416 assert!(!flags.contains(fio::VmoFlags::WRITE));
417 assert!(!flags.contains(fio::VmoFlags::EXECUTE));
418 let vmo = zx::Vmo::create(data.len() as u64).unwrap();
419 vmo.write(data, 0).unwrap();
420 let vmo = vmo
421 .replace_handle(
422 zx::Rights::READ
423 | zx::Rights::BASIC
424 | zx::Rights::MAP
425 | zx::Rights::GET_PROPERTY,
426 )
427 .unwrap();
428 responder.send(Ok(vmo)).unwrap();
429 }
430 None => {
431 return;
432 }
433 other => panic!("unexpected request: {other:?}"),
434 }
435 }
436 }
437
438 async fn handle_truncate(&mut self, status: Status) -> u64 {
439 match self.stream.next().await {
440 Some(Ok(fio::FileRequest::Resize { length, responder })) => {
441 responder
442 .send(if status == Status::OK { Ok(()) } else { Err(status.into_raw()) })
443 .unwrap();
444
445 length
446 }
447 other => panic!("unexpected request: {other:?}"),
448 }
449 }
450
451 async fn expect_truncate(&mut self) -> u64 {
452 self.handle_truncate(Status::OK).await
453 }
454
455 async fn handle_write(&mut self, status: Status) -> Vec<u8> {
456 match self.stream.next().await {
457 Some(Ok(fio::FileRequest::Write { data, responder })) => {
458 responder
459 .send(if status == Status::OK {
460 Ok(data.len() as u64)
461 } else {
462 Err(status.into_raw())
463 })
464 .unwrap();
465
466 data
467 }
468 other => panic!("unexpected request: {other:?}"),
469 }
470 }
471
472 async fn fail_write_with_status(mut self, status: Status) {
473 self.send_on_open(Status::OK);
474
475 let length = self.expect_truncate().await;
476 let expected_write_calls = length.div_ceil(fio::MAX_BUF);
478 for _ in 0..(expected_write_calls - 1) {
479 self.handle_write(Status::OK).await;
480 }
481 self.handle_write(status).await;
482 }
483
484 pub async fn fail_write_with_corrupt(self) {
491 self.fail_write_with_status(Status::IO_DATA_INTEGRITY).await
492 }
493
494 pub fn expect_payload(mut self, expected: &[u8]) -> impl Future<Output = ()> + '_ {
501 self.send_on_open(Status::OK);
502
503 async move {
504 assert_eq!(self.expect_truncate().await, expected.len() as u64);
505
506 let mut rest = expected;
507 while !rest.is_empty() {
508 let expected_chunk = if rest.len() > fio::MAX_BUF as usize {
509 &rest[..fio::MAX_BUF as usize]
510 } else {
511 rest
512 };
513 assert_eq!(self.handle_write(Status::OK).await, expected_chunk);
514 rest = &rest[expected_chunk.len()..];
515 }
516
517 match self.stream.next().await {
518 Some(Ok(fio::FileRequest::Close { responder })) => {
519 responder.send(Ok(())).unwrap();
520 }
521 other => panic!("unexpected request: {other:?}"),
522 }
523
524 self.expect_done().await;
525 }
526 }
527}