1#![deny(missing_docs)]
8
9use anyhow::{format_err, Context as _, Error};
10use fidl::endpoints::{
11 DiscoverableProtocolMarker, FromClient, MemberOpener, ProtocolMarker, ServiceMarker,
12 ServiceProxy,
13};
14use fidl_fuchsia_component::{RealmMarker, RealmProxy};
15use fidl_fuchsia_component_decl::ChildRef;
16use fidl_fuchsia_io as fio;
17use fuchsia_component_directory::{open_directory_async, AsRefDirectory};
18use fuchsia_fs::directory::{WatchEvent, Watcher};
19use futures::stream::FusedStream;
20use futures::{Stream, StreamExt};
21use pin_project::pin_project;
22use std::borrow::Borrow;
23use std::marker::PhantomData;
24use std::pin::pin;
25use std::task::Poll;
26
27pub trait Connect: Sized + FromClient<Protocol: DiscoverableProtocolMarker> {
29 fn connect() -> Result<Self, Error> {
31 Self::connect_at(SVC_DIR)
32 }
33
34 fn connect_at(service_prefix: impl AsRef<str>) -> Result<Self, Error> {
36 let (client, server_end) = fidl::endpoints::create_endpoints::<Self::Protocol>();
37 let () = connect_channel_to_protocol_at::<Self::Protocol>(
38 server_end.into_channel(),
39 service_prefix.as_ref(),
40 )?;
41 Ok(Self::from_client(client))
42 }
43
44 fn connect_at_dir_svc(directory: &impl AsRefDirectory) -> Result<Self, Error> {
46 let protocol_path = format!("{}/{}", SVC_DIR, Self::Protocol::PROTOCOL_NAME);
47 Self::connect_at_dir_root_with_name(directory, &protocol_path)
48 }
49
50 fn connect_at_dir_root(directory: &impl AsRefDirectory) -> Result<Self, Error> {
52 Self::connect_at_dir_root_with_name(directory, Self::Protocol::PROTOCOL_NAME)
53 }
54
55 fn connect_at_dir_root_with_name(
57 directory: &impl AsRefDirectory,
58 filename: &str,
59 ) -> Result<Self, Error> {
60 let (client, server) = fidl::endpoints::create_endpoints::<Self::Protocol>();
61 directory.as_ref_directory().open(
62 filename,
63 fio::Flags::PROTOCOL_SERVICE,
64 server.into_channel().into(),
65 )?;
66 Ok(Self::from_client(client))
67 }
68}
69
70impl<T: FromClient<Protocol: DiscoverableProtocolMarker>> Connect for T {}
71
72pub mod connect {
78 use super::*;
79
80 pub fn connect_to_protocol<T: Connect>() -> Result<T, Error> {
82 T::connect()
83 }
84
85 pub fn connect_to_protocol_at<T: Connect>(service_prefix: impl AsRef<str>) -> Result<T, Error> {
87 T::connect_at(service_prefix)
88 }
89
90 pub fn connect_to_protocol_at_dir_svc<T: Connect>(
92 directory: &impl AsRefDirectory,
93 ) -> Result<T, Error> {
94 T::connect_at_dir_svc(directory)
95 }
96
97 pub fn connect_to_protocol_at_dir_root<T: Connect>(
99 directory: &impl AsRefDirectory,
100 ) -> Result<T, Error> {
101 T::connect_at_dir_root(directory)
102 }
103
104 pub fn connect_to_named_protocol_at_dir_root<T: Connect>(
106 directory: &impl AsRefDirectory,
107 filename: &str,
108 ) -> Result<T, Error> {
109 T::connect_at_dir_root_with_name(directory, filename)
110 }
111}
112
/// Path of the service directory used by default when connecting to protocols and services.
// `'static` is implied for constants, so the lifetime is not spelled out.
pub const SVC_DIR: &str = "/svc";
115
116pub struct ProtocolConnector<D: Borrow<fio::DirectoryProxy>, P: DiscoverableProtocolMarker> {
118 svc_dir: D,
119 _svc_marker: PhantomData<P>,
120}
121
122impl<D: Borrow<fio::DirectoryProxy>, P: DiscoverableProtocolMarker> ProtocolConnector<D, P> {
123 fn new(svc_dir: D) -> ProtocolConnector<D, P> {
125 ProtocolConnector { svc_dir, _svc_marker: PhantomData }
126 }
127
128 pub async fn exists(&self) -> Result<bool, Error> {
133 match fuchsia_fs::directory::dir_contains(self.svc_dir.borrow(), P::PROTOCOL_NAME).await {
134 Ok(v) => Ok(v),
135 Err(fuchsia_fs::directory::EnumerateError::Fidl(
138 _,
139 fidl::Error::ClientChannelClosed { status, .. },
140 )) if status == zx::Status::PEER_CLOSED => Ok(false),
141 Err(e) => Err(Error::new(e).context("error checking for service entry in directory")),
142 }
143 }
144
145 pub fn connect_with(self, server_end: zx::Channel) -> Result<(), Error> {
150 #[cfg(fuchsia_api_level_at_least = "27")]
151 return self
152 .svc_dir
153 .borrow()
154 .open(
155 P::PROTOCOL_NAME,
156 fio::Flags::PROTOCOL_SERVICE,
157 &fio::Options::default(),
158 server_end.into(),
159 )
160 .context("error connecting to protocol");
161 #[cfg(not(fuchsia_api_level_at_least = "27"))]
162 return self
163 .svc_dir
164 .borrow()
165 .open3(
166 P::PROTOCOL_NAME,
167 fio::Flags::PROTOCOL_SERVICE,
168 &fio::Options::default(),
169 server_end.into(),
170 )
171 .context("error connecting to protocol");
172 }
173
174 pub fn connect(self) -> Result<P::Proxy, Error> {
179 let (proxy, server_end) = fidl::endpoints::create_proxy::<P>();
180 let () = self
181 .connect_with(server_end.into_channel())
182 .context("error connecting with server channel")?;
183 Ok(proxy)
184 }
185}
186
187pub fn clone_namespace_svc() -> Result<fio::DirectoryProxy, Error> {
189 fuchsia_fs::directory::open_in_namespace(SVC_DIR, fio::Flags::empty())
190 .context("error opening svc directory")
191}
192
193pub fn new_protocol_connector<P: DiscoverableProtocolMarker>(
196) -> Result<ProtocolConnector<fio::DirectoryProxy, P>, Error> {
197 new_protocol_connector_at::<P>(SVC_DIR)
198}
199
200pub fn new_protocol_connector_at<P: DiscoverableProtocolMarker>(
205 service_directory_path: &str,
206) -> Result<ProtocolConnector<fio::DirectoryProxy, P>, Error> {
207 let dir = fuchsia_fs::directory::open_in_namespace(service_directory_path, fio::Flags::empty())
208 .context("error opening service directory")?;
209
210 Ok(ProtocolConnector::new(dir))
211}
212
213pub fn new_protocol_connector_in_dir<P: DiscoverableProtocolMarker>(
215 dir: &fio::DirectoryProxy,
216) -> ProtocolConnector<&fio::DirectoryProxy, P> {
217 ProtocolConnector::new(dir)
218}
219
220pub fn connect_channel_to_protocol<P: DiscoverableProtocolMarker>(
222 server_end: zx::Channel,
223) -> Result<(), Error> {
224 connect_channel_to_protocol_at::<P>(server_end, SVC_DIR)
225}
226
227pub fn connect_channel_to_protocol_at<P: DiscoverableProtocolMarker>(
229 server_end: zx::Channel,
230 service_directory_path: &str,
231) -> Result<(), Error> {
232 let protocol_path = format!("{}/{}", service_directory_path, P::PROTOCOL_NAME);
233 connect_channel_to_protocol_at_path(server_end, &protocol_path)
234}
235
236pub fn connect_channel_to_protocol_at_path(
238 server_end: zx::Channel,
239 protocol_path: &str,
240) -> Result<(), Error> {
241 fdio::service_connect(&protocol_path, server_end)
242 .with_context(|| format!("Error connecting to protocol path: {}", protocol_path))
243}
244
245pub fn connect_to_protocol<P: DiscoverableProtocolMarker>() -> Result<P::Proxy, Error> {
247 connect_to_protocol_at::<P>(SVC_DIR)
248}
249
250pub fn connect_to_protocol_sync<P: DiscoverableProtocolMarker>(
256) -> Result<P::SynchronousProxy, Error> {
257 connect_to_protocol_sync_at::<P>(SVC_DIR)
258}
259
260pub fn connect_to_protocol_at<P: DiscoverableProtocolMarker>(
262 service_prefix: impl AsRef<str>,
263) -> Result<P::Proxy, Error> {
264 let (proxy, server_end) = fidl::endpoints::create_proxy::<P>();
265 let () =
266 connect_channel_to_protocol_at::<P>(server_end.into_channel(), service_prefix.as_ref())?;
267 Ok(proxy)
268}
269
270pub fn connect_to_protocol_sync_at<P: DiscoverableProtocolMarker>(
276 service_prefix: impl AsRef<str>,
277) -> Result<P::SynchronousProxy, Error> {
278 let (proxy, server_end) = fidl::endpoints::create_sync_proxy::<P>();
279 let () =
280 connect_channel_to_protocol_at::<P>(server_end.into_channel(), service_prefix.as_ref())?;
281 Ok(proxy)
282}
283
284pub fn connect_to_protocol_at_path<P: ProtocolMarker>(
286 protocol_path: impl AsRef<str>,
287) -> Result<P::Proxy, Error> {
288 let (proxy, server_end) = fidl::endpoints::create_proxy::<P>();
289 let () =
290 connect_channel_to_protocol_at_path(server_end.into_channel(), protocol_path.as_ref())?;
291 Ok(proxy)
292}
293
294pub fn connect_to_protocol_at_dir_root<P: DiscoverableProtocolMarker>(
296 directory: &impl AsRefDirectory,
297) -> Result<P::Proxy, Error> {
298 connect_to_named_protocol_at_dir_root::<P>(directory, P::PROTOCOL_NAME)
299}
300
301pub fn connect_to_named_protocol_at_dir_root<P: ProtocolMarker>(
303 directory: &impl AsRefDirectory,
304 filename: &str,
305) -> Result<P::Proxy, Error> {
306 let (proxy, server_end) = fidl::endpoints::create_proxy::<P>();
307 directory.as_ref_directory().open(
308 filename,
309 fio::Flags::PROTOCOL_SERVICE,
310 server_end.into_channel().into(),
311 )?;
312 Ok(proxy)
313}
314
315pub fn connect_to_protocol_at_dir_svc<P: DiscoverableProtocolMarker>(
317 directory: &impl AsRefDirectory,
318) -> Result<P::Proxy, Error> {
319 let protocol_path = format!("{}/{}", SVC_DIR, P::PROTOCOL_NAME);
320 connect_to_named_protocol_at_dir_root::<P>(directory, &protocol_path)
321}
322
323pub struct ServiceInstanceDirectory(pub fio::DirectoryProxy, pub String);
326
327impl MemberOpener for ServiceInstanceDirectory {
328 fn open_member(&self, member: &str, server_end: zx::Channel) -> Result<(), fidl::Error> {
329 let Self(directory, _) = self;
330 #[cfg(fuchsia_api_level_at_least = "27")]
331 return directory.open(
332 member,
333 fio::Flags::PROTOCOL_SERVICE,
334 &fio::Options::default(),
335 server_end,
336 );
337 #[cfg(not(fuchsia_api_level_at_least = "27"))]
338 return directory.open3(
339 member,
340 fio::Flags::PROTOCOL_SERVICE,
341 &fio::Options::default(),
342 server_end,
343 );
344 }
345 fn instance_name(&self) -> &str {
346 let Self(_, instance_name) = self;
347 return &instance_name;
348 }
349}
350
351struct ServiceInstance<S> {
354 name: String,
356 service: Service<S>,
357}
358
359impl<S: ServiceMarker> MemberOpener for ServiceInstance<S> {
360 fn open_member(&self, member: &str, server_end: zx::Channel) -> Result<(), fidl::Error> {
361 self.service.connect_to_instance_member_with_channel(&self.name, member, server_end)
362 }
363 fn instance_name(&self) -> &str {
364 return &self.name;
365 }
366}
367
368pub struct Service<S> {
370 dir: fio::DirectoryProxy,
371 _marker: S,
372}
373
374impl<S: Clone> Clone for Service<S> {
375 fn clone(&self) -> Self {
376 Self { dir: Clone::clone(&self.dir), _marker: self._marker.clone() }
377 }
378}
379
380impl<S> From<fio::DirectoryProxy> for Service<S>
383where
384 S: Default,
385{
386 fn from(dir: fio::DirectoryProxy) -> Self {
387 Self { dir, _marker: S::default() }
388 }
389}
390
391impl<S: ServiceMarker> Service<S> {
392 pub fn from_service_dir_proxy(dir: fio::DirectoryProxy, _marker: S) -> Self {
395 Self { dir, _marker }
396 }
397
398 pub fn open(marker: S) -> Result<Self, Error> {
400 Ok(Self::from_service_dir_proxy(open_service::<S>()?, marker))
401 }
402
403 pub fn open_at(service_name: impl AsRef<str>, marker: S) -> Result<Self, Error> {
405 Ok(Self::from_service_dir_proxy(open_service_at(service_name)?, marker))
406 }
407
408 pub fn open_from_dir(svc_dir: impl AsRefDirectory, marker: S) -> Result<Self, Error> {
410 let dir = open_directory_async(&svc_dir, S::SERVICE_NAME, fio::Rights::empty())?;
411 Ok(Self::from_service_dir_proxy(dir, marker))
412 }
413
414 pub fn open_from_dir_prefix(
417 dir: impl AsRefDirectory,
418 prefix: impl AsRef<str>,
419 marker: S,
420 ) -> Result<Self, Error> {
421 let prefix = prefix.as_ref();
422 let service_path = format!("{prefix}/{}", S::SERVICE_NAME);
423 let service_path = service_path.strip_prefix('/').unwrap_or_else(|| service_path.as_ref());
427 let dir = open_directory_async(&dir, &service_path, fio::Rights::empty())?;
428 Ok(Self::from_service_dir_proxy(dir, marker))
429 }
430
431 pub fn connect_to_instance(&self, name: impl AsRef<str>) -> Result<S::Proxy, Error> {
435 let directory_proxy = fuchsia_fs::directory::open_directory_async(
436 &self.dir,
437 name.as_ref(),
438 fio::Flags::empty(),
439 )?;
440 Ok(S::Proxy::from_member_opener(Box::new(ServiceInstanceDirectory(
441 directory_proxy,
442 name.as_ref().to_string(),
443 ))))
444 }
445
446 fn connect_to_instance_member_with_channel(
450 &self,
451 instance: impl AsRef<str>,
452 member: impl AsRef<str>,
453 server_end: zx::Channel,
454 ) -> Result<(), fidl::Error> {
455 let path = format!("{}/{}", instance.as_ref(), member.as_ref());
456 #[cfg(fuchsia_api_level_at_least = "27")]
457 return self.dir.open(
458 &path,
459 fio::Flags::PROTOCOL_SERVICE,
460 &fio::Options::default(),
461 server_end,
462 );
463 #[cfg(not(fuchsia_api_level_at_least = "27"))]
464 return self.dir.open3(
465 &path,
466 fio::Flags::PROTOCOL_SERVICE,
467 &fio::Options::default(),
468 server_end,
469 );
470 }
471
472 pub async fn watch(self) -> Result<ServiceInstanceStream<S>, Error> {
475 let watcher = Watcher::new(&self.dir).await?;
476 let finished = false;
477 Ok(ServiceInstanceStream { service: self, watcher, finished })
478 }
479
480 pub async fn watch_for_any(self) -> Result<S::Proxy, Error> {
482 self.watch()
483 .await?
484 .next()
485 .await
486 .context("No instances found before service directory was removed")?
487 }
488
489 pub async fn enumerate(self) -> Result<Vec<S::Proxy>, Error> {
491 let instances: Vec<S::Proxy> = fuchsia_fs::directory::readdir(&self.dir)
492 .await?
493 .into_iter()
494 .map(|dirent| {
495 S::Proxy::from_member_opener(Box::new(ServiceInstance {
496 service: self.clone(),
497 name: dirent.name,
498 }))
499 })
500 .collect();
501 Ok(instances)
502 }
503}
504
505#[pin_project]
511pub struct ServiceInstanceStream<S: Clone> {
512 service: Service<S>,
513 watcher: Watcher,
514 finished: bool,
515}
516
517impl<S: ServiceMarker> Stream for ServiceInstanceStream<S> {
518 type Item = Result<S::Proxy, Error>;
519
520 fn poll_next(
521 self: std::pin::Pin<&mut Self>,
522 cx: &mut std::task::Context<'_>,
523 ) -> Poll<Option<Self::Item>> {
524 let this = self.project();
525 use Poll::*;
526 if *this.finished {
527 return Poll::Ready(None);
528 }
529 while let Ready(next) = this.watcher.poll_next_unpin(cx) {
532 match next {
533 Some(Ok(state)) => match state.event {
534 WatchEvent::DELETED => {
535 *this.finished = true;
536 return Ready(None);
537 }
538 WatchEvent::ADD_FILE | WatchEvent::EXISTING => {
539 let filename = state.filename.to_str().unwrap();
540 if filename != "." {
541 let proxy = S::Proxy::from_member_opener(Box::new(ServiceInstance {
542 service: this.service.clone(),
543 name: filename.to_owned(),
544 }));
545 return Ready(Some(Ok(proxy)));
546 }
547 }
548 _ => {}
549 },
550 Some(Err(err)) => {
551 *this.finished = true;
552 return Ready(Some(Err(err.into())));
553 }
554 None => {
555 *this.finished = true;
556 return Ready(None);
557 }
558 }
559 }
560 Pending
561 }
562}
563
564impl<S: ServiceMarker> FusedStream for ServiceInstanceStream<S> {
565 fn is_terminated(&self) -> bool {
566 self.finished
567 }
568}
569
570pub fn connect_to_service_instance<S: ServiceMarker>(instance: &str) -> Result<S::Proxy, Error> {
577 let service_path = format!("{}/{}/{}", SVC_DIR, S::SERVICE_NAME, instance);
578 let directory_proxy =
579 fuchsia_fs::directory::open_in_namespace(&service_path, fio::Flags::empty())?;
580 Ok(S::Proxy::from_member_opener(Box::new(ServiceInstanceDirectory(
581 directory_proxy,
582 instance.to_string(),
583 ))))
584}
585
586pub fn connect_to_service_instance_at_dir<S: ServiceMarker>(
592 directory: &fio::DirectoryProxy,
593 instance: &str,
594) -> Result<S::Proxy, Error> {
595 let service_path = format!("{}/{}", S::SERVICE_NAME, instance);
596 let directory_proxy =
597 fuchsia_fs::directory::open_directory_async(directory, &service_path, fio::Flags::empty())?;
598 Ok(S::Proxy::from_member_opener(Box::new(ServiceInstanceDirectory(
599 directory_proxy,
600 instance.to_string(),
601 ))))
602}
603
604pub fn connect_to_service_instance_at_dir_svc<S: ServiceMarker>(
606 directory: &impl AsRefDirectory,
607 instance: impl AsRef<str>,
608) -> Result<S::Proxy, Error> {
609 let service_path = format!("{SVC_DIR}/{}/{}", S::SERVICE_NAME, instance.as_ref());
610 let service_path = service_path.strip_prefix('/').unwrap();
614 let directory_proxy = open_directory_async(directory, service_path, fio::Rights::empty())?;
615 Ok(S::Proxy::from_member_opener(Box::new(ServiceInstanceDirectory(
616 directory_proxy,
617 instance.as_ref().to_string(),
618 ))))
619}
620
621pub fn open_service<S: ServiceMarker>() -> Result<fio::DirectoryProxy, Error> {
623 let service_path = format!("{}/{}", SVC_DIR, S::SERVICE_NAME);
624 fuchsia_fs::directory::open_in_namespace(&service_path, fio::Flags::empty())
625 .context("namespace open failed")
626}
627
628pub fn open_service_at(service_name: impl AsRef<str>) -> Result<fio::DirectoryProxy, Error> {
630 let service_path = format!("{SVC_DIR}/{}", service_name.as_ref());
631 fuchsia_fs::directory::open_in_namespace(&service_path, fio::Flags::empty())
632 .context("namespace open failed")
633}
634
635pub async fn open_childs_exposed_directory(
638 child_name: impl Into<String>,
639 collection_name: Option<String>,
640) -> Result<fio::DirectoryProxy, Error> {
641 let realm_proxy = connect_to_protocol::<RealmMarker>()?;
642 let (directory_proxy, server_end) = fidl::endpoints::create_proxy::<fio::DirectoryMarker>();
643 let child_ref = ChildRef { name: child_name.into(), collection: collection_name };
644 realm_proxy.open_exposed_dir(&child_ref, server_end).await?.map_err(|e| {
645 let ChildRef { name, collection } = child_ref;
646 format_err!("failed to bind to child {} in collection {:?}: {:?}", name, collection, e)
647 })?;
648 Ok(directory_proxy)
649}
650
651pub async fn connect_to_childs_protocol<P: DiscoverableProtocolMarker>(
654 child_name: String,
655 collection_name: Option<String>,
656) -> Result<P::Proxy, Error> {
657 let child_exposed_directory =
658 open_childs_exposed_directory(child_name, collection_name).await?;
659 connect_to_protocol_at_dir_root::<P>(&child_exposed_directory)
660}
661
662pub fn realm() -> Result<RealmProxy, Error> {
664 connect_to_protocol::<RealmMarker>()
665}
666
#[cfg(test)]
mod tests {
    use std::collections::HashSet;
    use std::sync::Arc;

    use super::*;
    use fidl::endpoints::ServiceMarker as _;
    use fidl_fuchsia_component_client_test::{
        ProtocolAMarker, ProtocolAProxy, ProtocolBMarker, ProtocolBProxy, ServiceMarker,
    };
    use fuchsia_async::{self as fasync};
    use futures::{future, TryStreamExt};
    use vfs::directory::simple::Simple;
    use vfs::file::vmo::read_only;
    use vfs::pseudo_directory;

    // A connector created from the namespace reports `exists() == false` for ProtocolA, yet
    // `connect()` still returns a proxy. Checked for both the default and the explicit-path
    // constructors.
    #[fasync::run_singlethreaded(test)]
    async fn test_svc_connector_svc_does_not_exist() -> Result<(), Error> {
        let req = new_protocol_connector::<ProtocolAMarker>().context("error probing service")?;
        assert_matches::assert_matches!(
            req.exists().await.context("error checking service"),
            Ok(false)
        );
        let _: ProtocolAProxy = req.connect().context("error connecting to service")?;

        let req = new_protocol_connector_at::<ProtocolAMarker>(SVC_DIR)
            .context("error probing service at svc dir")?;
        assert_matches::assert_matches!(
            req.exists().await.context("error checking service at svc dir"),
            Ok(false)
        );
        let _: ProtocolAProxy = req.connect().context("error connecting to service at svc dir")?;

        Ok(())
    }

    // With a served directory that only contains an entry for ProtocolB, `exists()` is false
    // for ProtocolA and true for ProtocolB; `connect()` returns a proxy in both cases.
    #[fasync::run_singlethreaded(test)]
    async fn test_svc_connector_connect_with_dir() -> Result<(), Error> {
        let dir = pseudo_directory! {
            ProtocolBMarker::PROTOCOL_NAME => read_only("read_only"),
        };
        let dir_proxy = vfs::directory::serve_read_only(dir);
        let req = new_protocol_connector_in_dir::<ProtocolAMarker>(&dir_proxy);
        assert_matches::assert_matches!(
            req.exists().await.context("error probing invalid service"),
            Ok(false)
        );
        let _: ProtocolAProxy = req.connect().context("error connecting to invalid service")?;

        let req = new_protocol_connector_in_dir::<ProtocolBMarker>(&dir_proxy);
        assert_matches::assert_matches!(
            req.exists().await.context("error probing service"),
            Ok(true)
        );
        let _: ProtocolBProxy = req.connect().context("error connecting to service")?;

        Ok(())
    }

    // Builds `{SERVICE_NAME}/{default, another_instance}`: a service directory with two entries.
    fn make_inner_service_instance_tree() -> Arc<Simple> {
        pseudo_directory! {
            ServiceMarker::SERVICE_NAME => pseudo_directory! {
                "default" => read_only("read_only"),
                "another_instance" => read_only("read_only"),
            },
        }
    }

    // Same tree as above, nested under a top-level "svc" directory.
    fn make_service_instance_tree() -> Arc<Simple> {
        pseudo_directory! {
            "svc" => make_inner_service_instance_tree(),
        }
    }

    // Watching via `open_from_dir_prefix` with the "/svc" prefix yields both instance names.
    #[fasync::run_until_stalled(test)]
    async fn test_service_instance_watcher_from_root() -> Result<(), Error> {
        let dir_proxy = vfs::directory::serve_read_only(make_service_instance_tree());
        let watcher = Service::open_from_dir_prefix(&dir_proxy, SVC_DIR, ServiceMarker)?;
        // `take(2)` bounds the stream at the two expected instances.
        let found_names: HashSet<_> = watcher
            .watch()
            .await?
            .take(2)
            .and_then(|proxy| future::ready(Ok(proxy.instance_name().to_owned())))
            .try_collect()
            .await?;

        assert_eq!(
            found_names,
            HashSet::from_iter(["default".to_owned(), "another_instance".to_owned()])
        );

        Ok(())
    }

    // Watching via `open_from_dir` on the inner tree (no "svc" level) yields both instance names.
    #[fasync::run_until_stalled(test)]
    async fn test_service_instance_watcher_from_svc() -> Result<(), Error> {
        let dir_proxy = vfs::directory::serve_read_only(make_inner_service_instance_tree());
        let watcher = Service::open_from_dir(&dir_proxy, ServiceMarker)?;
        let found_names: HashSet<_> = watcher
            .watch()
            .await?
            .take(2)
            .and_then(|proxy| future::ready(Ok(proxy.instance_name().to_owned())))
            .try_collect()
            .await?;

        assert_eq!(
            found_names,
            HashSet::from_iter(["default".to_owned(), "another_instance".to_owned()])
        );

        Ok(())
    }

    // Collecting the first two items of the watch stream succeeds without an error item.
    #[fasync::run_until_stalled(test)]
    async fn test_connect_to_all_services() -> Result<(), Error> {
        let dir_proxy = vfs::directory::serve_read_only(make_service_instance_tree());
        let watcher = Service::open_from_dir_prefix(&dir_proxy, SVC_DIR, ServiceMarker)?;
        let _: Vec<_> = watcher.watch().await?.take(2).try_collect().await?;

        Ok(())
    }

    // `watch_for_any` returns one of the two known instances; which one is not asserted.
    #[fasync::run_until_stalled(test)]
    async fn test_connect_to_any() -> Result<(), Error> {
        let dir_proxy = vfs::directory::serve_read_only(make_service_instance_tree());
        let watcher = Service::open_from_dir_prefix(&dir_proxy, SVC_DIR, ServiceMarker)?;
        let found = watcher.watch_for_any().await?;
        assert!(["default", "another_instance"].contains(&found.instance_name()));

        Ok(())
    }
}