1#![deny(missing_docs)]
8
9use anyhow::{format_err, Context as _, Error};
10use fidl::endpoints::{
11 DiscoverableProtocolMarker, MemberOpener, ProtocolMarker, ServiceMarker, ServiceProxy,
12};
13use fidl_fuchsia_component::{RealmMarker, RealmProxy};
14use fidl_fuchsia_component_decl::ChildRef;
15use fidl_fuchsia_io as fio;
16use fuchsia_component_directory::{open_directory_async, AsRefDirectory};
17use fuchsia_fs::directory::{WatchEvent, Watcher};
18use futures::stream::FusedStream;
19use futures::{Stream, StreamExt};
20use pin_project::pin_project;
21use std::borrow::Borrow;
22use std::marker::PhantomData;
23use std::pin::pin;
24use std::task::Poll;
25
/// Path of the directory, within the calling component's namespace, that the
/// helpers in this file use by default when looking up protocols and services.
pub const SVC_DIR: &str = "/svc";
28
29pub struct ProtocolConnector<D: Borrow<fio::DirectoryProxy>, P: DiscoverableProtocolMarker> {
31 svc_dir: D,
32 _svc_marker: PhantomData<P>,
33}
34
35impl<D: Borrow<fio::DirectoryProxy>, P: DiscoverableProtocolMarker> ProtocolConnector<D, P> {
36 fn new(svc_dir: D) -> ProtocolConnector<D, P> {
38 ProtocolConnector { svc_dir, _svc_marker: PhantomData }
39 }
40
41 pub async fn exists(&self) -> Result<bool, Error> {
46 match fuchsia_fs::directory::dir_contains(self.svc_dir.borrow(), P::PROTOCOL_NAME).await {
47 Ok(v) => Ok(v),
48 Err(fuchsia_fs::directory::EnumerateError::Fidl(
51 _,
52 fidl::Error::ClientChannelClosed { status, .. },
53 )) if status == zx::Status::PEER_CLOSED => Ok(false),
54 Err(e) => Err(Error::new(e).context("error checking for service entry in directory")),
55 }
56 }
57
58 pub fn connect_with(self, server_end: zx::Channel) -> Result<(), Error> {
63 #[cfg(fuchsia_api_level_at_least = "27")]
64 return self
65 .svc_dir
66 .borrow()
67 .open(
68 P::PROTOCOL_NAME,
69 fio::Flags::PROTOCOL_SERVICE,
70 &fio::Options::default(),
71 server_end.into(),
72 )
73 .context("error connecting to protocol");
74 #[cfg(not(fuchsia_api_level_at_least = "27"))]
75 return self
76 .svc_dir
77 .borrow()
78 .open3(
79 P::PROTOCOL_NAME,
80 fio::Flags::PROTOCOL_SERVICE,
81 &fio::Options::default(),
82 server_end.into(),
83 )
84 .context("error connecting to protocol");
85 }
86
87 pub fn connect(self) -> Result<P::Proxy, Error> {
92 let (proxy, server_end) = fidl::endpoints::create_proxy::<P>();
93 let () = self
94 .connect_with(server_end.into_channel())
95 .context("error connecting with server channel")?;
96 Ok(proxy)
97 }
98}
99
100pub fn clone_namespace_svc() -> Result<fio::DirectoryProxy, Error> {
102 fuchsia_fs::directory::open_in_namespace(SVC_DIR, fio::Flags::empty())
103 .context("error opening svc directory")
104}
105
106pub fn new_protocol_connector<P: DiscoverableProtocolMarker>(
109) -> Result<ProtocolConnector<fio::DirectoryProxy, P>, Error> {
110 new_protocol_connector_at::<P>(SVC_DIR)
111}
112
113pub fn new_protocol_connector_at<P: DiscoverableProtocolMarker>(
118 service_directory_path: &str,
119) -> Result<ProtocolConnector<fio::DirectoryProxy, P>, Error> {
120 let dir = fuchsia_fs::directory::open_in_namespace(service_directory_path, fio::Flags::empty())
121 .context("error opening service directory")?;
122
123 Ok(ProtocolConnector::new(dir))
124}
125
126pub fn new_protocol_connector_in_dir<P: DiscoverableProtocolMarker>(
128 dir: &fio::DirectoryProxy,
129) -> ProtocolConnector<&fio::DirectoryProxy, P> {
130 ProtocolConnector::new(dir)
131}
132
133pub fn connect_channel_to_protocol<P: DiscoverableProtocolMarker>(
135 server_end: zx::Channel,
136) -> Result<(), Error> {
137 connect_channel_to_protocol_at::<P>(server_end, SVC_DIR)
138}
139
140pub fn connect_channel_to_protocol_at<P: DiscoverableProtocolMarker>(
142 server_end: zx::Channel,
143 service_directory_path: &str,
144) -> Result<(), Error> {
145 let protocol_path = format!("{}/{}", service_directory_path, P::PROTOCOL_NAME);
146 connect_channel_to_protocol_at_path(server_end, &protocol_path)
147}
148
149pub fn connect_channel_to_protocol_at_path(
151 server_end: zx::Channel,
152 protocol_path: &str,
153) -> Result<(), Error> {
154 fdio::service_connect(&protocol_path, server_end)
155 .with_context(|| format!("Error connecting to protocol path: {}", protocol_path))
156}
157
158pub fn connect_to_protocol<P: DiscoverableProtocolMarker>() -> Result<P::Proxy, Error> {
160 connect_to_protocol_at::<P>(SVC_DIR)
161}
162
163pub fn connect_to_protocol_sync<P: DiscoverableProtocolMarker>(
169) -> Result<P::SynchronousProxy, Error> {
170 connect_to_protocol_sync_at::<P>(SVC_DIR)
171}
172
173pub fn connect_to_protocol_at<P: DiscoverableProtocolMarker>(
175 service_prefix: impl AsRef<str>,
176) -> Result<P::Proxy, Error> {
177 let (proxy, server_end) = fidl::endpoints::create_proxy::<P>();
178 let () =
179 connect_channel_to_protocol_at::<P>(server_end.into_channel(), service_prefix.as_ref())?;
180 Ok(proxy)
181}
182
183pub fn connect_to_protocol_sync_at<P: DiscoverableProtocolMarker>(
189 service_prefix: impl AsRef<str>,
190) -> Result<P::SynchronousProxy, Error> {
191 let (proxy, server_end) = fidl::endpoints::create_sync_proxy::<P>();
192 let () =
193 connect_channel_to_protocol_at::<P>(server_end.into_channel(), service_prefix.as_ref())?;
194 Ok(proxy)
195}
196
197pub fn connect_to_protocol_at_path<P: ProtocolMarker>(
199 protocol_path: impl AsRef<str>,
200) -> Result<P::Proxy, Error> {
201 let (proxy, server_end) = fidl::endpoints::create_proxy::<P>();
202 let () =
203 connect_channel_to_protocol_at_path(server_end.into_channel(), protocol_path.as_ref())?;
204 Ok(proxy)
205}
206
207pub fn connect_to_protocol_at_dir_root<P: DiscoverableProtocolMarker>(
209 directory: &impl AsRefDirectory,
210) -> Result<P::Proxy, Error> {
211 connect_to_named_protocol_at_dir_root::<P>(directory, P::PROTOCOL_NAME)
212}
213
214pub fn connect_to_named_protocol_at_dir_root<P: ProtocolMarker>(
216 directory: &impl AsRefDirectory,
217 filename: &str,
218) -> Result<P::Proxy, Error> {
219 let (proxy, server_end) = fidl::endpoints::create_proxy::<P>();
220 directory.as_ref_directory().open(
221 filename,
222 fio::Flags::PROTOCOL_SERVICE,
223 server_end.into_channel().into(),
224 )?;
225 Ok(proxy)
226}
227
228pub fn connect_to_protocol_at_dir_svc<P: DiscoverableProtocolMarker>(
230 directory: &impl AsRefDirectory,
231) -> Result<P::Proxy, Error> {
232 let protocol_path = format!("{}/{}", SVC_DIR, P::PROTOCOL_NAME);
233 connect_to_named_protocol_at_dir_root::<P>(directory, &protocol_path)
234}
235
236pub struct ServiceInstanceDirectory(pub fio::DirectoryProxy, pub String);
239
240impl MemberOpener for ServiceInstanceDirectory {
241 fn open_member(&self, member: &str, server_end: zx::Channel) -> Result<(), fidl::Error> {
242 let Self(directory, _) = self;
243 #[cfg(fuchsia_api_level_at_least = "27")]
244 return directory.open(
245 member,
246 fio::Flags::PROTOCOL_SERVICE,
247 &fio::Options::default(),
248 server_end,
249 );
250 #[cfg(not(fuchsia_api_level_at_least = "27"))]
251 return directory.open3(
252 member,
253 fio::Flags::PROTOCOL_SERVICE,
254 &fio::Options::default(),
255 server_end,
256 );
257 }
258 fn instance_name(&self) -> &str {
259 let Self(_, instance_name) = self;
260 return &instance_name;
261 }
262}
263
264struct ServiceInstance<S> {
267 name: String,
269 service: Service<S>,
270}
271
272impl<S: ServiceMarker> MemberOpener for ServiceInstance<S> {
273 fn open_member(&self, member: &str, server_end: zx::Channel) -> Result<(), fidl::Error> {
274 self.service.connect_to_instance_member_with_channel(&self.name, member, server_end)
275 }
276 fn instance_name(&self) -> &str {
277 return &self.name;
278 }
279}
280
281pub struct Service<S> {
283 dir: fio::DirectoryProxy,
284 _marker: S,
285}
286
287impl<S: Clone> Clone for Service<S> {
288 fn clone(&self) -> Self {
289 Self { dir: Clone::clone(&self.dir), _marker: self._marker.clone() }
290 }
291}
292
293impl<S> From<fio::DirectoryProxy> for Service<S>
296where
297 S: Default,
298{
299 fn from(dir: fio::DirectoryProxy) -> Self {
300 Self { dir, _marker: S::default() }
301 }
302}
303
304impl<S: ServiceMarker> Service<S> {
305 pub fn from_service_dir_proxy(dir: fio::DirectoryProxy, _marker: S) -> Self {
308 Self { dir, _marker }
309 }
310
311 pub fn open(marker: S) -> Result<Self, Error> {
313 Ok(Self::from_service_dir_proxy(open_service::<S>()?, marker))
314 }
315
316 pub fn open_at(service_name: impl AsRef<str>, marker: S) -> Result<Self, Error> {
318 Ok(Self::from_service_dir_proxy(open_service_at(service_name)?, marker))
319 }
320
321 pub fn open_from_dir(svc_dir: impl AsRefDirectory, marker: S) -> Result<Self, Error> {
323 let dir = open_directory_async(&svc_dir, S::SERVICE_NAME, fio::Rights::empty())?;
324 Ok(Self::from_service_dir_proxy(dir, marker))
325 }
326
327 pub fn open_from_dir_prefix(
330 dir: impl AsRefDirectory,
331 prefix: impl AsRef<str>,
332 marker: S,
333 ) -> Result<Self, Error> {
334 let prefix = prefix.as_ref();
335 let service_path = format!("{prefix}/{}", S::SERVICE_NAME);
336 let service_path = service_path.strip_prefix('/').unwrap_or_else(|| service_path.as_ref());
340 let dir = open_directory_async(&dir, &service_path, fio::Rights::empty())?;
341 Ok(Self::from_service_dir_proxy(dir, marker))
342 }
343
344 pub fn connect_to_instance(&self, name: impl AsRef<str>) -> Result<S::Proxy, Error> {
348 let directory_proxy = fuchsia_fs::directory::open_directory_async(
349 &self.dir,
350 name.as_ref(),
351 fio::Flags::empty(),
352 )?;
353 Ok(S::Proxy::from_member_opener(Box::new(ServiceInstanceDirectory(
354 directory_proxy,
355 name.as_ref().to_string(),
356 ))))
357 }
358
359 fn connect_to_instance_member_with_channel(
363 &self,
364 instance: impl AsRef<str>,
365 member: impl AsRef<str>,
366 server_end: zx::Channel,
367 ) -> Result<(), fidl::Error> {
368 let path = format!("{}/{}", instance.as_ref(), member.as_ref());
369 #[cfg(fuchsia_api_level_at_least = "27")]
370 return self.dir.open(
371 &path,
372 fio::Flags::PROTOCOL_SERVICE,
373 &fio::Options::default(),
374 server_end,
375 );
376 #[cfg(not(fuchsia_api_level_at_least = "27"))]
377 return self.dir.open3(
378 &path,
379 fio::Flags::PROTOCOL_SERVICE,
380 &fio::Options::default(),
381 server_end,
382 );
383 }
384
385 pub async fn watch(self) -> Result<ServiceInstanceStream<S>, Error> {
388 let watcher = Watcher::new(&self.dir).await?;
389 let finished = false;
390 Ok(ServiceInstanceStream { service: self, watcher, finished })
391 }
392
393 pub async fn watch_for_any(self) -> Result<S::Proxy, Error> {
395 self.watch()
396 .await?
397 .next()
398 .await
399 .context("No instances found before service directory was removed")?
400 }
401
402 pub async fn enumerate(self) -> Result<Vec<S::Proxy>, Error> {
404 let instances: Vec<S::Proxy> = fuchsia_fs::directory::readdir(&self.dir)
405 .await?
406 .into_iter()
407 .map(|dirent| {
408 S::Proxy::from_member_opener(Box::new(ServiceInstance {
409 service: self.clone(),
410 name: dirent.name,
411 }))
412 })
413 .collect();
414 Ok(instances)
415 }
416}
417
418#[pin_project]
424pub struct ServiceInstanceStream<S: Clone> {
425 service: Service<S>,
426 watcher: Watcher,
427 finished: bool,
428}
429
430impl<S: ServiceMarker> Stream for ServiceInstanceStream<S> {
431 type Item = Result<S::Proxy, Error>;
432
433 fn poll_next(
434 self: std::pin::Pin<&mut Self>,
435 cx: &mut std::task::Context<'_>,
436 ) -> Poll<Option<Self::Item>> {
437 let this = self.project();
438 use Poll::*;
439 if *this.finished {
440 return Poll::Ready(None);
441 }
442 while let Ready(next) = this.watcher.poll_next_unpin(cx) {
445 match next {
446 Some(Ok(state)) => match state.event {
447 WatchEvent::DELETED => {
448 *this.finished = true;
449 return Ready(None);
450 }
451 WatchEvent::ADD_FILE | WatchEvent::EXISTING => {
452 let filename = state.filename.to_str().unwrap();
453 if filename != "." {
454 let proxy = S::Proxy::from_member_opener(Box::new(ServiceInstance {
455 service: this.service.clone(),
456 name: filename.to_owned(),
457 }));
458 return Ready(Some(Ok(proxy)));
459 }
460 }
461 _ => {}
462 },
463 Some(Err(err)) => {
464 *this.finished = true;
465 return Ready(Some(Err(err.into())));
466 }
467 None => {
468 *this.finished = true;
469 return Ready(None);
470 }
471 }
472 }
473 Pending
474 }
475}
476
477impl<S: ServiceMarker> FusedStream for ServiceInstanceStream<S> {
478 fn is_terminated(&self) -> bool {
479 self.finished
480 }
481}
482
483pub fn connect_to_service_instance<S: ServiceMarker>(instance: &str) -> Result<S::Proxy, Error> {
490 let service_path = format!("{}/{}/{}", SVC_DIR, S::SERVICE_NAME, instance);
491 let directory_proxy =
492 fuchsia_fs::directory::open_in_namespace(&service_path, fio::Flags::empty())?;
493 Ok(S::Proxy::from_member_opener(Box::new(ServiceInstanceDirectory(
494 directory_proxy,
495 instance.to_string(),
496 ))))
497}
498
499pub fn connect_to_service_instance_at_dir<S: ServiceMarker>(
505 directory: &fio::DirectoryProxy,
506 instance: &str,
507) -> Result<S::Proxy, Error> {
508 let service_path = format!("{}/{}", S::SERVICE_NAME, instance);
509 let directory_proxy =
510 fuchsia_fs::directory::open_directory_async(directory, &service_path, fio::Flags::empty())?;
511 Ok(S::Proxy::from_member_opener(Box::new(ServiceInstanceDirectory(
512 directory_proxy,
513 instance.to_string(),
514 ))))
515}
516
517pub fn connect_to_service_instance_at_dir_svc<S: ServiceMarker>(
519 directory: &impl AsRefDirectory,
520 instance: impl AsRef<str>,
521) -> Result<S::Proxy, Error> {
522 let service_path = format!("{SVC_DIR}/{}/{}", S::SERVICE_NAME, instance.as_ref());
523 let service_path = service_path.strip_prefix('/').unwrap();
527 let directory_proxy = open_directory_async(directory, service_path, fio::Rights::empty())?;
528 Ok(S::Proxy::from_member_opener(Box::new(ServiceInstanceDirectory(
529 directory_proxy,
530 instance.as_ref().to_string(),
531 ))))
532}
533
534pub fn open_service<S: ServiceMarker>() -> Result<fio::DirectoryProxy, Error> {
536 let service_path = format!("{}/{}", SVC_DIR, S::SERVICE_NAME);
537 fuchsia_fs::directory::open_in_namespace(&service_path, fio::Flags::empty())
538 .context("namespace open failed")
539}
540
541pub fn open_service_at(service_name: impl AsRef<str>) -> Result<fio::DirectoryProxy, Error> {
543 let service_path = format!("{SVC_DIR}/{}", service_name.as_ref());
544 fuchsia_fs::directory::open_in_namespace(&service_path, fio::Flags::empty())
545 .context("namespace open failed")
546}
547
548pub async fn open_childs_exposed_directory(
551 child_name: impl Into<String>,
552 collection_name: Option<String>,
553) -> Result<fio::DirectoryProxy, Error> {
554 let realm_proxy = connect_to_protocol::<RealmMarker>()?;
555 let (directory_proxy, server_end) = fidl::endpoints::create_proxy::<fio::DirectoryMarker>();
556 let child_ref = ChildRef { name: child_name.into(), collection: collection_name };
557 realm_proxy.open_exposed_dir(&child_ref, server_end).await?.map_err(|e| {
558 let ChildRef { name, collection } = child_ref;
559 format_err!("failed to bind to child {} in collection {:?}: {:?}", name, collection, e)
560 })?;
561 Ok(directory_proxy)
562}
563
564pub async fn connect_to_childs_protocol<P: DiscoverableProtocolMarker>(
567 child_name: String,
568 collection_name: Option<String>,
569) -> Result<P::Proxy, Error> {
570 let child_exposed_directory =
571 open_childs_exposed_directory(child_name, collection_name).await?;
572 connect_to_protocol_at_dir_root::<P>(&child_exposed_directory)
573}
574
575pub fn realm() -> Result<RealmProxy, Error> {
577 connect_to_protocol::<RealmMarker>()
578}
579
#[cfg(test)]
mod tests {
    use std::collections::HashSet;
    use std::sync::Arc;

    use super::*;
    use fidl::endpoints::ServiceMarker as _;
    use fidl_fuchsia_component_client_test::{
        ProtocolAMarker, ProtocolAProxy, ProtocolBMarker, ProtocolBProxy, ServiceMarker,
    };
    use fuchsia_async::{self as fasync};
    use futures::{future, TryStreamExt};
    use vfs::directory::simple::Simple;
    use vfs::file::vmo::read_only;
    use vfs::pseudo_directory;

    /// Builds `<service name>/{default, another_instance}`.
    fn make_inner_service_instance_tree() -> Arc<Simple> {
        pseudo_directory! {
            ServiceMarker::SERVICE_NAME => pseudo_directory! {
                "default" => read_only("read_only"),
                "another_instance" => read_only("read_only"),
            },
        }
    }

    /// Builds the inner tree nested under a top-level `svc` directory.
    fn make_service_instance_tree() -> Arc<Simple> {
        pseudo_directory! {
            "svc" => make_inner_service_instance_tree(),
        }
    }

    /// The instance names present in the trees built above.
    fn expected_instance_names() -> HashSet<String> {
        HashSet::from_iter(["default".to_owned(), "another_instance".to_owned()])
    }

    /// Watches `service` and collects the names of the first two instances.
    async fn first_two_instance_names(
        service: Service<ServiceMarker>,
    ) -> Result<HashSet<String>, Error> {
        service
            .watch()
            .await?
            .take(2)
            .and_then(|proxy| future::ready(Ok(proxy.instance_name().to_owned())))
            .try_collect()
            .await
    }

    // A protocol absent from the namespace reports `exists() == false`, yet
    // `connect()` still succeeds.
    #[fasync::run_singlethreaded(test)]
    async fn test_svc_connector_svc_does_not_exist() -> Result<(), Error> {
        let connector =
            new_protocol_connector::<ProtocolAMarker>().context("error probing service")?;
        assert_matches::assert_matches!(
            connector.exists().await.context("error checking service"),
            Ok(false)
        );
        let _: ProtocolAProxy = connector.connect().context("error connecting to service")?;

        let connector = new_protocol_connector_at::<ProtocolAMarker>(SVC_DIR)
            .context("error probing service at svc dir")?;
        assert_matches::assert_matches!(
            connector.exists().await.context("error checking service at svc dir"),
            Ok(false)
        );
        let _: ProtocolAProxy =
            connector.connect().context("error connecting to service at svc dir")?;

        Ok(())
    }

    // Only ProtocolB has an entry in the served directory, so only it exists.
    #[fasync::run_singlethreaded(test)]
    async fn test_svc_connector_connect_with_dir() -> Result<(), Error> {
        let dir = pseudo_directory! {
            ProtocolBMarker::PROTOCOL_NAME => read_only("read_only"),
        };
        let dir_proxy = vfs::directory::serve_read_only(dir);

        let connector = new_protocol_connector_in_dir::<ProtocolAMarker>(&dir_proxy);
        assert_matches::assert_matches!(
            connector.exists().await.context("error probing invalid service"),
            Ok(false)
        );
        let _: ProtocolAProxy =
            connector.connect().context("error connecting to invalid service")?;

        let connector = new_protocol_connector_in_dir::<ProtocolBMarker>(&dir_proxy);
        assert_matches::assert_matches!(
            connector.exists().await.context("error probing service"),
            Ok(true)
        );
        let _: ProtocolBProxy = connector.connect().context("error connecting to service")?;

        Ok(())
    }

    #[fasync::run_until_stalled(test)]
    async fn test_service_instance_watcher_from_root() -> Result<(), Error> {
        let dir_proxy = vfs::directory::serve_read_only(make_service_instance_tree());
        let service = Service::open_from_dir_prefix(&dir_proxy, SVC_DIR, ServiceMarker)?;
        let found_names = first_two_instance_names(service).await?;

        assert_eq!(found_names, expected_instance_names());

        Ok(())
    }

    #[fasync::run_until_stalled(test)]
    async fn test_service_instance_watcher_from_svc() -> Result<(), Error> {
        let dir_proxy = vfs::directory::serve_read_only(make_inner_service_instance_tree());
        let service = Service::open_from_dir(&dir_proxy, ServiceMarker)?;
        let found_names = first_two_instance_names(service).await?;

        assert_eq!(found_names, expected_instance_names());

        Ok(())
    }

    #[fasync::run_until_stalled(test)]
    async fn test_connect_to_all_services() -> Result<(), Error> {
        let dir_proxy = vfs::directory::serve_read_only(make_service_instance_tree());
        let service = Service::open_from_dir_prefix(&dir_proxy, SVC_DIR, ServiceMarker)?;
        let _: Vec<_> = service.watch().await?.take(2).try_collect().await?;

        Ok(())
    }

    #[fasync::run_until_stalled(test)]
    async fn test_connect_to_any() -> Result<(), Error> {
        let dir_proxy = vfs::directory::serve_read_only(make_service_instance_tree());
        let service = Service::open_from_dir_prefix(&dir_proxy, SVC_DIR, ServiceMarker)?;
        let found = service.watch_for_any().await?;
        assert!(["default", "another_instance"].contains(&found.instance_name()));

        Ok(())
    }
}