1use fidl::endpoints::RequestStream as _;
8use fuchsia_hash::Hash;
9use futures::{Future, StreamExt as _, TryStreamExt as _};
10use std::cmp::min;
11use std::collections::HashSet;
12use std::convert::TryInto as _;
13use vfs::attributes;
14use zx::{self as zx, AsHandleRef as _, HandleBased as _, Status};
15use {fidl_fuchsia_io as fio, fuchsia_async as fasync};
16
17pub struct Mock {
21 pub(super) stream: fio::DirectoryRequestStream,
22}
23
24impl Mock {
25 pub async fn expect_open_blob(&mut self, merkle: Hash) -> Blob {
32 match self.stream.next().await {
33 Some(Ok(fio::DirectoryRequest::Open {
34 path,
35 flags,
36 options: _,
37 object,
38 control_handle: _,
39 })) => {
40 assert_eq!(path, merkle.to_string());
41 assert!(flags.contains(fio::PERM_READABLE));
42 assert!(
43 !flags.intersects(fio::Flags::PERM_WRITE_BYTES | fio::Flags::FLAG_MAYBE_CREATE)
44 );
45
46 let stream =
47 fio::NodeRequestStream::from_channel(fasync::Channel::from_channel(object))
48 .cast_stream();
49 Blob { stream }
50 }
51 other => panic!("unexpected request: {other:?}"),
52 }
53 }
54
55 pub async fn expect_create_blob(&mut self, merkle: Hash) -> Blob {
62 match self.stream.next().await {
63 Some(Ok(fio::DirectoryRequest::Open {
64 path,
65 flags,
66 options: _,
67 object,
68 control_handle: _,
69 })) => {
70 assert!(flags.contains(fio::PERM_WRITABLE | fio::Flags::FLAG_MAYBE_CREATE));
71 assert_eq!(path, delivery_blob::delivery_blob_path(merkle));
72 let stream =
73 fio::NodeRequestStream::from_channel(fasync::Channel::from_channel(object))
74 .cast_stream();
75 Blob { stream }
76 }
77 other => panic!("unexpected request: {other:?}"),
78 }
79 }
80
81 async fn handle_rewind(&mut self) {
82 match self.stream.next().await {
83 Some(Ok(fio::DirectoryRequest::Rewind { responder })) => {
84 responder.send(Status::OK.into_raw()).unwrap();
85 }
86 other => panic!("unexpected request: {other:?}"),
87 }
88 }
89
90 pub async fn expect_readdir(&mut self, entries: impl Iterator<Item = Hash>) {
97 self.handle_rewind().await;
99
100 const NAME_LEN: usize = 64;
101 #[repr(C, packed)]
102 struct Dirent {
103 ino: u64,
104 size: u8,
105 kind: u8,
106 name: [u8; NAME_LEN],
107 }
108
109 impl Dirent {
110 fn as_bytes(&self) -> &'_ [u8] {
111 let start = self as *const Self as *const u8;
112 unsafe { std::slice::from_raw_parts(start, std::mem::size_of::<Self>()) }
115 }
116 }
117
118 let mut entries_iter = entries.map(|hash| Dirent {
119 ino: fio::INO_UNKNOWN,
120 size: NAME_LEN as u8,
121 kind: fio::DirentType::File.into_primitive(),
122 name: hash.to_string().as_bytes().try_into().unwrap(),
123 });
124
125 loop {
126 match self.stream.try_next().await.unwrap() {
127 Some(fio::DirectoryRequest::ReadDirents { max_bytes, responder }) => {
128 let max_bytes = max_bytes as usize;
129 assert!(max_bytes >= std::mem::size_of::<Dirent>());
130
131 let mut buf = vec![];
132 while buf.len() + std::mem::size_of::<Dirent>() <= max_bytes {
133 match entries_iter.next() {
134 Some(need) => {
135 buf.extend(need.as_bytes());
136 }
137 None => break,
138 }
139 }
140
141 responder.send(Status::OK.into_raw(), &buf).unwrap();
142
143 if buf.is_empty() {
145 break;
146 }
147 }
148 Some(other) => panic!("unexpected request: {other:?}"),
149 None => panic!("unexpected stream termination"),
150 }
151 }
152 }
153
154 pub async fn expect_readable_missing_checks(&mut self, readable: &[Hash], missing: &[Hash]) {
162 let mut readable = readable.iter().copied().collect::<HashSet<_>>();
163 let mut missing = missing.iter().copied().collect::<HashSet<_>>();
164
165 while !(readable.is_empty() && missing.is_empty()) {
166 match self.stream.next().await {
167 Some(Ok(fio::DirectoryRequest::Open {
168 path,
169 flags,
170 options: _,
171 object,
172 control_handle: _,
173 })) => {
174 assert!(flags.contains(fio::PERM_READABLE));
175 assert!(!flags
176 .intersects(fio::Flags::PERM_WRITE_BYTES | fio::Flags::FLAG_MAYBE_CREATE));
177 let path: Hash = path.parse().unwrap();
178
179 let stream =
180 fio::NodeRequestStream::from_channel(fasync::Channel::from_channel(object))
181 .cast_stream();
182 let blob = Blob { stream };
183 if readable.remove(&path) {
184 blob.succeed_open_with_blob_readable().await;
185 } else if missing.remove(&path) {
186 blob.fail_open_with_not_found();
187 } else {
188 panic!("Unexpected blob existance check for {path}");
189 }
190 }
191 other => panic!("unexpected request: {other:?}"),
192 }
193 }
194 }
195
196 pub async fn expect_filter_to_missing_blobs_with_readable_missing_ids(
205 &mut self,
206 readable: &[Hash],
207 missing: &[Hash],
208 ) {
209 self.expect_readable_missing_checks(readable, missing).await;
210 }
211
212 pub async fn expect_done(mut self) {
218 match self.stream.next().await {
219 None => {}
220 Some(request) => panic!("unexpected request: {request:?}"),
221 }
222 }
223}
224
225pub struct Blob {
229 stream: fio::FileRequestStream,
230}
231
232impl Blob {
233 fn send_on_open_with_file_signals(&mut self, status: Status, signals: zx::Signals) {
234 let event = fidl::Event::create();
235 event.signal_handle(zx::Signals::NONE, signals).unwrap();
236
237 let info =
238 fio::NodeInfoDeprecated::File(fio::FileObject { event: Some(event), stream: None });
239 let () = self.stream.control_handle().send_on_open_(status.into_raw(), Some(info)).unwrap();
240 }
241
242 fn send_on_open(&mut self, status: Status) {
243 self.send_on_open_with_file_signals(status, zx::Signals::NONE);
244 }
245
246 fn send_on_open_with_readable(&mut self, status: Status) {
247 self.send_on_open_with_file_signals(status, zx::Signals::USER_0);
249 }
250
251 fn fail_open_with_error(mut self, status: Status) {
252 assert_ne!(status, Status::OK);
253 self.send_on_open(status);
254 }
255
256 pub fn fail_open_with_already_exists(self) {
262 self.fail_open_with_error(Status::ACCESS_DENIED);
263 }
264
265 pub fn fail_open_with_not_found(self) {
271 self.fail_open_with_error(Status::NOT_FOUND);
272 }
273
274 pub fn fail_open_with_io_error(self) {
280 self.fail_open_with_error(Status::IO);
281 }
282
283 pub async fn fail_open_with_not_readable(mut self) {
291 self.send_on_open(Status::OK);
292 self.expect_done().await;
293 }
294
295 pub async fn succeed_open_with_blob_readable(mut self) {
302 self.send_on_open_with_readable(Status::OK);
303 self.expect_done().await;
304 }
305
306 pub async fn expect_close(mut self) {
313 self.send_on_open_with_readable(Status::OK);
314
315 match self.stream.next().await {
316 None => {}
317 Some(Ok(fio::FileRequest::Close { responder })) => {
318 let _ = responder.send(Ok(()));
319 self.expect_done().await;
320 }
321 Some(other) => panic!("unexpected request: {other:?}"),
322 }
323 }
324
325 pub async fn expect_done(mut self) {
331 match self.stream.next().await {
332 None => {}
333 Some(request) => panic!("unexpected request: {request:?}"),
334 }
335 }
336
337 async fn handle_read(&mut self, data: &[u8]) -> usize {
338 match self.stream.next().await {
339 Some(Ok(fio::FileRequest::Read { count, responder })) => {
340 let count = min(count.try_into().unwrap(), data.len());
341 responder.send(Ok(&data[..count])).unwrap();
342 count
343 }
344 other => panic!("unexpected request: {other:?}"),
345 }
346 }
347
348 pub async fn expect_read(mut self, blob: &[u8]) {
354 self.send_on_open_with_readable(Status::OK);
355
356 let mut rest = blob;
357 while !rest.is_empty() {
358 let count = self.handle_read(rest).await;
359 rest = &rest[count..];
360 }
361
362 self.handle_read(rest).await;
364
365 match self.stream.next().await {
366 None => {}
367 Some(Ok(fio::FileRequest::Close { responder })) => {
368 let _ = responder.send(Ok(()));
369 }
370 Some(other) => panic!("unexpected request: {other:?}"),
371 }
372 }
373
374 pub async fn serve_contents(mut self, data: &[u8]) {
381 self.send_on_open_with_readable(Status::OK);
382
383 let mut pos: usize = 0;
384
385 loop {
386 match self.stream.next().await {
387 Some(Ok(fio::FileRequest::Read { count, responder })) => {
388 let avail = data.len() - pos;
389 let count = min(count.try_into().unwrap(), avail);
390 responder.send(Ok(&data[pos..pos + count])).unwrap();
391 pos += count;
392 }
393 Some(Ok(fio::FileRequest::ReadAt { count, offset, responder })) => {
394 let pos: usize = offset.try_into().unwrap();
395 let avail = data.len() - pos;
396 let count = min(count.try_into().unwrap(), avail);
397 responder.send(Ok(&data[pos..pos + count])).unwrap();
398 }
399 Some(Ok(fio::FileRequest::GetAttributes { query, responder })) => {
400 let attrs = attributes!(
401 query,
402 Mutable { creation_time: 0, modification_time: 0, mode: 0 },
403 Immutable {
404 protocols: fio::NodeProtocolKinds::FILE,
405 content_size: data.len() as u64,
406 storage_size: 0,
407 link_count: 0,
408 id: 0,
409 }
410 );
411 responder
412 .send(Ok((&attrs.mutable_attributes, &attrs.immutable_attributes)))
413 .unwrap();
414 }
415 Some(Ok(fio::FileRequest::Close { responder })) => {
416 let _ = responder.send(Ok(()));
417 return;
418 }
419 Some(Ok(fio::FileRequest::GetBackingMemory { flags, responder })) => {
420 assert!(flags.contains(fio::VmoFlags::READ));
421 assert!(!flags.contains(fio::VmoFlags::WRITE));
422 assert!(!flags.contains(fio::VmoFlags::EXECUTE));
423 let vmo = zx::Vmo::create(data.len() as u64).unwrap();
424 vmo.write(data, 0).unwrap();
425 let vmo = vmo
426 .replace_handle(
427 zx::Rights::READ
428 | zx::Rights::BASIC
429 | zx::Rights::MAP
430 | zx::Rights::GET_PROPERTY,
431 )
432 .unwrap();
433 responder.send(Ok(vmo)).unwrap();
434 }
435 None => {
436 return;
437 }
438 other => panic!("unexpected request: {other:?}"),
439 }
440 }
441 }
442
443 async fn handle_truncate(&mut self, status: Status) -> u64 {
444 match self.stream.next().await {
445 Some(Ok(fio::FileRequest::Resize { length, responder })) => {
446 responder
447 .send(if status == Status::OK { Ok(()) } else { Err(status.into_raw()) })
448 .unwrap();
449
450 length
451 }
452 other => panic!("unexpected request: {other:?}"),
453 }
454 }
455
456 async fn expect_truncate(&mut self) -> u64 {
457 self.handle_truncate(Status::OK).await
458 }
459
460 async fn handle_write(&mut self, status: Status) -> Vec<u8> {
461 match self.stream.next().await {
462 Some(Ok(fio::FileRequest::Write { data, responder })) => {
463 responder
464 .send(if status == Status::OK {
465 Ok(data.len() as u64)
466 } else {
467 Err(status.into_raw())
468 })
469 .unwrap();
470
471 data
472 }
473 other => panic!("unexpected request: {other:?}"),
474 }
475 }
476
477 async fn fail_write_with_status(mut self, status: Status) {
478 self.send_on_open(Status::OK);
479
480 let length = self.expect_truncate().await;
481 let expected_write_calls = length.div_ceil(fio::MAX_BUF);
483 for _ in 0..(expected_write_calls - 1) {
484 self.handle_write(Status::OK).await;
485 }
486 self.handle_write(status).await;
487 }
488
489 pub async fn fail_write_with_corrupt(self) {
496 self.fail_write_with_status(Status::IO_DATA_INTEGRITY).await
497 }
498
499 pub fn expect_payload(mut self, expected: &[u8]) -> impl Future<Output = ()> + '_ {
506 self.send_on_open(Status::OK);
507
508 async move {
509 assert_eq!(self.expect_truncate().await, expected.len() as u64);
510
511 let mut rest = expected;
512 while !rest.is_empty() {
513 let expected_chunk = if rest.len() > fio::MAX_BUF as usize {
514 &rest[..fio::MAX_BUF as usize]
515 } else {
516 rest
517 };
518 assert_eq!(self.handle_write(Status::OK).await, expected_chunk);
519 rest = &rest[expected_chunk.len()..];
520 }
521
522 match self.stream.next().await {
523 Some(Ok(fio::FileRequest::Close { responder })) => {
524 responder.send(Ok(())).unwrap();
525 }
526 other => panic!("unexpected request: {other:?}"),
527 }
528
529 self.expect_done().await;
530 }
531 }
532}