1use anyhow::{format_err, Context as _, Error};
6use fidl::endpoints::{
7 create_request_stream, ClientEnd, ControlHandle, DiscoverableProtocolMarker, RequestStream,
8 ServerEnd, ServiceMarker, ServiceProxy,
9};
10use fuchsia_component::client::Connect;
11use fuchsia_component::DEFAULT_SERVICE_INSTANCE;
12use futures::channel::oneshot;
13use futures::future::BoxFuture;
14use futures::lock::Mutex;
15use futures::{select, FutureExt, TryStreamExt};
16use log::*;
17use runner::get_value as get_dictionary_value;
18use std::collections::HashMap;
19use std::sync::Arc;
20use vfs::execution_scope::ExecutionScope;
21use {
22 fidl_fuchsia_component as fcomponent, fidl_fuchsia_component_runner as fcrunner,
23 fidl_fuchsia_component_test as ftest, fidl_fuchsia_data as fdata, fidl_fuchsia_io as fio,
24 fidl_fuchsia_process as fprocess, fuchsia_async as fasync,
25};
26
/// The set of handles a local component implementation receives when it is started.
///
/// It bundles the component's namespace directories, the numbered handles from the start
/// request, the server end of the outgoing directory, and the slot through which the component
/// can ask to be notified about stop requests.
pub struct LocalComponentHandles {
    /// Namespace directories keyed by their path in the namespace (for example `"/svc"`).
    namespace: HashMap<String, fio::DirectoryProxy>,
    /// Handles from the start request, keyed by the `id` of their `HandleInfo`.
    numbered_handles: HashMap<u32, zx::Handle>,

    /// Sender half of the stop notification channel. It is `None` until
    /// `register_stop_notifier` is called; the runner task holds a second reference to this slot
    /// and takes the sender out when a `Stop` request arrives.
    stop_notifier: Arc<Mutex<Option<oneshot::Sender<()>>>>,

    /// Server end of the component's outgoing directory, as received in the start request.
    pub outgoing_dir: ServerEnd<fio::DirectoryMarker>,
}
39
40impl LocalComponentHandles {
41 fn new(
42 fidl_namespace: Vec<fcrunner::ComponentNamespaceEntry>,
43 fidl_numbered_handles: Vec<fprocess::HandleInfo>,
44 outgoing_dir: ServerEnd<fio::DirectoryMarker>,
45 ) -> Result<(Self, Arc<Mutex<Option<oneshot::Sender<()>>>>), Error> {
46 let stop_notifier = Arc::new(Mutex::new(None));
47 let mut namespace = HashMap::new();
48 for namespace_entry in fidl_namespace {
49 namespace.insert(
50 namespace_entry.path.ok_or_else(|| format_err!("namespace entry missing path"))?,
51 namespace_entry
52 .directory
53 .ok_or_else(|| format_err!("namespace entry missing directory handle"))?
54 .into_proxy(),
55 );
56 }
57 let numbered_handles =
58 fidl_numbered_handles.into_iter().map(|h| (h.id, h.handle)).collect::<HashMap<_, _>>();
59 Ok((
60 Self {
61 namespace,
62 numbered_handles,
63 outgoing_dir,
64 stop_notifier: stop_notifier.clone(),
65 },
66 stop_notifier,
67 ))
68 }
69
70 pub fn take_numbered_handle(&mut self, id: u32) -> Option<zx::Handle> {
71 self.numbered_handles.remove(&id)
72 }
73
74 pub fn numbered_handles(&self) -> &HashMap<u32, zx::Handle> {
75 &self.numbered_handles
76 }
77
78 pub async fn register_stop_notifier(&self) -> oneshot::Receiver<()> {
89 let mut stop_notifier_guard = self.stop_notifier.lock().await;
90 if stop_notifier_guard.is_some() {
91 panic!("cannot register multiple stop handlers for a single local component");
92 }
93 let (sender, receiver) = oneshot::channel();
94 *stop_notifier_guard = Some(sender);
95 receiver
96 }
97
98 pub fn connect_to_protocol<T: Connect>(&self) -> Result<T, Error> {
100 self.connect_to_named_protocol(T::Protocol::PROTOCOL_NAME)
101 }
102
103 pub fn connect_to_named_protocol<T: Connect>(&self, name: &str) -> Result<T, Error> {
105 let svc_dir_proxy = self.namespace.get("/svc").ok_or_else(|| {
106 format_err!("the component's namespace doesn't have a /svc directory")
107 })?;
108 T::connect_at_dir_root_with_name(svc_dir_proxy, name)
109 }
110
111 pub fn open_service<S: ServiceMarker>(&self) -> Result<fio::DirectoryProxy, Error> {
113 self.open_named_service(S::SERVICE_NAME)
114 }
115
116 pub fn open_named_service(&self, name: &str) -> Result<fio::DirectoryProxy, Error> {
119 let svc_dir_proxy = self.namespace.get("/svc").ok_or_else(|| {
120 format_err!("the component's namespace doesn't have a /svc directory")
121 })?;
122 fuchsia_fs::directory::open_directory_async(&svc_dir_proxy, name, fio::Flags::empty())
123 .map_err(Into::into)
124 }
125
126 pub fn connect_to_service<S: ServiceMarker>(&self) -> Result<S::Proxy, Error> {
129 self.connect_to_service_instance::<S>(DEFAULT_SERVICE_INSTANCE)
130 }
131
132 pub fn connect_to_service_instance<S: ServiceMarker>(
136 &self,
137 instance_name: &str,
138 ) -> Result<S::Proxy, Error> {
139 self.connect_to_named_service_instance::<S>(S::SERVICE_NAME, instance_name)
140 }
141
142 pub fn connect_to_named_service_instance<S: ServiceMarker>(
146 &self,
147 service_name: &str,
148 instance_name: &str,
149 ) -> Result<S::Proxy, Error> {
150 let service_dir = self.open_named_service(service_name)?;
151 let directory_proxy = fuchsia_fs::directory::open_directory_async(
152 &service_dir,
153 instance_name,
154 fio::Flags::empty(),
155 )?;
156 Ok(S::Proxy::from_member_opener(Box::new(
157 fuchsia_component::client::ServiceInstanceDirectory(
158 directory_proxy,
159 instance_name.to_owned(),
160 ),
161 )))
162 }
163
164 pub fn clone_from_namespace(&self, directory_name: &str) -> Result<fio::DirectoryProxy, Error> {
177 let dir_proxy = self.namespace.get(&format!("/{}", directory_name)).ok_or_else(|| {
178 format_err!(
179 "the local component's namespace doesn't have a /{} directory",
180 directory_name
181 )
182 })?;
183 fuchsia_fs::directory::clone(&dir_proxy).context("clone")
184 }
185}
186
/// Map from a local component's name to the closure that starts it.
///
/// Each closure is handed the component's [`LocalComponentHandles`] and returns the boxed
/// future that runs the component; the future's result is the component's exit result.
type LocalComponentImplementations = HashMap<
    String,
    Arc<
        dyn Fn(LocalComponentHandles) -> BoxFuture<'static, Result<(), Error>>
            + Sync
            + Send
            + 'static,
    >,
>;
196
/// Collects local component implementations and turns them into a running component runner.
///
/// Clones share the same underlying map. `build` takes the map out, leaving `None` behind, after
/// which registering or building again fails with `BuildAlreadyCalled`.
#[derive(Clone, Debug)]
pub struct LocalComponentRunnerBuilder {
    /// `Some` until `build` has consumed the registered implementations.
    local_component_implementations: Arc<Mutex<Option<LocalComponentImplementations>>>,
}
201
202impl LocalComponentRunnerBuilder {
203 pub fn new() -> Self {
204 Self { local_component_implementations: Arc::new(Mutex::new(Some(HashMap::new()))) }
205 }
206
207 pub(crate) async fn register_local_component<I>(
208 &self,
209 name: String,
210 implementation: I,
211 ) -> Result<(), ftest::RealmBuilderError>
212 where
213 I: Fn(LocalComponentHandles) -> BoxFuture<'static, Result<(), Error>>
214 + Sync
215 + Send
216 + 'static,
217 {
218 self.local_component_implementations
219 .lock()
220 .await
221 .as_mut()
222 .ok_or(ftest::RealmBuilderError::BuildAlreadyCalled)?
223 .insert(name, Arc::new(implementation));
224 Ok(())
225 }
226
227 pub(crate) async fn build(
228 self,
229 ) -> Result<
230 (ClientEnd<fcrunner::ComponentRunnerMarker>, fasync::Task<()>),
231 ftest::RealmBuilderError,
232 > {
233 let local_component_implementations = self
234 .local_component_implementations
235 .lock()
236 .await
237 .take()
238 .ok_or(ftest::RealmBuilderError::BuildAlreadyCalled)?;
239 let (runner_client_end, runner_request_stream) =
240 create_request_stream::<fcrunner::ComponentRunnerMarker>();
241 let runner = LocalComponentRunner::new(local_component_implementations);
242 let runner_task = fasync::Task::spawn(async move {
243 if let Err(e) = runner.handle_stream(runner_request_stream).await {
244 error!("failed to run local component runner: {:?}", e);
245 }
246 });
247
248 Ok((runner_client_end, runner_task))
249 }
250}
251
/// Serves the component runner protocol for local components.
///
/// Every started component runs as its own task on `execution_scope`; the scope is shut down
/// when the runner is dropped.
pub struct LocalComponentRunner {
    /// Scope on which one task per started component is spawned.
    execution_scope: ExecutionScope,
    /// Registered component closures, keyed by local component name.
    local_component_implementations: HashMap<
        String,
        Arc<
            dyn Fn(LocalComponentHandles) -> BoxFuture<'static, Result<(), Error>>
                + Sync
                + Send
                + 'static,
        >,
    >,
}
264
impl Drop for LocalComponentRunner {
    /// Shuts down the execution scope on which the started local components were spawned.
    ///
    /// NOTE(review): this is presumably what ends still-running components when the runner
    /// goes away (the `dropping_the_runner_will_kill_a_component` test relies on it).
    fn drop(&mut self) {
        self.execution_scope.shutdown();
    }
}
270
271impl LocalComponentRunner {
272 fn new(
273 local_component_implementations: HashMap<
274 String,
275 Arc<
276 dyn Fn(LocalComponentHandles) -> BoxFuture<'static, Result<(), Error>>
277 + Sync
278 + Send
279 + 'static,
280 >,
281 >,
282 ) -> Self {
283 Self { local_component_implementations, execution_scope: ExecutionScope::new() }
284 }
285
286 async fn handle_stream(
287 &self,
288 mut runner_request_stream: fcrunner::ComponentRunnerRequestStream,
289 ) -> Result<(), Error> {
290 while let Some(req) = runner_request_stream.try_next().await? {
291 match req {
292 fcrunner::ComponentRunnerRequest::Start { start_info, controller, .. } => {
293 let program = start_info
294 .program
295 .ok_or_else(|| format_err!("program is missing from start_info"))?;
296 let namespace = start_info
297 .ns
298 .ok_or_else(|| format_err!("namespace is missing from start_info"))?;
299 let numbered_handles = start_info.numbered_handles.unwrap_or_default();
300 let outgoing_dir = start_info
301 .outgoing_dir
302 .ok_or_else(|| format_err!("outgoing_dir is missing from start_info"))?;
303 let _runtime_dir_server_end: ServerEnd<fio::DirectoryMarker> = start_info
304 .runtime_dir
305 .ok_or_else(|| format_err!("runtime_dir is missing from start_info"))?;
306
307 let local_component_name = extract_local_component_name(program)?;
308 let local_component_implementation = self
309 .local_component_implementations
310 .get(&local_component_name)
311 .ok_or_else(|| {
312 format_err!("no such local component: {:?}", local_component_name)
313 })?
314 .clone();
315 let (component_handles, stop_notifier) =
316 LocalComponentHandles::new(namespace, numbered_handles, outgoing_dir)?;
317
318 let mut controller_request_stream = controller.into_stream();
319 self.execution_scope.spawn(async move {
320 let mut local_component_implementation_fut =
321 (*local_component_implementation)(component_handles).fuse();
322 let controller_control_handle = controller_request_stream.control_handle();
323 let mut controller_request_fut =
324 controller_request_stream.try_next().fuse();
325 loop {
326 select! {
327 res = local_component_implementation_fut => {
328 let epitaph = match res {
329 Err(e) => {
330 error!(
331 "the local component {:?} returned an error: {:?}",
332 local_component_name,
333 e,
334 );
335 zx::Status::from_raw(fcomponent::Error::InstanceDied.into_primitive() as i32)
336 }
337 Ok(()) => zx::Status::OK,
338 };
339 controller_control_handle.shutdown_with_epitaph(epitaph);
340 return;
341 }
342 req_res = controller_request_fut => {
343 match req_res.expect("invalid controller request") {
344 Some(fcrunner::ComponentControllerRequest::Stop { .. }) => {
345 if let Some(stop_notifier) =
346 stop_notifier.lock().await.take()
347 {
348 let _ = stop_notifier.send(());
354
355 controller_request_fut = controller_request_stream.try_next().fuse();
358 } else {
359 controller_control_handle.shutdown_with_epitaph(
360 zx::Status::from_raw(fcomponent::Error::InstanceDied.into_primitive() as i32),
361 );
362 return;
363 }
364 }
365 Some(fcrunner::ComponentControllerRequest::Kill { .. }) => {
366 controller_control_handle.shutdown_with_epitaph(
367 zx::Status::from_raw(fcomponent::Error::InstanceDied.into_primitive() as i32),
368 );
369 return;
370 }
371 _ => return,
372 }
373 }
374 };
375 }
376 });
377 }
378 fcrunner::ComponentRunnerRequest::_UnknownMethod { ordinal, .. } => {
379 warn!(ordinal:%; "Unknown ComponentController request");
380 }
381 }
382 }
383 Ok(())
384 }
385}
386
387fn extract_local_component_name(dict: fdata::Dictionary) -> Result<String, Error> {
388 let entry_value = get_dictionary_value(&dict, ftest::LOCAL_COMPONENT_NAME_KEY)
389 .ok_or_else(|| format_err!("program section is missing component name"))?;
390 if let fdata::DictionaryValue::Str(s) = entry_value {
391 return Ok(s.clone());
392 } else {
393 return Err(format_err!("malformed program section"));
394 }
395}
396
397#[cfg(test)]
398mod tests {
399 use super::*;
400 use assert_matches::assert_matches;
401 use fidl::endpoints::{create_proxy, Proxy as _};
402 use futures::future::pending;
403 use zx::AsHandleRef;
404
405 #[fuchsia::test]
406 async fn runner_builder_correctly_stores_a_function() {
407 let runner_builder = LocalComponentRunnerBuilder::new();
408 let (sender, receiver) = oneshot::channel();
409 let sender = Arc::new(Mutex::new(Some(sender)));
410
411 let component_name = "test".to_string();
412
413 runner_builder
414 .register_local_component(component_name.clone(), move |_handles| {
415 let sender = sender.clone();
416 async move {
417 let sender = sender.lock().await.take().expect("local component invoked twice");
418 sender.send(()).expect("failed to send");
419 Ok(())
420 }
421 .boxed()
422 })
423 .await
424 .unwrap();
425
426 let (_, outgoing_dir) = create_proxy();
427 let handles = LocalComponentHandles {
428 namespace: HashMap::new(),
429 numbered_handles: HashMap::new(),
430 outgoing_dir,
431 stop_notifier: Arc::new(Mutex::new(None)),
432 };
433 let local_component_implementation = runner_builder
434 .local_component_implementations
435 .lock()
436 .await
437 .as_ref()
438 .unwrap()
439 .get(&component_name)
440 .expect("local component missing from runner builder")
441 .clone();
442
443 (*local_component_implementation)(handles)
444 .await
445 .expect("local component implementation failed");
446 let () = receiver.await.expect("failed to receive");
447 }
448
    /// Everything a test must keep alive (or interact with) after starting a component.
    ///
    /// The underscore-prefixed fields are never read; they are only held so that they are not
    /// dropped while the test is still running.
    struct RunnerAndHandles {
        /// Task serving the runner's request stream.
        _runner_task: fasync::Task<()>,
        /// Client connection to the runner, over which the start request was sent.
        _component_runner_proxy: fcrunner::ComponentRunnerProxy,
        /// Client end of the runtime directory passed in the start request.
        _runtime_dir_proxy: fio::DirectoryProxy,
        /// Client end of the outgoing directory passed in the start request.
        outgoing_dir_proxy: fio::DirectoryProxy,
        /// Client end of the started component's controller.
        controller_proxy: fcrunner::ComponentControllerProxy,
    }
456
457 async fn build_and_start(
458 runner_builder: LocalComponentRunnerBuilder,
459 component_to_start: String,
460 ) -> RunnerAndHandles {
461 let (component_runner_client_end, runner_task) = runner_builder.build().await.unwrap();
462 let component_runner_proxy = component_runner_client_end.into_proxy();
463
464 let (runtime_dir_proxy, runtime_dir_server_end) = create_proxy();
465 let (outgoing_dir_proxy, outgoing_dir_server_end) = create_proxy();
466 let (controller_proxy, controller_server_end) = create_proxy();
467 component_runner_proxy
468 .start(
469 fcrunner::ComponentStartInfo {
470 resolved_url: Some("test://test".to_string()),
471 program: Some(fdata::Dictionary {
472 entries: Some(vec![fdata::DictionaryEntry {
473 key: ftest::LOCAL_COMPONENT_NAME_KEY.to_string(),
474 value: Some(Box::new(fdata::DictionaryValue::Str(component_to_start))),
475 }]),
476 ..Default::default()
477 }),
478 ns: Some(vec![]),
479 outgoing_dir: Some(outgoing_dir_server_end),
480 runtime_dir: Some(runtime_dir_server_end),
481 numbered_handles: Some(vec![]),
482 ..Default::default()
483 },
484 controller_server_end,
485 )
486 .expect("failed to send start");
487
488 RunnerAndHandles {
489 _runner_task: runner_task,
490 _component_runner_proxy: component_runner_proxy,
491 _runtime_dir_proxy: runtime_dir_proxy,
492 outgoing_dir_proxy,
493 controller_proxy,
494 }
495 }
496
497 #[fuchsia::test]
498 async fn the_runner_runs_a_component() {
499 let runner_builder = LocalComponentRunnerBuilder::new();
500 let (sender, receiver) = oneshot::channel();
501 let sender = Arc::new(Mutex::new(Some(sender)));
502
503 let component_name = "test".to_string();
504
505 runner_builder
506 .register_local_component(component_name.clone(), move |_handles| {
507 let sender = sender.clone();
508 async move {
509 let sender = sender.lock().await.take().expect("local component invoked twice");
510 sender.send(()).expect("failed to send");
511 Ok(())
512 }
513 .boxed()
514 })
515 .await
516 .unwrap();
517
518 let _runner_and_handles = build_and_start(runner_builder, component_name).await;
519
520 let () = receiver.await.expect("failed to receive");
521 }
522
    /// Checks that the outgoing directory server end handed to the component is the peer of
    /// the client end created for the start request.
    ///
    /// The component sends its `outgoing_dir` back to the test, which compares the channel's
    /// koid with the `related_koid` of the client end.
    #[fuchsia::test]
    async fn the_runner_gives_the_component_its_outgoing_dir() {
        let runner_builder = LocalComponentRunnerBuilder::new();
        let (sender, receiver) = oneshot::channel::<ServerEnd<fio::DirectoryMarker>>();
        let sender = Arc::new(Mutex::new(Some(sender)));

        let component_name = "test".to_string();

        runner_builder
            .register_local_component(component_name.clone(), move |handles| {
                let sender = sender.clone();
                async move {
                    // Mention `handles` as a whole so that the async block captures the entire
                    // struct rather than only the `outgoing_dir` field moved out below.
                    let _ = &handles;
                    sender
                        .lock()
                        .await
                        .take()
                        .expect("local component invoked twice")
                        .send(handles.outgoing_dir)
                        .expect("failed to send");
                    Ok(())
                }
                .boxed()
            })
            .await
            .unwrap();

        let runner_and_handles = build_and_start(runner_builder, component_name.clone()).await;

        let outgoing_dir_server_end = receiver.await.expect("failed to receive");

        // Two ends of the same channel: the server end's koid is the client end's related koid.
        assert_eq!(
            outgoing_dir_server_end
                .into_channel()
                .basic_info()
                .expect("failed to get basic info")
                .koid,
            runner_and_handles
                .outgoing_dir_proxy
                .into_channel()
                .expect("failed to convert to channel")
                .basic_info()
                .expect("failed to get basic info")
                .related_koid,
        );
    }
569
    /// Checks that a `Stop` request ends a component that never registered a stop notifier.
    ///
    /// The component parks forever while holding the oneshot sender, so the receiver can only
    /// resolve (with `Canceled`) once the component's future has been dropped.
    #[fuchsia::test]
    async fn controller_stop_will_stop_a_component() {
        let runner_builder = LocalComponentRunnerBuilder::new();
        let (sender, receiver) = oneshot::channel::<()>();
        let sender = Arc::new(Mutex::new(Some(sender)));

        let component_name = "test".to_string();

        runner_builder
            .register_local_component(component_name.clone(), move |_handles| {
                let sender = sender.clone();
                async move {
                    // `_sender` is deliberately never used: it must stay alive inside this
                    // future so that dropping the future is what cancels the receiver.
                    let _sender =
                        sender.lock().await.take().expect("local component invoked twice");
                    pending().await
                }
                .boxed()
            })
            .await
            .unwrap();

        let runner_and_handles = build_and_start(runner_builder, component_name).await;
        runner_and_handles.controller_proxy.stop().expect("failed to send stop");

        assert_eq!(Err(oneshot::Canceled), receiver.await);
    }
598
599 #[fuchsia::test]
600 async fn controller_kill_will_kill_a_component() {
601 let runner_builder = LocalComponentRunnerBuilder::new();
602 let (sender, receiver) = oneshot::channel::<()>();
603 let sender = Arc::new(Mutex::new(Some(sender)));
604
605 let component_name = "test".to_string();
606
607 runner_builder
608 .register_local_component(component_name.clone(), move |_handles| {
609 let sender = sender.clone();
610 async move {
611 let _sender =
612 sender.lock().await.take().expect("local component invoked twice");
613 pending().await
616 }
617 .boxed()
618 })
619 .await
620 .unwrap();
621
622 let runner_and_handles = build_and_start(runner_builder, component_name).await;
623 runner_and_handles.controller_proxy.kill().expect("failed to send stop");
624
625 assert_eq!(Err(oneshot::Canceled), receiver.await);
626 }
627
628 #[fuchsia::test]
629 async fn stopping_a_component_calls_the_notifier() {
630 let runner_builder = LocalComponentRunnerBuilder::new();
631 let (notifier_registered_sender, notifier_registered_receiver) = oneshot::channel::<()>();
632 let notifier_registered_sender = Arc::new(Mutex::new(Some(notifier_registered_sender)));
633
634 let (notifier_fired_sender, notifier_fired_receiver) = oneshot::channel::<()>();
635 let notifier_fired_sender = Arc::new(Mutex::new(Some(notifier_fired_sender)));
636
637 let component_name = "test".to_string();
638
639 runner_builder
640 .register_local_component(component_name.clone(), move |handles| {
641 let notifier_registered_sender = notifier_registered_sender.clone();
642 let notifier_fired_sender = notifier_fired_sender.clone();
643 async move {
644 let stop_notifier = handles.register_stop_notifier().await;
645
646 let sender = notifier_registered_sender
647 .lock()
648 .await
649 .take()
650 .expect("local component invoked twice");
651 sender.send(()).expect("failed to send that the stop notifier was registered");
652
653 stop_notifier.await.expect("failed to wait for stop notification");
654
655 let sender = notifier_fired_sender
656 .lock()
657 .await
658 .take()
659 .expect("local component invoked twice");
660 sender
661 .send(())
662 .expect("failed to send that the stop notifier received a message");
663
664 Ok(())
665 }
666 .boxed()
667 })
668 .await
669 .unwrap();
670
671 let runner_and_handles = build_and_start(runner_builder, component_name).await;
672
673 assert_matches!(notifier_registered_receiver.await, Ok(()));
675
676 runner_and_handles.controller_proxy.stop().expect("failed to send stop");
678
679 assert_matches!(notifier_fired_receiver.await, Ok(()));
681 }
682
    /// Checks that dropping the runner (together with all of its handles) ends a running
    /// component.
    ///
    /// The component parks forever while holding the oneshot sender, so the receiver can only
    /// resolve (with `Canceled`) once the component's future has been dropped.
    #[fuchsia::test]
    async fn dropping_the_runner_will_kill_a_component() {
        let runner_builder = LocalComponentRunnerBuilder::new();
        let (sender, receiver) = oneshot::channel::<()>();
        let sender = Arc::new(Mutex::new(Some(sender)));

        let component_name = "test".to_string();

        runner_builder
            .register_local_component(component_name.clone(), move |_handles| {
                let sender = sender.clone();
                async move {
                    // `_sender` is deliberately never used: it must stay alive inside this
                    // future so that dropping the future is what cancels the receiver.
                    let _sender =
                        sender.lock().await.take().expect("local component invoked twice");
                    pending().await
                }
                .boxed()
            })
            .await
            .unwrap();

        let runner_and_handles = build_and_start(runner_builder, component_name).await;
        drop(runner_and_handles);

        assert_eq!(Err(oneshot::Canceled), receiver.await);
    }
711}