1#![deny(missing_docs)]
8
9use anyhow::{format_err, Context as _, Error};
10use fidl::endpoints::{
11 DiscoverableProtocolMarker, MemberOpener, ProtocolMarker, ServiceMarker, ServiceProxy,
12};
13use fidl_fuchsia_component::{RealmMarker, RealmProxy};
14use fidl_fuchsia_component_decl::ChildRef;
15use fidl_fuchsia_io as fio;
16use fuchsia_component_directory::{open_directory_async, AsRefDirectory};
17use fuchsia_fs::directory::{WatchEvent, Watcher};
18use futures::stream::FusedStream;
19use futures::{Stream, StreamExt};
20use pin_project::pin_project;
21use std::borrow::Borrow;
22use std::marker::PhantomData;
23use std::pin::pin;
24use std::task::Poll;
25
/// Path of the service directory (`/svc`) that the helpers in this module open
/// in the namespace by default.
pub const SVC_DIR: &str = "/svc";
28
/// A pending connection to the discoverable protocol `P`, resolved inside the
/// directory `D`.
///
/// In addition to connecting, it can check whether the directory has an entry
/// for the protocol (see `exists`).
pub struct ProtocolConnector<D: Borrow<fio::DirectoryProxy>, P: DiscoverableProtocolMarker> {
    /// Directory in which the protocol entry is looked up and opened.
    svc_dir: D,
    /// Ties the connector to protocol `P` without storing a value of that type.
    _svc_marker: PhantomData<P>,
}
34
35impl<D: Borrow<fio::DirectoryProxy>, P: DiscoverableProtocolMarker> ProtocolConnector<D, P> {
36 fn new(svc_dir: D) -> ProtocolConnector<D, P> {
38 ProtocolConnector { svc_dir, _svc_marker: PhantomData }
39 }
40
41 pub async fn exists(&self) -> Result<bool, Error> {
46 match fuchsia_fs::directory::dir_contains(self.svc_dir.borrow(), P::PROTOCOL_NAME).await {
47 Ok(v) => Ok(v),
48 Err(fuchsia_fs::directory::EnumerateError::Fidl(
51 _,
52 fidl::Error::ClientChannelClosed { status, .. },
53 )) if status == zx::Status::PEER_CLOSED => Ok(false),
54 Err(e) => Err(Error::new(e).context("error checking for service entry in directory")),
55 }
56 }
57
58 pub fn connect_with(self, server_end: zx::Channel) -> Result<(), Error> {
63 #[cfg(fuchsia_api_level_at_least = "NEXT")]
64 return self
65 .svc_dir
66 .borrow()
67 .open(
68 P::PROTOCOL_NAME,
69 fio::Flags::PROTOCOL_SERVICE,
70 &fio::Options::default(),
71 server_end.into(),
72 )
73 .context("error connecting to protocol");
74 #[cfg(not(fuchsia_api_level_at_least = "NEXT"))]
75 return self
76 .svc_dir
77 .borrow()
78 .open3(
79 P::PROTOCOL_NAME,
80 fio::Flags::PROTOCOL_SERVICE,
81 &fio::Options::default(),
82 server_end.into(),
83 )
84 .context("error connecting to protocol");
85 }
86
87 pub fn connect(self) -> Result<P::Proxy, Error> {
92 let (proxy, server_end) = fidl::endpoints::create_proxy::<P>();
93 let () = self
94 .connect_with(server_end.into_channel())
95 .context("error connecting with server channel")?;
96 Ok(proxy)
97 }
98}
99
/// Opens the directory at `SVC_DIR` in the namespace and returns a proxy to it.
///
/// # Errors
///
/// Returns the open failure with the context "error opening svc directory".
pub fn clone_namespace_svc() -> Result<fio::DirectoryProxy, Error> {
    fuchsia_fs::directory::open_in_namespace(SVC_DIR, fio::Flags::empty())
        .context("error opening svc directory")
}
105
/// Returns a [`ProtocolConnector`] for protocol `P` backed by the directory at
/// `SVC_DIR`.
///
/// Shorthand for `new_protocol_connector_at::<P>(SVC_DIR)`.
pub fn new_protocol_connector<P: DiscoverableProtocolMarker>(
) -> Result<ProtocolConnector<fio::DirectoryProxy, P>, Error> {
    new_protocol_connector_at::<P>(SVC_DIR)
}
112
113pub fn new_protocol_connector_at<P: DiscoverableProtocolMarker>(
118 service_directory_path: &str,
119) -> Result<ProtocolConnector<fio::DirectoryProxy, P>, Error> {
120 let dir = fuchsia_fs::directory::open_in_namespace(service_directory_path, fio::Flags::empty())
121 .context("error opening service directory")?;
122
123 Ok(ProtocolConnector::new(dir))
124}
125
/// Returns a [`ProtocolConnector`] for protocol `P` that borrows the
/// already-open directory `dir` instead of opening one from the namespace.
pub fn new_protocol_connector_in_dir<P: DiscoverableProtocolMarker>(
    dir: &fio::DirectoryProxy,
) -> ProtocolConnector<&fio::DirectoryProxy, P> {
    ProtocolConnector::new(dir)
}
132
/// Connects `server_end` to protocol `P` under `SVC_DIR`.
///
/// Shorthand for `connect_channel_to_protocol_at::<P>(server_end, SVC_DIR)`.
pub fn connect_channel_to_protocol<P: DiscoverableProtocolMarker>(
    server_end: zx::Channel,
) -> Result<(), Error> {
    connect_channel_to_protocol_at::<P>(server_end, SVC_DIR)
}
139
140pub fn connect_channel_to_protocol_at<P: DiscoverableProtocolMarker>(
142 server_end: zx::Channel,
143 service_directory_path: &str,
144) -> Result<(), Error> {
145 let protocol_path = format!("{}/{}", service_directory_path, P::PROTOCOL_NAME);
146 connect_channel_to_protocol_at_path(server_end, &protocol_path)
147}
148
/// Connects `server_end` to whatever is served at `protocol_path`, using
/// `fdio::service_connect`.
///
/// # Errors
///
/// Returns the connection failure with a context message that includes
/// `protocol_path`.
pub fn connect_channel_to_protocol_at_path(
    server_end: zx::Channel,
    protocol_path: &str,
) -> Result<(), Error> {
    fdio::service_connect(&protocol_path, server_end)
        .with_context(|| format!("Error connecting to protocol path: {}", protocol_path))
}
157
/// Connects to protocol `P` under `SVC_DIR` and returns its proxy.
///
/// Shorthand for `connect_to_protocol_at::<P>(SVC_DIR)`.
pub fn connect_to_protocol<P: DiscoverableProtocolMarker>() -> Result<P::Proxy, Error> {
    connect_to_protocol_at::<P>(SVC_DIR)
}
162
/// Connects to protocol `P` under `SVC_DIR` and returns its synchronous proxy.
///
/// Shorthand for `connect_to_protocol_sync_at::<P>(SVC_DIR)`.
pub fn connect_to_protocol_sync<P: DiscoverableProtocolMarker>(
) -> Result<P::SynchronousProxy, Error> {
    connect_to_protocol_sync_at::<P>(SVC_DIR)
}
172
173pub fn connect_to_protocol_at<P: DiscoverableProtocolMarker>(
175 service_prefix: impl AsRef<str>,
176) -> Result<P::Proxy, Error> {
177 let (proxy, server_end) = fidl::endpoints::create_proxy::<P>();
178 let () =
179 connect_channel_to_protocol_at::<P>(server_end.into_channel(), service_prefix.as_ref())?;
180 Ok(proxy)
181}
182
183pub fn connect_to_protocol_sync_at<P: DiscoverableProtocolMarker>(
189 service_prefix: impl AsRef<str>,
190) -> Result<P::SynchronousProxy, Error> {
191 let (proxy, server_end) = fidl::endpoints::create_sync_proxy::<P>();
192 let () =
193 connect_channel_to_protocol_at::<P>(server_end.into_channel(), service_prefix.as_ref())?;
194 Ok(proxy)
195}
196
197pub fn connect_to_protocol_at_path<P: ProtocolMarker>(
199 protocol_path: impl AsRef<str>,
200) -> Result<P::Proxy, Error> {
201 let (proxy, server_end) = fidl::endpoints::create_proxy::<P>();
202 let () =
203 connect_channel_to_protocol_at_path(server_end.into_channel(), protocol_path.as_ref())?;
204 Ok(proxy)
205}
206
/// Connects to protocol `P` by opening the entry named `P::PROTOCOL_NAME`
/// directly inside `directory`, and returns its proxy.
pub fn connect_to_protocol_at_dir_root<P: DiscoverableProtocolMarker>(
    directory: &impl AsRefDirectory,
) -> Result<P::Proxy, Error> {
    connect_to_named_protocol_at_dir_root::<P>(directory, P::PROTOCOL_NAME)
}
213
214pub fn connect_to_named_protocol_at_dir_root<P: ProtocolMarker>(
216 directory: &impl AsRefDirectory,
217 filename: &str,
218) -> Result<P::Proxy, Error> {
219 let (proxy, server_end) = fidl::endpoints::create_proxy::<P>();
220 directory.as_ref_directory().open(
221 filename,
222 fio::Flags::PROTOCOL_SERVICE,
223 server_end.into_channel().into(),
224 )?;
225 Ok(proxy)
226}
227
228pub fn connect_to_protocol_at_dir_svc<P: DiscoverableProtocolMarker>(
230 directory: &impl AsRefDirectory,
231) -> Result<P::Proxy, Error> {
232 let protocol_path = format!("{}/{}", SVC_DIR, P::PROTOCOL_NAME);
233 connect_to_named_protocol_at_dir_root::<P>(directory, &protocol_path)
234}
235
/// A [`MemberOpener`] backed by an already-opened service instance directory.
///
/// Field 0 is the proxy to the instance directory in which members are opened;
/// field 1 is the instance name reported by `instance_name`.
pub struct ServiceInstanceDirectory(pub fio::DirectoryProxy, pub String);
239
240impl MemberOpener for ServiceInstanceDirectory {
241 fn open_member(&self, member: &str, server_end: zx::Channel) -> Result<(), fidl::Error> {
242 let Self(directory, _) = self;
243 #[cfg(fuchsia_api_level_at_least = "NEXT")]
244 return directory.open(
245 member,
246 fio::Flags::PROTOCOL_SERVICE,
247 &fio::Options::default(),
248 server_end,
249 );
250 #[cfg(not(fuchsia_api_level_at_least = "NEXT"))]
251 return directory.open3(
252 member,
253 fio::Flags::PROTOCOL_SERVICE,
254 &fio::Options::default(),
255 server_end,
256 );
257 }
258 fn instance_name(&self) -> &str {
259 let Self(_, instance_name) = self;
260 return &instance_name;
261 }
262}
263
/// A service instance identified by name within a [`Service`] directory.
///
/// Its `MemberOpener` implementation opens members through the parent service
/// directory using the path `<name>/<member>`.
struct ServiceInstance<S> {
    /// Name of the instance's entry in the service directory.
    name: String,
    /// The service whose directory contains this instance.
    service: Service<S>,
}
271
272impl<S: ServiceMarker> MemberOpener for ServiceInstance<S> {
273 fn open_member(&self, member: &str, server_end: zx::Channel) -> Result<(), fidl::Error> {
274 self.service.connect_to_instance_member_with_channel(&self.name, member, server_end)
275 }
276 fn instance_name(&self) -> &str {
277 return &self.name;
278 }
279}
280
/// Handle to a directory that holds the instances of the FIDL service `S`.
pub struct Service<S> {
    /// Proxy to the service directory; its entries are treated as instance names.
    dir: fio::DirectoryProxy,
    /// Marker value identifying the service type.
    _marker: S,
}
286
287impl<S: Clone> Clone for Service<S> {
288 fn clone(&self) -> Self {
289 Self { dir: Clone::clone(&self.dir), _marker: self._marker.clone() }
290 }
291}
292
/// Builds a [`Service`] from an already-open service directory proxy, using the
/// marker's `Default` value.
impl<S> From<fio::DirectoryProxy> for Service<S>
where
    S: Default,
{
    fn from(dir: fio::DirectoryProxy) -> Self {
        Self { dir, _marker: S::default() }
    }
}
303
304impl<S: ServiceMarker> Service<S> {
305 pub fn from_service_dir_proxy(dir: fio::DirectoryProxy, _marker: S) -> Self {
308 Self { dir, _marker }
309 }
310
311 pub fn open(marker: S) -> Result<Self, Error> {
313 Ok(Self::from_service_dir_proxy(open_service::<S>()?, marker))
314 }
315
316 pub fn open_from_dir(svc_dir: impl AsRefDirectory, marker: S) -> Result<Self, Error> {
318 let dir = open_directory_async(&svc_dir, S::SERVICE_NAME, fio::Rights::empty())?;
319 Ok(Self::from_service_dir_proxy(dir, marker))
320 }
321
322 pub fn open_from_dir_prefix(
325 dir: impl AsRefDirectory,
326 prefix: impl AsRef<str>,
327 marker: S,
328 ) -> Result<Self, Error> {
329 let prefix = prefix.as_ref();
330 let service_path = format!("{prefix}/{}", S::SERVICE_NAME);
331 let service_path = service_path.strip_prefix('/').unwrap_or_else(|| service_path.as_ref());
335 let dir = open_directory_async(&dir, &service_path, fio::Rights::empty())?;
336 Ok(Self::from_service_dir_proxy(dir, marker))
337 }
338
339 pub fn connect_to_instance(&self, name: impl AsRef<str>) -> Result<S::Proxy, Error> {
343 let directory_proxy = fuchsia_fs::directory::open_directory_async(
344 &self.dir,
345 name.as_ref(),
346 fio::Flags::empty(),
347 )?;
348 Ok(S::Proxy::from_member_opener(Box::new(ServiceInstanceDirectory(
349 directory_proxy,
350 name.as_ref().to_string(),
351 ))))
352 }
353
354 fn connect_to_instance_member_with_channel(
358 &self,
359 instance: impl AsRef<str>,
360 member: impl AsRef<str>,
361 server_end: zx::Channel,
362 ) -> Result<(), fidl::Error> {
363 let path = format!("{}/{}", instance.as_ref(), member.as_ref());
364 #[cfg(fuchsia_api_level_at_least = "NEXT")]
365 return self.dir.open(
366 &path,
367 fio::Flags::PROTOCOL_SERVICE,
368 &fio::Options::default(),
369 server_end,
370 );
371 #[cfg(not(fuchsia_api_level_at_least = "NEXT"))]
372 return self.dir.open3(
373 &path,
374 fio::Flags::PROTOCOL_SERVICE,
375 &fio::Options::default(),
376 server_end,
377 );
378 }
379
380 pub async fn watch(self) -> Result<ServiceInstanceStream<S>, Error> {
383 let watcher = Watcher::new(&self.dir).await?;
384 let finished = false;
385 Ok(ServiceInstanceStream { service: self, watcher, finished })
386 }
387
388 pub async fn watch_for_any(self) -> Result<S::Proxy, Error> {
390 self.watch()
391 .await?
392 .next()
393 .await
394 .context("No instances found before service directory was removed")?
395 }
396
397 pub async fn enumerate(self) -> Result<Vec<S::Proxy>, Error> {
399 let instances: Vec<S::Proxy> = fuchsia_fs::directory::readdir(&self.dir)
400 .await?
401 .into_iter()
402 .map(|dirent| {
403 S::Proxy::from_member_opener(Box::new(ServiceInstance {
404 service: self.clone(),
405 name: dirent.name,
406 }))
407 })
408 .collect();
409 Ok(instances)
410 }
411}
412
/// Stream of service instance proxies produced by watching a service
/// directory; created by `Service::watch`.
#[pin_project]
pub struct ServiceInstanceStream<S: Clone> {
    /// Service whose directory is being watched; cloned into each yielded instance.
    service: Service<S>,
    /// Directory watcher supplying the underlying events.
    watcher: Watcher,
    /// Set once the stream has terminated; later polls return `None` immediately.
    finished: bool,
}
424
425impl<S: ServiceMarker> Stream for ServiceInstanceStream<S> {
426 type Item = Result<S::Proxy, Error>;
427
428 fn poll_next(
429 self: std::pin::Pin<&mut Self>,
430 cx: &mut std::task::Context<'_>,
431 ) -> Poll<Option<Self::Item>> {
432 let this = self.project();
433 use Poll::*;
434 if *this.finished {
435 return Poll::Ready(None);
436 }
437 while let Ready(next) = this.watcher.poll_next_unpin(cx) {
440 match next {
441 Some(Ok(state)) => match state.event {
442 WatchEvent::DELETED => {
443 *this.finished = true;
444 return Ready(None);
445 }
446 WatchEvent::ADD_FILE | WatchEvent::EXISTING => {
447 let filename = state.filename.to_str().unwrap();
448 if filename != "." {
449 let proxy = S::Proxy::from_member_opener(Box::new(ServiceInstance {
450 service: this.service.clone(),
451 name: filename.to_owned(),
452 }));
453 return Ready(Some(Ok(proxy)));
454 }
455 }
456 _ => {}
457 },
458 Some(Err(err)) => {
459 *this.finished = true;
460 return Ready(Some(Err(err.into())));
461 }
462 None => {
463 *this.finished = true;
464 return Ready(None);
465 }
466 }
467 }
468 Pending
469 }
470}
471
impl<S: ServiceMarker> FusedStream for ServiceInstanceStream<S> {
    /// Returns `true` once `poll_next` has marked the stream as finished.
    fn is_terminated(&self) -> bool {
        self.finished
    }
}
477
478pub fn connect_to_service_instance<S: ServiceMarker>(instance: &str) -> Result<S::Proxy, Error> {
485 let service_path = format!("{}/{}/{}", SVC_DIR, S::SERVICE_NAME, instance);
486 let directory_proxy =
487 fuchsia_fs::directory::open_in_namespace(&service_path, fio::Flags::empty())?;
488 Ok(S::Proxy::from_member_opener(Box::new(ServiceInstanceDirectory(
489 directory_proxy,
490 instance.to_string(),
491 ))))
492}
493
494pub fn connect_to_service_instance_at_dir<S: ServiceMarker>(
500 directory: &fio::DirectoryProxy,
501 instance: &str,
502) -> Result<S::Proxy, Error> {
503 let service_path = format!("{}/{}", S::SERVICE_NAME, instance);
504 let directory_proxy =
505 fuchsia_fs::directory::open_directory_async(directory, &service_path, fio::Flags::empty())?;
506 Ok(S::Proxy::from_member_opener(Box::new(ServiceInstanceDirectory(
507 directory_proxy,
508 instance.to_string(),
509 ))))
510}
511
512pub fn connect_to_service_instance_at_dir_svc<S: ServiceMarker>(
514 directory: &impl AsRefDirectory,
515 instance: impl AsRef<str>,
516) -> Result<S::Proxy, Error> {
517 let service_path = format!("{SVC_DIR}/{}/{}", S::SERVICE_NAME, instance.as_ref());
518 let service_path = service_path.strip_prefix('/').unwrap();
522 let directory_proxy = open_directory_async(directory, service_path, fio::Rights::empty())?;
523 Ok(S::Proxy::from_member_opener(Box::new(ServiceInstanceDirectory(
524 directory_proxy,
525 instance.as_ref().to_string(),
526 ))))
527}
528
/// Opens the directory `<SVC_DIR>/<S::SERVICE_NAME>` in the namespace and
/// returns a proxy to it.
///
/// # Errors
///
/// Returns the open failure with the context "namespace open failed".
pub fn open_service<S: ServiceMarker>() -> Result<fio::DirectoryProxy, Error> {
    let service_path = format!("{}/{}", SVC_DIR, S::SERVICE_NAME);
    fuchsia_fs::directory::open_in_namespace(&service_path, fio::Flags::empty())
        .context("namespace open failed")
}
535
536pub async fn open_childs_exposed_directory(
539 child_name: impl Into<String>,
540 collection_name: Option<String>,
541) -> Result<fio::DirectoryProxy, Error> {
542 let realm_proxy = connect_to_protocol::<RealmMarker>()?;
543 let (directory_proxy, server_end) = fidl::endpoints::create_proxy::<fio::DirectoryMarker>();
544 let child_ref = ChildRef { name: child_name.into(), collection: collection_name };
545 realm_proxy.open_exposed_dir(&child_ref, server_end).await?.map_err(|e| {
546 let ChildRef { name, collection } = child_ref;
547 format_err!("failed to bind to child {} in collection {:?}: {:?}", name, collection, e)
548 })?;
549 Ok(directory_proxy)
550}
551
/// Connects to protocol `P` in the exposed directory of the child named
/// `child_name` (optionally inside `collection_name`) and returns its proxy.
///
/// The exposed directory is obtained with `open_childs_exposed_directory`, and
/// the protocol is opened at that directory's root.
pub async fn connect_to_childs_protocol<P: DiscoverableProtocolMarker>(
    child_name: String,
    collection_name: Option<String>,
) -> Result<P::Proxy, Error> {
    let child_exposed_directory =
        open_childs_exposed_directory(child_name, collection_name).await?;
    connect_to_protocol_at_dir_root::<P>(&child_exposed_directory)
}
562
/// Connects to the `Realm` protocol under `SVC_DIR` and returns its proxy.
pub fn realm() -> Result<RealmProxy, Error> {
    connect_to_protocol::<RealmMarker>()
}
567
#[cfg(test)]
mod tests {
    use std::collections::HashSet;
    use std::sync::Arc;

    use super::*;
    use fidl::endpoints::ServiceMarker as _;
    use fidl_fuchsia_component_client_test::{
        ProtocolAMarker, ProtocolAProxy, ProtocolBMarker, ProtocolBProxy, ServiceMarker,
    };
    use fuchsia_async::{self as fasync};
    use futures::{future, TryStreamExt};
    use vfs::directory::simple::Simple;
    use vfs::file::vmo::read_only;
    use vfs::pseudo_directory;

    // `exists` reports `false` for a protocol that is expected to be absent
    // from the namespace, while `connect` still returns a proxy because it
    // does not check for existence.
    #[fasync::run_singlethreaded(test)]
    async fn test_svc_connector_svc_does_not_exist() -> Result<(), Error> {
        let req = new_protocol_connector::<ProtocolAMarker>().context("error probing service")?;
        assert_matches::assert_matches!(
            req.exists().await.context("error checking service"),
            Ok(false)
        );
        let _: ProtocolAProxy = req.connect().context("error connecting to service")?;

        let req = new_protocol_connector_at::<ProtocolAMarker>(SVC_DIR)
            .context("error probing service at svc dir")?;
        assert_matches::assert_matches!(
            req.exists().await.context("error checking service at svc dir"),
            Ok(false)
        );
        let _: ProtocolAProxy = req.connect().context("error connecting to service at svc dir")?;

        Ok(())
    }

    // With a served pseudo-directory containing only ProtocolB's entry,
    // `exists` is `false` for ProtocolA and `true` for ProtocolB; `connect`
    // succeeds for both.
    #[fasync::run_singlethreaded(test)]
    async fn test_svc_connector_connect_with_dir() -> Result<(), Error> {
        let dir = pseudo_directory! {
            ProtocolBMarker::PROTOCOL_NAME => read_only("read_only"),
        };
        let dir_proxy = vfs::directory::serve_read_only(dir);
        let req = new_protocol_connector_in_dir::<ProtocolAMarker>(&dir_proxy);
        assert_matches::assert_matches!(
            req.exists().await.context("error probing invalid service"),
            Ok(false)
        );
        let _: ProtocolAProxy = req.connect().context("error connecting to invalid service")?;

        let req = new_protocol_connector_in_dir::<ProtocolBMarker>(&dir_proxy);
        assert_matches::assert_matches!(
            req.exists().await.context("error probing service"),
            Ok(true)
        );
        let _: ProtocolBProxy = req.connect().context("error connecting to service")?;

        Ok(())
    }

    // Builds `<SERVICE_NAME>/{default, another_instance}`, i.e. the contents
    // of a `svc` directory offering two instances of the test service.
    fn make_inner_service_instance_tree() -> Arc<Simple> {
        pseudo_directory! {
            ServiceMarker::SERVICE_NAME => pseudo_directory! {
                "default" => read_only("read_only"),
                "another_instance" => read_only("read_only"),
            },
        }
    }

    // Wraps the inner tree in a `svc` directory, so the result looks like a
    // root directory containing `svc/<SERVICE_NAME>/...`.
    fn make_service_instance_tree() -> Arc<Simple> {
        pseudo_directory! {
            "svc" => make_inner_service_instance_tree(),
        }
    }

    // Watching via `open_from_dir_prefix` with the `SVC_DIR` prefix finds both
    // instance names.
    #[fasync::run_until_stalled(test)]
    async fn test_service_instance_watcher_from_root() -> Result<(), Error> {
        let dir_proxy = vfs::directory::serve_read_only(make_service_instance_tree());
        let watcher = Service::open_from_dir_prefix(&dir_proxy, SVC_DIR, ServiceMarker)?;
        let found_names: HashSet<_> = watcher
            .watch()
            .await?
            .take(2)
            .and_then(|proxy| future::ready(Ok(proxy.instance_name().to_owned())))
            .try_collect()
            .await?;

        assert_eq!(
            found_names,
            HashSet::from_iter(["default".to_owned(), "another_instance".to_owned()])
        );

        Ok(())
    }

    // Watching via `open_from_dir` on the inner (svc-level) tree finds both
    // instance names.
    #[fasync::run_until_stalled(test)]
    async fn test_service_instance_watcher_from_svc() -> Result<(), Error> {
        let dir_proxy = vfs::directory::serve_read_only(make_inner_service_instance_tree());
        let watcher = Service::open_from_dir(&dir_proxy, ServiceMarker)?;
        let found_names: HashSet<_> = watcher
            .watch()
            .await?
            .take(2)
            .and_then(|proxy| future::ready(Ok(proxy.instance_name().to_owned())))
            .try_collect()
            .await?;

        assert_eq!(
            found_names,
            HashSet::from_iter(["default".to_owned(), "another_instance".to_owned()])
        );

        Ok(())
    }

    // The first two items of the watch stream can be collected without error.
    #[fasync::run_until_stalled(test)]
    async fn test_connect_to_all_services() -> Result<(), Error> {
        let dir_proxy = vfs::directory::serve_read_only(make_service_instance_tree());
        let watcher = Service::open_from_dir_prefix(&dir_proxy, SVC_DIR, ServiceMarker)?;
        let _: Vec<_> = watcher.watch().await?.take(2).try_collect().await?;

        Ok(())
    }

    // `watch_for_any` returns a proxy for one of the two known instances.
    #[fasync::run_until_stalled(test)]
    async fn test_connect_to_any() -> Result<(), Error> {
        let dir_proxy = vfs::directory::serve_read_only(make_service_instance_tree());
        let watcher = Service::open_from_dir_prefix(&dir_proxy, SVC_DIR, ServiceMarker)?;
        let found = watcher.watch_for_any().await?;
        assert!(["default", "another_instance"].contains(&found.instance_name()));

        Ok(())
    }
}