1use crate::event::source::Event as SourceEvent;
17use crate::event::{self, Event};
18use crate::job::source::{self, Error};
19use crate::job::{self, Job, Payload, PinStream};
20use crate::message::base::MessengerType;
21use crate::service::{self, message};
22use crate::trace;
23use futures::stream::{FuturesUnordered, StreamFuture};
24use futures::{FutureExt, StreamExt};
25use std::collections::HashMap;
26use {fuchsia_async as fasync, fuchsia_trace as ftrace};
27
28type JobStreamItem = (source::Id, Option<Result<Job, Error>>);
29
30pub(crate) struct Manager {
33 sources: HashMap<source::Id, source::Handler>,
37 job_futures: FuturesUnordered<StreamFuture<PinStream<JobStreamItem>>>,
44 source_id_generator: source::IdGenerator,
47 execution_completion_sender: futures::channel::mpsc::UnboundedSender<(source::Id, job::Info)>,
50 message_hub_delegate: message::Delegate,
53 event_publisher: event::Publisher,
55}
56
57impl Manager {
58 pub(crate) async fn spawn(message_hub_delegate: &message::Delegate) -> message::Signature {
62 let receptor = message_hub_delegate
64 .create(MessengerType::Unbound)
65 .await
66 .expect("messenger should be available")
67 .1;
68
69 let (execution_completion_sender, execution_completion_receiver) =
71 futures::channel::mpsc::unbounded::<(source::Id, job::Info)>();
72
73 let signature = receptor.get_signature();
76 let event_publisher =
77 event::Publisher::create(message_hub_delegate, MessengerType::Unbound).await;
78
79 let mut manager = Self {
80 sources: HashMap::new(),
81 job_futures: FuturesUnordered::new(),
82 source_id_generator: source::IdGenerator::new(),
83 execution_completion_sender,
84 message_hub_delegate: message_hub_delegate.clone(),
85 event_publisher,
86 };
87
88 fasync::Task::local(async move {
93 let id = ftrace::Id::new();
94 trace!(id, c"job_manager");
95 let source_fuse = receptor.fuse();
96 let execution_fuse = execution_completion_receiver.fuse();
97
98 futures::pin_mut!(source_fuse, execution_fuse);
99 loop {
100 futures::select! {
101 source_event = source_fuse.select_next_some() => {
102 trace!(id, c"process_source_event");
103 manager.process_source_event(source_event).await;
104 },
105 (source_id, job_info) = execution_fuse.select_next_some() => {
106 trace!(id, c"process_completed_execution");
107 manager.process_completed_execution(source_id, job_info, id).await;
108 },
109 (job_info, stream) = manager.job_futures.select_next_some() => {
110 trace!(id, c"process_job");
111 let (source_id, job) = job_info.expect("job should be present");
114 manager.process_job(source_id, job, stream, id).await;
115 }
116 }
117 }
118 })
119 .detach();
120
121 signature
122 }
123
124 async fn process_completed_execution(
127 &mut self,
128 source_id: source::Id,
129 job_info: job::Info,
130 id: ftrace::Id,
131 ) {
132 let source_handler = &mut self.sources.get_mut(&source_id).expect("should find source");
134 source_handler.handle_job_completion(job_info);
135 self.remove_source_if_necessary(source_id);
136
137 self.process_next_job(id).await;
139 }
140
141 async fn process_next_job(&mut self, id: ftrace::Id) {
144 for (source_id, source_handler) in &mut self.sources {
146 let source_id = *source_id;
147 let execution_tx = self.execution_completion_sender.clone();
148
149 let _ = source_handler
151 .execute_next(
152 &mut self.message_hub_delegate,
153 move |job_info| {
154 if let Err(error) = execution_tx.unbounded_send((source_id, job_info)) {
155 panic!("Failed to send message. error: {error:?}");
156 };
157 },
158 id,
159 )
160 .await;
161 }
162 }
163
164 async fn process_source_event(&mut self, event: service::message::MessageEvent) {
167 let Payload::Source(source) = Payload::try_from(event).expect("should convert to source");
169
170 let job_stream = source.lock().await.take().expect("should capture job stream");
172
173 let source_id = self.source_id_generator.generate();
175
176 let _ = self.sources.insert(source_id, source::Handler::new());
178
179 let stream_fut = job_stream
182 .map(move |val| (source_id, Some(val)))
183 .chain(async move { (source_id, None) }.into_stream())
184 .boxed_local()
185 .into_future();
186 self.job_futures.push(stream_fut);
187 self.event_publisher.send_event(Event::Source(SourceEvent::Start(source_id)));
188 }
189
190 async fn process_job(
191 &mut self,
192 source: source::Id,
193 job: Option<Result<Job, Error>>,
194 source_stream: PinStream<JobStreamItem>,
195 id: ftrace::Id,
196 ) {
197 match job {
198 Some(Ok(job)) => {
199 if let Err(e) = self
202 .sources
203 .get_mut(&source)
204 .expect("source should be present")
205 .add_pending_job(job)
206 {
207 log::error!("Failed to add job: {:?}", e);
208 return;
209 }
210 }
211 Some(Err(Error::InvalidInput(error_responder))) => {
212 let id = error_responder.id();
215 if let Err(e) = error_responder.respond(fidl_fuchsia_settings::Error::Failed) {
216 log::warn!(
217 "Failed to report invalid input error to caller on API {} with id {:?}: \
218 {:?}",
219 id,
220 source,
221 e
222 );
223 }
224 }
225 Some(Err(Error::InvalidPolicyInput(error_responder))) => {
226 let id = error_responder.id();
229 if let Err(e) = error_responder.respond(fidl_fuchsia_settings_policy::Error::Failed)
230 {
231 log::warn!(
232 "Failed to report invalid policy input error to caller on policy API {} \
233 with id {:?}: {:?}",
234 id,
235 source,
236 e
237 );
238 }
239 }
240 Some(Err(Error::Unexpected(err))) if !err.is_closed() => {
241 log::warn!("Received an unexpected error on source {:?}: {:?}", source, err);
244 }
245 Some(Err(err @ (Error::Unexpected(_) | Error::Unsupported))) => {
246 log::warn!(
249 "Unable to process anymore job requests for {:?} due to fatal error: {:?}",
250 source,
251 err
252 );
253 self.cancel_source(source);
254 self.event_publisher
255 .send_event(Event::Source(SourceEvent::Complete(source, Err(err.into()))));
256 return;
257 }
258 None => {
259 self.cancel_source(source);
261 self.event_publisher
262 .send_event(Event::Source(SourceEvent::Complete(source, Ok(()))));
263 return;
264 }
265 }
266
267 self.job_futures.push(source_stream.into_future());
268 self.process_next_job(id).await;
269 }
270
271 fn cancel_source(&mut self, source_id: source::Id) {
272 self.sources.get_mut(&source_id).expect("should find source").cancel();
273 self.remove_source_if_necessary(source_id);
274 }
275
276 fn remove_source_if_necessary(&mut self, source_id: source::Id) {
277 let source_info = self.sources.get_mut(&source_id).expect("should find source");
278
279 if source_info.is_completed() {
280 let _ = self.sources.remove(&source_id);
281 }
282 }
283}
284
285#[cfg(test)]
286mod tests {
287 use super::*;
288 use crate::event::source::CompleteError;
289 use crate::message::base::Audience;
290 use crate::service::{build_event_listener, test, MessageHub};
291 use crate::tests::scaffold::workload::Workload;
292 use assert_matches::assert_matches;
293 use async_trait::async_trait;
294
295 use futures::channel::mpsc;
296 use futures::channel::oneshot::{self, Receiver, Sender};
297 use futures::lock::Mutex;
298 use std::rc::Rc;
299
300 #[fuchsia::test(allow_stalls = false)]
302 async fn test_manager_job_processing_multiple_jobs_one_source() {
303 let message_hub_delegate = MessageHub::create_hub();
305
306 let results = 0..10;
307
308 let mut receptor = message_hub_delegate
310 .create(MessengerType::Unbound)
311 .await
312 .expect("should create receptor")
313 .1;
314
315 let manager_signature = Manager::spawn(&message_hub_delegate).await;
316
317 let messenger = message_hub_delegate
319 .create(MessengerType::Unbound)
320 .await
321 .expect("should create messenger")
322 .0;
323
324 let (requests_tx, requests_rx) = mpsc::unbounded();
325
326 for result in results.clone() {
328 let signature = receptor.get_signature();
329 requests_tx
330 .unbounded_send(Ok(Job::new(job::work::Load::Independent(Workload::new(
331 test::Payload::Integer(result),
332 signature,
333 )))))
334 .expect("Should be able to queue requests");
335 }
336
337 let _ = messenger.message(
338 Payload::Source(Rc::new(Mutex::new(Some(requests_rx.boxed_local())))).into(),
339 Audience::Messenger(manager_signature),
340 );
341
342 for result in results {
343 assert_matches!(receptor.next_of::<test::Payload>().await.expect("should have payload").0,
345 test::Payload::Integer(value) if value == result);
346 }
347 }
348
349 #[fuchsia::test(allow_stalls = false)]
352 async fn test_manager_job_processing_handles_errored_conversions() {
353 struct TestResponder;
354 impl source::ErrorResponder for TestResponder {
355 fn id(&self) -> &'static str {
356 "Test"
357 }
358
359 fn respond(
360 self: Box<Self>,
361 error: fidl_fuchsia_settings::Error,
362 ) -> Result<(), fidl::Error> {
363 assert_eq!(error, fidl_fuchsia_settings::Error::Failed);
364 Ok(())
365 }
366 }
367
368 let message_hub_delegate = MessageHub::create_hub();
370
371 const RESULT: i64 = 1;
372
373 let mut receptor = message_hub_delegate
375 .create(MessengerType::Unbound)
376 .await
377 .expect("should create receptor")
378 .1;
379
380 let manager_signature = Manager::spawn(&message_hub_delegate).await;
381
382 let messenger = message_hub_delegate
384 .create(MessengerType::Unbound)
385 .await
386 .expect("should create messenger")
387 .0;
388
389 let (requests_tx, requests_rx) = mpsc::unbounded();
390
391 requests_tx
393 .unbounded_send(Err(Error::InvalidInput(Box::new(TestResponder))))
394 .expect("Should be able to queue requests");
395
396 let signature = receptor.get_signature();
398 requests_tx
399 .unbounded_send(Ok(Job::new(job::work::Load::Independent(Workload::new(
400 test::Payload::Integer(RESULT),
401 signature,
402 )))))
403 .expect("Should be able to queue requests");
404
405 let _ = messenger.message(
406 Payload::Source(Rc::new(Mutex::new(Some(requests_rx.boxed_local())))).into(),
407 Audience::Messenger(manager_signature),
408 );
409
410 assert_matches!(receptor.next_of::<test::Payload>().await.expect("should have payload").0,
412 test::Payload::Integer(value) if value == RESULT);
413 }
414
415 #[fuchsia::test(allow_stalls = false)]
418 async fn test_manager_job_processing_handles_errored_fidl() {
419 let message_hub_delegate = MessageHub::create_hub();
421
422 let mut receptor = message_hub_delegate
424 .create(MessengerType::Unbound)
425 .await
426 .expect("should create receptor")
427 .1;
428
429 let mut event_listener = build_event_listener(&message_hub_delegate).await;
430
431 let manager_signature = Manager::spawn(&message_hub_delegate).await;
432
433 let messenger = message_hub_delegate
435 .create(MessengerType::Unbound)
436 .await
437 .expect("should create messenger")
438 .0;
439
440 let (requests_tx, requests_rx) = mpsc::unbounded();
441
442 requests_tx
444 .unbounded_send(Err(Error::Unexpected(fidl::Error::ClientChannelClosed {
445 status: zx::Status::PEER_CLOSED,
446 protocol_name: "",
447 epitaph: None,
448 })))
449 .expect("Should be able to queue requests");
450
451 let signature = receptor.get_signature();
453 requests_tx
454 .unbounded_send(Ok(Job::new(job::work::Load::Independent(Workload::new(
455 test::Payload::Integer(1),
456 signature,
457 )))))
458 .expect("Should be able to queue requests");
459
460 let _ = messenger.message(
461 Payload::Source(Rc::new(Mutex::new(Some(requests_rx.boxed_local())))).into(),
462 Audience::Messenger(manager_signature),
463 );
464
465 assert_matches!(
467 event_listener.next_of::<event::Payload>().await,
468 Ok((event::Payload::Event(Event::Source(SourceEvent::Start(_))), _))
469 );
470 assert_matches!(
471 event_listener.next_of::<event::Payload>().await,
472 Ok((
473 event::Payload::Event(Event::Source(SourceEvent::Complete(
474 _,
475 Err(CompleteError::Unexpected)
476 ))),
477 _
478 ))
479 );
480
481 message_hub_delegate.delete(signature);
483
484 assert!(receptor.next_of::<test::Payload>().await.is_err());
486 }
487
488 #[fuchsia::test(allow_stalls = false)]
491 async fn test_invalid_policy_input_returns_error() {
492 struct TestPolicyResponder;
493 impl source::PolicyErrorResponder for TestPolicyResponder {
494 fn id(&self) -> &'static str {
495 "Test"
496 }
497
498 fn respond(
499 self: Box<Self>,
500 error: fidl_fuchsia_settings_policy::Error,
501 ) -> Result<(), fidl::Error> {
502 assert_eq!(error, fidl_fuchsia_settings_policy::Error::Failed);
503 Ok(())
504 }
505 }
506
507 let message_hub_delegate = MessageHub::create_hub();
509
510 const RESULT: i64 = 1;
511
512 let mut receptor = message_hub_delegate
514 .create(MessengerType::Unbound)
515 .await
516 .expect("should create receptor")
517 .1;
518
519 let manager_signature = Manager::spawn(&message_hub_delegate).await;
520
521 let messenger = message_hub_delegate
523 .create(MessengerType::Unbound)
524 .await
525 .expect("should create messenger")
526 .0;
527
528 let (requests_tx, requests_rx) = mpsc::unbounded();
529
530 requests_tx
532 .unbounded_send(Err(Error::InvalidPolicyInput(Box::new(TestPolicyResponder))))
533 .expect("Should be able to queue requests");
534
535 let signature = receptor.get_signature();
537 requests_tx
538 .unbounded_send(Ok(Job::new(job::work::Load::Independent(Workload::new(
539 test::Payload::Integer(RESULT),
540 signature,
541 )))))
542 .expect("Should be able to queue requests");
543
544 let _ = messenger.message(
545 Payload::Source(Rc::new(Mutex::new(Some(requests_rx.boxed_local())))).into(),
546 Audience::Messenger(manager_signature),
547 );
548
549 assert_matches!(receptor.next_of::<test::Payload>().await.expect("should have payload").0,
551 test::Payload::Integer(value) if value == RESULT);
552 }
553
554 struct WaitingWorkload {
555 rx: Receiver<()>,
556 execute_tx: Sender<()>,
557 }
558
559 impl WaitingWorkload {
560 fn new(rx: Receiver<()>, tx: Sender<()>) -> Self {
561 Self { rx, execute_tx: tx }
562 }
563 }
564
565 #[async_trait(?Send)]
568 impl job::work::Sequential for WaitingWorkload {
569 async fn execute(
570 self: Box<Self>,
571 _: message::Messenger,
572 _: job::data::StoreHandle,
573 _id: ftrace::Id,
574 ) -> Result<(), job::work::Error> {
575 self.execute_tx.send(()).expect("Should be able to signal start of execution");
576 let _ = self.rx.await;
577 Ok(())
578 }
579 }
580
581 #[fuchsia::test(allow_stalls = false)]
584 async fn test_manager_job_processing_multiple_sources() {
585 let message_hub_delegate = MessageHub::create_hub();
587
588 let manager_signature = Manager::spawn(&message_hub_delegate).await;
589
590 let messenger = message_hub_delegate
592 .create(MessengerType::Unbound)
593 .await
594 .expect("should create messenger")
595 .0;
596
597 let (_tx, rx) = oneshot::channel();
601 let (execute_tx, execute_rx) = oneshot::channel();
602 let (requests_tx, requests_rx) = mpsc::unbounded();
603 requests_tx
604 .unbounded_send(Ok(Job::new(job::work::Load::Sequential(
605 Box::new(WaitingWorkload::new(rx, execute_tx)),
606 job::Signature::new::<usize>(),
607 ))))
608 .expect("Should be able to send queue");
609 let _ = messenger.message(
610 Payload::Source(Rc::new(Mutex::new(Some(requests_rx.boxed_local())))).into(),
611 Audience::Messenger(manager_signature),
612 );
613
614 execute_rx.await.expect("Should have started hung execution");
616
617 let result = 1;
619 let mut receptor = message_hub_delegate
620 .create(MessengerType::Unbound)
621 .await
622 .expect("should create receptor")
623 .1;
624 let signature = receptor.get_signature();
625 let (requests_tx, requests_rx) = mpsc::unbounded();
626 requests_tx
627 .unbounded_send(Ok(Job::new(job::work::Load::Sequential(
628 Workload::new(test::Payload::Integer(result), signature),
629 job::Signature::new::<usize>(),
630 ))))
631 .expect("Should be able to send queue");
632
633 let _ = messenger.message(
634 Payload::Source(Rc::new(Mutex::new(Some(requests_rx.boxed_local())))).into(),
635 Audience::Messenger(manager_signature),
636 );
637
638 assert_matches!(receptor.next_of::<test::Payload>().await.expect("should have payload").0,
640 test::Payload::Integer(value) if value == result);
641 }
642
643 #[fuchsia::test(allow_stalls = false)]
646 async fn test_manager_cancels_jobs_on_stream_end() {
647 let message_hub_delegate = MessageHub::create_hub();
649
650 let manager_signature = Manager::spawn(&message_hub_delegate).await;
651
652 let messenger = message_hub_delegate
654 .create(MessengerType::Unbound)
655 .await
656 .expect("should create messenger")
657 .0;
658
659 let (_tx, rx) = oneshot::channel();
661 let (execute_tx, execute_rx) = oneshot::channel();
662 let (cancelation_tx, cancelation_rx) = oneshot::channel();
663 let (requests_tx, requests_rx) = mpsc::unbounded();
664 requests_tx
665 .unbounded_send(Ok(Job::new_with_cancellation(
666 job::work::Load::Sequential(
667 Box::new(WaitingWorkload::new(rx, execute_tx)),
668 job::Signature::new::<usize>(),
669 ),
670 cancelation_tx,
671 )))
672 .expect("Should be able to send queue");
673 let _ = messenger.message(
674 Payload::Source(Rc::new(Mutex::new(Some(requests_rx.boxed_local())))).into(),
675 Audience::Messenger(manager_signature),
676 );
677
678 execute_rx.await.expect("Should have started hung execution");
680
681 requests_tx.close_channel();
683
684 cancelation_rx.await.expect("Hanging is cancelled");
686 }
687}