1use anyhow::{format_err, Context as _, Error};
6use fidl::endpoints::{
7 create_request_stream, ClientEnd, ControlHandle, DiscoverableProtocolMarker, ProtocolMarker,
8 RequestStream, ServerEnd, ServiceMarker, ServiceProxy,
9};
10use fuchsia_component::DEFAULT_SERVICE_INSTANCE;
11use futures::channel::oneshot;
12use futures::future::BoxFuture;
13use futures::lock::Mutex;
14use futures::{select, FutureExt, TryStreamExt};
15use log::*;
16use runner::get_value as get_dictionary_value;
17use std::collections::HashMap;
18use std::sync::Arc;
19use vfs::execution_scope::ExecutionScope;
20use {
21 fidl_fuchsia_component as fcomponent, fidl_fuchsia_component_runner as fcrunner,
22 fidl_fuchsia_component_test as ftest, fidl_fuchsia_data as fdata, fidl_fuchsia_io as fio,
23 fidl_fuchsia_process as fprocess, fuchsia_async as fasync,
24};
25
26pub struct LocalComponentHandles {
29 namespace: HashMap<String, fio::DirectoryProxy>,
30 numbered_handles: HashMap<u32, zx::Handle>,
31
32 stop_notifier: Arc<Mutex<Option<oneshot::Sender<()>>>>,
33
34 pub outgoing_dir: ServerEnd<fio::DirectoryMarker>,
37}
38
39impl LocalComponentHandles {
40 fn new(
41 fidl_namespace: Vec<fcrunner::ComponentNamespaceEntry>,
42 fidl_numbered_handles: Vec<fprocess::HandleInfo>,
43 outgoing_dir: ServerEnd<fio::DirectoryMarker>,
44 ) -> Result<(Self, Arc<Mutex<Option<oneshot::Sender<()>>>>), Error> {
45 let stop_notifier = Arc::new(Mutex::new(None));
46 let mut namespace = HashMap::new();
47 for namespace_entry in fidl_namespace {
48 namespace.insert(
49 namespace_entry.path.ok_or_else(|| format_err!("namespace entry missing path"))?,
50 namespace_entry
51 .directory
52 .ok_or_else(|| format_err!("namespace entry missing directory handle"))?
53 .into_proxy(),
54 );
55 }
56 let numbered_handles =
57 fidl_numbered_handles.into_iter().map(|h| (h.id, h.handle)).collect::<HashMap<_, _>>();
58 Ok((
59 Self {
60 namespace,
61 numbered_handles,
62 outgoing_dir,
63 stop_notifier: stop_notifier.clone(),
64 },
65 stop_notifier,
66 ))
67 }
68
69 pub fn take_numbered_handle(&mut self, id: u32) -> Option<zx::Handle> {
70 self.numbered_handles.remove(&id)
71 }
72
73 pub fn numbered_handles(&self) -> &HashMap<u32, zx::Handle> {
74 &self.numbered_handles
75 }
76
77 pub async fn register_stop_notifier(&self) -> oneshot::Receiver<()> {
88 let mut stop_notifier_guard = self.stop_notifier.lock().await;
89 if stop_notifier_guard.is_some() {
90 panic!("cannot register multiple stop handlers for a single local component");
91 }
92 let (sender, receiver) = oneshot::channel();
93 *stop_notifier_guard = Some(sender);
94 receiver
95 }
96
97 pub fn connect_to_protocol<P: DiscoverableProtocolMarker>(&self) -> Result<P::Proxy, Error> {
99 self.connect_to_named_protocol::<P>(P::PROTOCOL_NAME)
100 }
101
102 pub fn connect_to_named_protocol<P: ProtocolMarker>(
104 &self,
105 name: &str,
106 ) -> Result<P::Proxy, Error> {
107 let svc_dir_proxy = self.namespace.get("/svc").ok_or_else(|| {
108 format_err!("the component's namespace doesn't have a /svc directory")
109 })?;
110 fuchsia_component::client::connect_to_named_protocol_at_dir_root::<P>(svc_dir_proxy, name)
111 }
112
113 pub fn open_service<S: ServiceMarker>(&self) -> Result<fio::DirectoryProxy, Error> {
115 self.open_named_service(S::SERVICE_NAME)
116 }
117
118 pub fn open_named_service(&self, name: &str) -> Result<fio::DirectoryProxy, Error> {
121 let svc_dir_proxy = self.namespace.get("/svc").ok_or_else(|| {
122 format_err!("the component's namespace doesn't have a /svc directory")
123 })?;
124 fuchsia_fs::directory::open_directory_async(&svc_dir_proxy, name, fio::Flags::empty())
125 .map_err(Into::into)
126 }
127
128 pub fn connect_to_service<S: ServiceMarker>(&self) -> Result<S::Proxy, Error> {
131 self.connect_to_service_instance::<S>(DEFAULT_SERVICE_INSTANCE)
132 }
133
134 pub fn connect_to_service_instance<S: ServiceMarker>(
138 &self,
139 instance_name: &str,
140 ) -> Result<S::Proxy, Error> {
141 self.connect_to_named_service_instance::<S>(S::SERVICE_NAME, instance_name)
142 }
143
144 pub fn connect_to_named_service_instance<S: ServiceMarker>(
148 &self,
149 service_name: &str,
150 instance_name: &str,
151 ) -> Result<S::Proxy, Error> {
152 let service_dir = self.open_named_service(service_name)?;
153 let directory_proxy = fuchsia_fs::directory::open_directory_async(
154 &service_dir,
155 instance_name,
156 fio::Flags::empty(),
157 )?;
158 Ok(S::Proxy::from_member_opener(Box::new(
159 fuchsia_component::client::ServiceInstanceDirectory(
160 directory_proxy,
161 instance_name.to_owned(),
162 ),
163 )))
164 }
165
166 pub fn clone_from_namespace(&self, directory_name: &str) -> Result<fio::DirectoryProxy, Error> {
179 let dir_proxy = self.namespace.get(&format!("/{}", directory_name)).ok_or_else(|| {
180 format_err!(
181 "the local component's namespace doesn't have a /{} directory",
182 directory_name
183 )
184 })?;
185 fuchsia_fs::directory::clone(&dir_proxy).context("clone")
186 }
187}
188
189type LocalComponentImplementations = HashMap<
190 String,
191 Arc<
192 dyn Fn(LocalComponentHandles) -> BoxFuture<'static, Result<(), Error>>
193 + Sync
194 + Send
195 + 'static,
196 >,
197>;
198
199#[derive(Clone, Debug)]
200pub struct LocalComponentRunnerBuilder {
201 local_component_implementations: Arc<Mutex<Option<LocalComponentImplementations>>>,
202}
203
204impl LocalComponentRunnerBuilder {
205 pub fn new() -> Self {
206 Self { local_component_implementations: Arc::new(Mutex::new(Some(HashMap::new()))) }
207 }
208
209 pub(crate) async fn register_local_component<I>(
210 &self,
211 name: String,
212 implementation: I,
213 ) -> Result<(), ftest::RealmBuilderError>
214 where
215 I: Fn(LocalComponentHandles) -> BoxFuture<'static, Result<(), Error>>
216 + Sync
217 + Send
218 + 'static,
219 {
220 self.local_component_implementations
221 .lock()
222 .await
223 .as_mut()
224 .ok_or(ftest::RealmBuilderError::BuildAlreadyCalled)?
225 .insert(name, Arc::new(implementation));
226 Ok(())
227 }
228
229 pub(crate) async fn build(
230 self,
231 ) -> Result<
232 (ClientEnd<fcrunner::ComponentRunnerMarker>, fasync::Task<()>),
233 ftest::RealmBuilderError,
234 > {
235 let local_component_implementations = self
236 .local_component_implementations
237 .lock()
238 .await
239 .take()
240 .ok_or(ftest::RealmBuilderError::BuildAlreadyCalled)?;
241 let (runner_client_end, runner_request_stream) =
242 create_request_stream::<fcrunner::ComponentRunnerMarker>();
243 let runner = LocalComponentRunner::new(local_component_implementations);
244 let runner_task = fasync::Task::spawn(async move {
245 if let Err(e) = runner.handle_stream(runner_request_stream).await {
246 error!("failed to run local component runner: {:?}", e);
247 }
248 });
249
250 Ok((runner_client_end, runner_task))
251 }
252}
253
254pub struct LocalComponentRunner {
255 execution_scope: ExecutionScope,
256 local_component_implementations: HashMap<
257 String,
258 Arc<
259 dyn Fn(LocalComponentHandles) -> BoxFuture<'static, Result<(), Error>>
260 + Sync
261 + Send
262 + 'static,
263 >,
264 >,
265}
266
267impl Drop for LocalComponentRunner {
268 fn drop(&mut self) {
269 self.execution_scope.shutdown();
270 }
271}
272
273impl LocalComponentRunner {
274 fn new(
275 local_component_implementations: HashMap<
276 String,
277 Arc<
278 dyn Fn(LocalComponentHandles) -> BoxFuture<'static, Result<(), Error>>
279 + Sync
280 + Send
281 + 'static,
282 >,
283 >,
284 ) -> Self {
285 Self { local_component_implementations, execution_scope: ExecutionScope::new() }
286 }
287
288 async fn handle_stream(
289 &self,
290 mut runner_request_stream: fcrunner::ComponentRunnerRequestStream,
291 ) -> Result<(), Error> {
292 while let Some(req) = runner_request_stream.try_next().await? {
293 match req {
294 fcrunner::ComponentRunnerRequest::Start { start_info, controller, .. } => {
295 let program = start_info
296 .program
297 .ok_or_else(|| format_err!("program is missing from start_info"))?;
298 let namespace = start_info
299 .ns
300 .ok_or_else(|| format_err!("namespace is missing from start_info"))?;
301 let numbered_handles = start_info.numbered_handles.unwrap_or_default();
302 let outgoing_dir = start_info
303 .outgoing_dir
304 .ok_or_else(|| format_err!("outgoing_dir is missing from start_info"))?;
305 let _runtime_dir_server_end: ServerEnd<fio::DirectoryMarker> = start_info
306 .runtime_dir
307 .ok_or_else(|| format_err!("runtime_dir is missing from start_info"))?;
308
309 let local_component_name = extract_local_component_name(program)?;
310 let local_component_implementation = self
311 .local_component_implementations
312 .get(&local_component_name)
313 .ok_or_else(|| {
314 format_err!("no such local component: {:?}", local_component_name)
315 })?
316 .clone();
317 let (component_handles, stop_notifier) =
318 LocalComponentHandles::new(namespace, numbered_handles, outgoing_dir)?;
319
320 let mut controller_request_stream = controller.into_stream();
321 self.execution_scope.spawn(async move {
322 let mut local_component_implementation_fut =
323 (*local_component_implementation)(component_handles).fuse();
324 let controller_control_handle = controller_request_stream.control_handle();
325 let mut controller_request_fut =
326 controller_request_stream.try_next().fuse();
327 loop {
328 select! {
329 res = local_component_implementation_fut => {
330 let epitaph = match res {
331 Err(e) => {
332 error!(
333 "the local component {:?} returned an error: {:?}",
334 local_component_name,
335 e,
336 );
337 zx::Status::from_raw(fcomponent::Error::InstanceDied.into_primitive() as i32)
338 }
339 Ok(()) => zx::Status::OK,
340 };
341 controller_control_handle.shutdown_with_epitaph(epitaph);
342 return;
343 }
344 req_res = controller_request_fut => {
345 match req_res.expect("invalid controller request") {
346 Some(fcrunner::ComponentControllerRequest::Stop { .. }) => {
347 if let Some(stop_notifier) =
348 stop_notifier.lock().await.take()
349 {
350 let _ = stop_notifier.send(());
356
357 controller_request_fut = controller_request_stream.try_next().fuse();
360 } else {
361 controller_control_handle.shutdown_with_epitaph(
362 zx::Status::from_raw(fcomponent::Error::InstanceDied.into_primitive() as i32),
363 );
364 return;
365 }
366 }
367 Some(fcrunner::ComponentControllerRequest::Kill { .. }) => {
368 controller_control_handle.shutdown_with_epitaph(
369 zx::Status::from_raw(fcomponent::Error::InstanceDied.into_primitive() as i32),
370 );
371 return;
372 }
373 _ => return,
374 }
375 }
376 };
377 }
378 });
379 }
380 fcrunner::ComponentRunnerRequest::_UnknownMethod { ordinal, .. } => {
381 warn!(ordinal:%; "Unknown ComponentController request");
382 }
383 }
384 }
385 Ok(())
386 }
387}
388
389fn extract_local_component_name(dict: fdata::Dictionary) -> Result<String, Error> {
390 let entry_value = get_dictionary_value(&dict, ftest::LOCAL_COMPONENT_NAME_KEY)
391 .ok_or_else(|| format_err!("program section is missing component name"))?;
392 if let fdata::DictionaryValue::Str(s) = entry_value {
393 return Ok(s.clone());
394 } else {
395 return Err(format_err!("malformed program section"));
396 }
397}
398
#[cfg(test)]
mod tests {
    use super::*;
    use assert_matches::assert_matches;
    use fidl::endpoints::{create_proxy, Proxy as _};
    use futures::future::pending;
    use zx::AsHandleRef;

    /// Registers a local component that fires `sender` the first time it runs and then exits
    /// successfully. Running it a second time panics.
    async fn register_signaling_component(
        runner_builder: &LocalComponentRunnerBuilder,
        component_name: String,
        sender: oneshot::Sender<()>,
    ) {
        let sender = Arc::new(Mutex::new(Some(sender)));
        runner_builder
            .register_local_component(component_name, move |_handles| {
                let sender = sender.clone();
                async move {
                    let sender = sender.lock().await.take().expect("local component invoked twice");
                    sender.send(()).expect("failed to send");
                    Ok(())
                }
                .boxed()
            })
            .await
            .unwrap();
    }

    /// Registers a local component that takes ownership of `sender` when it first runs and
    /// then never completes. The matching receiver therefore observes `Canceled` only once the
    /// component's future has been dropped.
    async fn register_hanging_component(
        runner_builder: &LocalComponentRunnerBuilder,
        component_name: String,
        sender: oneshot::Sender<()>,
    ) {
        let sender = Arc::new(Mutex::new(Some(sender)));
        runner_builder
            .register_local_component(component_name, move |_handles| {
                let sender = sender.clone();
                async move {
                    let _sender =
                        sender.lock().await.take().expect("local component invoked twice");
                    pending().await
                }
                .boxed()
            })
            .await
            .unwrap();
    }

    /// A registered implementation can be fetched from the builder and invoked directly.
    #[fuchsia::test]
    async fn runner_builder_correctly_stores_a_function() {
        let runner_builder = LocalComponentRunnerBuilder::new();
        let (sender, receiver) = oneshot::channel();

        let component_name = "test".to_string();

        register_signaling_component(&runner_builder, component_name.clone(), sender).await;

        let (_, outgoing_dir) = create_proxy();
        let handles = LocalComponentHandles {
            namespace: HashMap::new(),
            numbered_handles: HashMap::new(),
            outgoing_dir,
            stop_notifier: Arc::new(Mutex::new(None)),
        };
        let local_component_implementation = runner_builder
            .local_component_implementations
            .lock()
            .await
            .as_ref()
            .unwrap()
            .get(&component_name)
            .expect("local component missing from runner builder")
            .clone();

        (*local_component_implementation)(handles)
            .await
            .expect("local component implementation failed");
        let () = receiver.await.expect("failed to receive");
    }

    /// Everything a test needs to keep alive, or interact with, after starting a component.
    /// Fields with a leading underscore are held only so they are not dropped early.
    struct RunnerAndHandles {
        _runner_task: fasync::Task<()>,
        _component_runner_proxy: fcrunner::ComponentRunnerProxy,
        _runtime_dir_proxy: fio::DirectoryProxy,
        outgoing_dir_proxy: fio::DirectoryProxy,
        controller_proxy: fcrunner::ComponentControllerProxy,
    }

    /// Builds the runner and sends it a start request for the local component named
    /// `component_to_start`, with an empty namespace and no numbered handles.
    async fn build_and_start(
        runner_builder: LocalComponentRunnerBuilder,
        component_to_start: String,
    ) -> RunnerAndHandles {
        let (component_runner_client_end, runner_task) = runner_builder.build().await.unwrap();
        let component_runner_proxy = component_runner_client_end.into_proxy();

        let (runtime_dir_proxy, runtime_dir_server_end) = create_proxy();
        let (outgoing_dir_proxy, outgoing_dir_server_end) = create_proxy();
        let (controller_proxy, controller_server_end) = create_proxy();
        component_runner_proxy
            .start(
                fcrunner::ComponentStartInfo {
                    resolved_url: Some("test://test".to_string()),
                    program: Some(fdata::Dictionary {
                        entries: Some(vec![fdata::DictionaryEntry {
                            key: ftest::LOCAL_COMPONENT_NAME_KEY.to_string(),
                            value: Some(Box::new(fdata::DictionaryValue::Str(component_to_start))),
                        }]),
                        ..Default::default()
                    }),
                    ns: Some(vec![]),
                    outgoing_dir: Some(outgoing_dir_server_end),
                    runtime_dir: Some(runtime_dir_server_end),
                    numbered_handles: Some(vec![]),
                    ..Default::default()
                },
                controller_server_end,
            )
            .expect("failed to send start");

        RunnerAndHandles {
            _runner_task: runner_task,
            _component_runner_proxy: component_runner_proxy,
            _runtime_dir_proxy: runtime_dir_proxy,
            outgoing_dir_proxy,
            controller_proxy,
        }
    }

    /// A start request causes the registered implementation to run.
    #[fuchsia::test]
    async fn the_runner_runs_a_component() {
        let runner_builder = LocalComponentRunnerBuilder::new();
        let (sender, receiver) = oneshot::channel();

        let component_name = "test".to_string();

        register_signaling_component(&runner_builder, component_name.clone(), sender).await;

        let _runner_and_handles = build_and_start(runner_builder, component_name).await;

        let () = receiver.await.expect("failed to receive");
    }

    /// The outgoing directory handed to the component is the peer of the channel that was
    /// supplied in the start request.
    #[fuchsia::test]
    async fn the_runner_gives_the_component_its_outgoing_dir() {
        let runner_builder = LocalComponentRunnerBuilder::new();
        let (sender, receiver) = oneshot::channel::<ServerEnd<fio::DirectoryMarker>>();
        let sender = Arc::new(Mutex::new(Some(sender)));

        let component_name = "test".to_string();

        runner_builder
            .register_local_component(component_name.clone(), move |handles| {
                let sender = sender.clone();
                async move {
                    // NOTE(review): presumably forces the whole `handles` value, not just
                    // `outgoing_dir`, to be captured by this block; confirm before removing.
                    let _ = &handles;
                    sender
                        .lock()
                        .await
                        .take()
                        .expect("local component invoked twice")
                        .send(handles.outgoing_dir)
                        .expect("failed to send");
                    Ok(())
                }
                .boxed()
            })
            .await
            .unwrap();

        let runner_and_handles = build_and_start(runner_builder, component_name.clone()).await;

        let outgoing_dir_server_end = receiver.await.expect("failed to receive");

        // The server end's koid must equal the related koid of the client end kept by the test.
        assert_eq!(
            outgoing_dir_server_end
                .into_channel()
                .basic_info()
                .expect("failed to get basic info")
                .koid,
            runner_and_handles
                .outgoing_dir_proxy
                .into_channel()
                .expect("failed to convert to channel")
                .basic_info()
                .expect("failed to get basic info")
                .related_koid,
        );
    }

    /// With no stop notifier registered, a `Stop` request drops the component's future.
    #[fuchsia::test]
    async fn controller_stop_will_stop_a_component() {
        let runner_builder = LocalComponentRunnerBuilder::new();
        let (sender, receiver) = oneshot::channel::<()>();

        let component_name = "test".to_string();

        register_hanging_component(&runner_builder, component_name.clone(), sender).await;

        let runner_and_handles = build_and_start(runner_builder, component_name).await;
        runner_and_handles.controller_proxy.stop().expect("failed to send stop");

        assert_eq!(Err(oneshot::Canceled), receiver.await);
    }

    /// A `Kill` request drops the component's future.
    #[fuchsia::test]
    async fn controller_kill_will_kill_a_component() {
        let runner_builder = LocalComponentRunnerBuilder::new();
        let (sender, receiver) = oneshot::channel::<()>();

        let component_name = "test".to_string();

        register_hanging_component(&runner_builder, component_name.clone(), sender).await;

        let runner_and_handles = build_and_start(runner_builder, component_name).await;
        runner_and_handles.controller_proxy.kill().expect("failed to send kill");

        assert_eq!(Err(oneshot::Canceled), receiver.await);
    }

    /// With a stop notifier registered, a `Stop` request fires the notifier and the component
    /// gets to finish on its own.
    #[fuchsia::test]
    async fn stopping_a_component_calls_the_notifier() {
        let runner_builder = LocalComponentRunnerBuilder::new();
        let (notifier_registered_sender, notifier_registered_receiver) = oneshot::channel::<()>();
        let notifier_registered_sender = Arc::new(Mutex::new(Some(notifier_registered_sender)));

        let (notifier_fired_sender, notifier_fired_receiver) = oneshot::channel::<()>();
        let notifier_fired_sender = Arc::new(Mutex::new(Some(notifier_fired_sender)));

        let component_name = "test".to_string();

        runner_builder
            .register_local_component(component_name.clone(), move |handles| {
                let notifier_registered_sender = notifier_registered_sender.clone();
                let notifier_fired_sender = notifier_fired_sender.clone();
                async move {
                    let stop_notifier = handles.register_stop_notifier().await;

                    // Tell the test the notifier is in place so that it can send the stop.
                    let sender = notifier_registered_sender
                        .lock()
                        .await
                        .take()
                        .expect("local component invoked twice");
                    sender.send(()).expect("failed to send that the stop notifier was registered");

                    stop_notifier.await.expect("failed to wait for stop notification");

                    // Tell the test the stop notification arrived.
                    let sender = notifier_fired_sender
                        .lock()
                        .await
                        .take()
                        .expect("local component invoked twice");
                    sender
                        .send(())
                        .expect("failed to send that the stop notifier received a message");

                    Ok(())
                }
                .boxed()
            })
            .await
            .unwrap();

        let runner_and_handles = build_and_start(runner_builder, component_name).await;

        // Wait until the component has registered its notifier before sending the stop.
        assert_matches!(notifier_registered_receiver.await, Ok(()));

        runner_and_handles.controller_proxy.stop().expect("failed to send stop");

        assert_matches!(notifier_fired_receiver.await, Ok(()));
    }

    /// Dropping the runner task and its handles drops the component's future.
    #[fuchsia::test]
    async fn dropping_the_runner_will_kill_a_component() {
        let runner_builder = LocalComponentRunnerBuilder::new();
        let (sender, receiver) = oneshot::channel::<()>();

        let component_name = "test".to_string();

        register_hanging_component(&runner_builder, component_name.clone(), sender).await;

        let runner_and_handles = build_and_start(runner_builder, component_name).await;
        drop(runner_and_handles);

        assert_eq!(Err(oneshot::Canceled), receiver.await);
    }
}