1use crate::clock::now;
18use crate::job::execution::GroupError;
19use crate::job::{self, execution, Job, Payload, StoreHandleMapping};
20use crate::message::base::{Audience, MessengerType};
21use crate::service::message::{Delegate, Messenger, Signature};
22use crate::trace_guard;
23use core::pin::Pin;
24use futures::lock::Mutex;
25use futures::{Stream, StreamExt};
26use std::collections::{HashMap, VecDeque};
27use std::convert::Infallible;
28use std::rc::Rc;
29use thiserror::Error as ThisError;
30use {fuchsia_async as fasync, fuchsia_trace as ftrace};
31
32#[derive(Clone)]
33pub struct Seeder {
35 messenger: Messenger,
37 manager_signature: Signature,
40}
41
42impl Seeder {
43 pub(crate) async fn new(delegate: &Delegate, manager_signature: Signature) -> Self {
44 Self {
45 messenger: delegate
46 .create(MessengerType::Unbound)
47 .await
48 .expect("should create messenger")
49 .0,
50 manager_signature,
51 }
52 }
53
54 pub(crate) fn seed<J, E, E2, T>(&self, source: T)
55 where
56 Job: TryFrom<J, Error = E2>,
57 Error: From<E> + From<E2>,
58 T: Stream<Item = Result<J, E>> + 'static,
59 {
60 let mapped_stream: Pin<Box<dyn Stream<Item = Result<Job, Error>>>> = source
62 .map(|result| {
63 result
64 .map_err(Error::from)
67 .and_then(|j| Job::try_from(j).map_err(Error::from))
71 })
72 .boxed_local();
73
74 let _ = self.messenger.message(
76 Payload::Source(Rc::new(Mutex::new(Some(mapped_stream)))).into(),
77 Audience::Messenger(self.manager_signature),
78 );
79 }
80}
81
82#[derive(ThisError)]
85pub enum Error {
86 #[error("Unexpected error")]
87 Unexpected(fidl::Error),
88 #[error("Invalid input")]
89 InvalidInput(Box<dyn ErrorResponder>),
90 #[error("Invalid policy input")]
91 InvalidPolicyInput(Box<dyn PolicyErrorResponder>),
92 #[error("Unsupported API call")]
93 Unsupported,
94}
95
96impl std::fmt::Debug for Error {
97 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
98 match self {
99 Error::Unexpected(_) => f.write_str("Unexpected"),
100 Error::InvalidInput(_) => f.write_str("InvalidInput(..)"),
101 Error::InvalidPolicyInput(_) => f.write_str("InvalidPolicyInput(..)"),
102 Error::Unsupported => f.write_str("Unsupported"),
103 }
104 }
105}
106
107pub trait ErrorResponder {
109 fn id(&self) -> &'static str;
111
112 fn respond(self: Box<Self>, error: fidl_fuchsia_settings::Error) -> Result<(), fidl::Error>;
115}
116
117pub trait PolicyErrorResponder {
119 fn id(&self) -> &'static str;
121
122 fn respond(
125 self: Box<Self>,
126 error: fidl_fuchsia_settings_policy::Error,
127 ) -> Result<(), fidl::Error>;
128}
129
130impl From<Infallible> for Error {
134 fn from(_: Infallible) -> Self {
135 unreachable!()
136 }
137}
138
139impl From<fidl::Error> for Error {
140 fn from(error: fidl::Error) -> Self {
141 Error::Unexpected(error)
142 }
143}
144
145#[derive(Copy, Clone, Debug, PartialEq)]
146pub(super) enum State {
149 Active,
151 PendingCompletion,
154 Completed,
156}
157
158#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
160pub struct Id {
161 _identifier: usize,
162}
163
164impl Id {
165 fn new(identifier: usize) -> Self {
166 Self { _identifier: identifier }
167 }
168}
169
170pub(super) struct IdGenerator {
175 next_identifier: usize,
176}
177
178impl IdGenerator {
179 pub(super) fn new() -> Self {
180 Self { next_identifier: 0 }
181 }
182
183 pub(super) fn generate(&mut self) -> Id {
184 let return_id = Id::new(self.next_identifier);
185 self.next_identifier += 1;
186
187 return_id
188 }
189}
190
191pub(super) struct Handler {
195 job_id_generator: job::IdGenerator,
197 jobs: HashMap<execution::Type, execution::Group>,
201 states: VecDeque<(State, zx::MonotonicInstant)>,
204 stores: StoreHandleMapping,
208}
209
210impl Handler {
211 pub(crate) fn new() -> Self {
212 let mut handler = Self {
213 job_id_generator: job::IdGenerator::new(),
214 jobs: HashMap::new(),
215 states: VecDeque::new(),
216 stores: HashMap::new(),
217 };
218
219 handler.set_state(State::Active);
220
221 handler
222 }
223
224 pub(crate) fn complete(&mut self) {
226 self.set_state(if self.is_active() { State::PendingCompletion } else { State::Completed });
227 }
228
229 pub(crate) fn cancel(&mut self) {
231 for execution_group in self.jobs.values_mut() {
232 execution_group.cancel();
233 }
234 self.complete();
235 }
236
237 pub(crate) fn is_completed(&mut self) -> bool {
239 matches!(self.states.back(), Some(&(State::Completed, _)))
240 }
241
242 fn set_state(&mut self, state: State) {
243 assert!(!self.is_completed());
245
246 if matches!(self.states.back(), Some(&(x,_)) if x == state) {
248 return;
249 }
250
251 self.states.push_back((state, now()));
252 }
253
254 pub(crate) async fn execute_next<F: FnOnce(job::Info) + 'static>(
256 &mut self,
257 delegate: &mut Delegate,
258 callback: F,
259 id: ftrace::Id,
260 ) -> bool {
261 for execution_group in self.jobs.values_mut() {
262 if let Some(job_info) = execution_group.promote_next_to_active() {
264 let guard = trace_guard!(id, c"prepare_execution");
265 let execution =
266 job_info.prepare_execution(delegate, &mut self.stores, callback).await;
267 drop(guard);
268
269 fasync::Task::local(execution).detach();
270 return true;
271 }
272 }
273
274 false
275 }
276
277 pub(crate) fn is_active(&self) -> bool {
280 self.jobs.iter().any(|(_, group)| group.is_active())
281 }
282
283 pub(crate) fn add_pending_job(&mut self, incoming_job: Job) -> Result<(), GroupError> {
285 let job_info = job::Info::new(self.job_id_generator.generate(), incoming_job);
286 let execution_type = *job_info.get_execution_type();
287
288 let execution_group = self
290 .jobs
291 .entry(execution_type)
292 .or_insert_with(move || execution::Group::new(execution_type));
293 execution_group.add(job_info)
294 }
295
296 pub(crate) fn handle_job_completion(&mut self, job: job::Info) {
298 self.jobs.get_mut(job.get_execution_type()).expect("group should be present").complete(job);
299
300 if matches!(self.states.back(), Some(&(State::PendingCompletion, _))) {
305 self.complete();
306 }
307 }
308}
309
310#[cfg(test)]
311mod tests {
312 use super::*;
313 use crate::service::{test, MessageHub};
314 use crate::tests::scaffold::workload::{Sequential, StubWorkload, Workload};
315 use rand::Rng;
316
317 use assert_matches::assert_matches;
318 use futures::FutureExt;
319
320 #[fuchsia::test]
321 fn test_id_generation() {
322 let mut generator = IdGenerator::new();
323 assert!(generator.generate() != generator.generate());
325 }
326
327 #[fuchsia::test(allow_stalls = false)]
328 async fn test_seeding() {
329 let message_hub_delegate = MessageHub::create_hub();
331
332 let mut receptor = message_hub_delegate
334 .create(MessengerType::Unbound)
335 .await
336 .expect("should create receptor")
337 .1;
338
339 let seeder = Seeder::new(&message_hub_delegate, receptor.get_signature()).await;
341
342 let job_stream = async {
343 Ok(Job::new(job::work::Load::Independent(StubWorkload::new()))) as Result<Job, Error>
344 }
345 .into_stream();
346
347 seeder.seed(job_stream);
348
349 assert_matches!(receptor.next_of::<Payload>().await, Ok((Payload::Source(_), _)));
350 }
351
352 #[fuchsia::test(allow_stalls = false)]
353 async fn test_handling() {
354 let mut message_hub_delegate = MessageHub::create_hub();
356
357 let results: Vec<i64> = (0..10).collect();
358
359 let mut receptor = message_hub_delegate
361 .create(MessengerType::Unbound)
362 .await
363 .expect("should create receptor")
364 .1;
365
366 let mut handler = Handler::new();
367
368 assert!(!handler.execute_next(&mut message_hub_delegate, |_| {}, 0.into()).await);
369
370 for result in &results {
371 let _ = handler.add_pending_job(Job::new(job::work::Load::Independent(Workload::new(
372 test::Payload::Integer(*result),
373 receptor.get_signature(),
374 ))));
375 }
376
377 for result in results {
378 let (execution_tx, mut execution_rx) = futures::channel::mpsc::unbounded::<job::Info>();
379
380 assert!(
382 handler
383 .execute_next(
384 &mut message_hub_delegate,
385 move |job| {
386 execution_tx.unbounded_send(job).expect("send should succeed");
387 },
388 0.into()
389 )
390 .await
391 );
392
393 if let test::Payload::Integer(value) =
395 receptor.next_of::<test::Payload>().await.expect("should have payload").0
396 {
397 assert_eq!(value, result);
398 }
399
400 handler
401 .handle_job_completion(execution_rx.next().await.expect("should have gotten job"));
402 }
403 }
404
405 #[fuchsia::test(allow_stalls = false)]
406 async fn test_drop_pending() {
407 let mut message_hub_delegate = MessageHub::create_hub();
409
410 let mut results: Vec<i64> = (0..10).collect();
411
412 let mut receptor = message_hub_delegate
414 .create(MessengerType::Unbound)
415 .await
416 .expect("should create receptor")
417 .1;
418
419 let mut handler = Handler::new();
420
421 assert!(!handler.execute_next(&mut message_hub_delegate, |_| {}, 0.into()).await);
422
423 for result in &results {
424 let _ = handler.add_pending_job(Job::new(job::work::Load::Independent(Workload::new(
425 test::Payload::Integer(*result),
426 receptor.get_signature(),
427 ))));
428 }
429
430 let result = results.remove(0);
431 let (execution_tx, mut execution_rx) = futures::channel::mpsc::unbounded::<job::Info>();
432
433 assert!(
435 handler
436 .execute_next(
437 &mut message_hub_delegate,
438 move |job| {
439 execution_tx.unbounded_send(job).expect("send should succeed");
440 },
441 0.into(),
442 )
443 .await
444 );
445
446 handler.cancel();
447
448 if let test::Payload::Integer(value) =
450 receptor.next_of::<test::Payload>().await.expect("should have payload").0
451 {
452 assert_eq!(value, result);
453 }
454
455 handler.handle_job_completion(execution_rx.next().await.expect("should have gotten job"));
456
457 let (execution_tx, _execution_rx) = futures::channel::mpsc::unbounded::<job::Info>();
459 assert!(
460 !handler
461 .execute_next(
462 &mut message_hub_delegate,
463 move |job| {
464 execution_tx.unbounded_send(job).expect("send should succeed");
465 },
466 0.into(),
467 )
468 .await
469 );
470 }
471
472 #[fuchsia::test(allow_stalls = false)]
474 async fn test_execution_order() {
475 let (execution_tx, mut execution_rx) = futures::channel::mpsc::unbounded::<job::Info>();
476
477 let mut message_hub_delegate = MessageHub::create_hub();
479
480 let mut handler = Handler::new();
481
482 let mut receptor = message_hub_delegate
484 .create(MessengerType::Unbound)
485 .await
486 .expect("should create receptor")
487 .1;
488
489 let results: Vec<i64> = (0..=1).collect();
491
492 assert!(!handler.execute_next(&mut message_hub_delegate, |_| {}, 0.into()).await);
493
494 for result in &results {
495 let _ = handler.add_pending_job(Job::new(job::work::Load::Sequential(
496 Workload::new(test::Payload::Integer(*result), receptor.get_signature()),
497 job::Signature::new::<usize>(),
498 )));
499 }
500
501 {
503 let execution_tx = execution_tx.clone();
504 assert!(
505 handler
506 .execute_next(
507 &mut message_hub_delegate,
508 move |job| {
509 execution_tx.unbounded_send(job).expect("send should succeed");
510 },
511 0.into()
512 )
513 .await
514 );
515 }
516
517 assert_eq!(
519 test::Payload::Integer(0),
520 receptor.next_of::<test::Payload>().await.expect("should have payload").0
521 );
522
523 let first_job_info = execution_rx.next().await.expect("should have gotten job");
525
526 assert!(!handler.execute_next(&mut message_hub_delegate, move |_| {}, 0.into()).await);
528
529 let _ =
531 handler.add_pending_job(Job::new(job::work::Load::Independent(StubWorkload::new())));
532
533 {
535 let execution_tx = execution_tx.clone();
536 assert!(
538 handler
539 .execute_next(
540 &mut message_hub_delegate,
541 move |job| {
542 execution_tx.unbounded_send(job).expect("send should succeed");
543 },
544 0.into()
545 )
546 .await
547 );
548 }
549
550 let independent_job_info = execution_rx.next().await.expect("should have gotten job");
551 assert_matches!(*independent_job_info.get_execution_type(), execution::Type::Independent);
552
553 handler.handle_job_completion(independent_job_info);
555
556 handler.handle_job_completion(first_job_info);
558
559 {
560 let execution_tx = execution_tx.clone();
561 assert!(
563 handler
564 .execute_next(
565 &mut message_hub_delegate,
566 move |job| {
567 execution_tx.unbounded_send(job).expect("send should succeed");
568 },
569 0.into()
570 )
571 .await
572 );
573 }
574
575 assert_eq!(
577 test::Payload::Integer(1),
578 receptor.next_of::<test::Payload>().await.expect("should have payload").0
579 );
580 }
581
582 #[fuchsia::test(allow_stalls = false)]
584 async fn test_data() {
585 let mut rng = rand::thread_rng();
586
587 let (result_tx, mut result_rx) = futures::channel::mpsc::unbounded::<usize>();
588
589 let mut message_hub_delegate = MessageHub::create_hub();
591
592 let mut handler = Handler::new();
593
594 let data_key = job::data::Key::TestInteger(rng.gen());
595 let initial_value = rng.gen_range(0..9);
596 let signature = job::Signature::new::<usize>();
597
598 let results: Vec<usize> = (0..5)
600 .map(move |val| {
601 let mut return_value: usize = initial_value;
602
603 for _ in 0..val {
604 return_value = return_value.pow(2);
605 }
606
607 return_value
608 })
609 .collect();
610
611 for _ in &results {
612 let data_key = data_key.clone();
613 let result_tx = result_tx.clone();
614
615 let _ = handler.add_pending_job(Job::new(job::work::Load::Sequential(
617 Sequential::boxed(move |_, store| {
618 let result_tx = result_tx.clone();
619 let data_key = data_key.clone();
620
621 Box::pin(async move {
622 let mut storage_lock = store.lock().await;
623 let new_value = if let Some(job::data::Data::TestData(value)) =
624 storage_lock.get(&data_key)
625 {
626 value.pow(2)
627 } else {
628 initial_value
629 };
630
631 let _ = storage_lock.insert(data_key, job::data::Data::TestData(new_value));
633
634 result_tx.unbounded_send(new_value).expect("should send");
636 })
637 }),
638 signature,
639 )));
640 }
641
642 for value in results {
643 let (completion_tx, mut completion_rx) =
644 futures::channel::mpsc::unbounded::<job::Info>();
645
646 assert!(
648 handler
649 .execute_next(
650 &mut message_hub_delegate,
651 move |job| {
652 completion_tx.unbounded_send(job).expect("should send job");
653 },
654 0.into()
655 )
656 .await
657 );
658
659 assert_eq!(value, result_rx.next().await.expect("value should be returned"));
661 handler.handle_job_completion(completion_rx.next().await.expect("should receive job"));
662 }
663 }
664}