settings/job/
source.rs

1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5//! Job Source Support
6//!
7//! # Summary
8//!
9//! The source mod contains components for providing [Jobs](Job) to Job manager (most likely
10//! [manager](job::manager::Manager)). In a typical workflow, client code will create a [Seeder],
11//! which is used to send [Job](Job) streams to a manager via the
12//! [MessageHub](crate::message::message_hub::MessageHub). The [Seeder] can send any stream where
13//! the data implements [Into<Job>]. Once the source is received, the manager can assign a unique
14//! [Id] to the source, using [IdGenerator]. The manager can then use a [Handler] to handle the
15//!  produced [Jobs](Job) and their results.
16
17use crate::clock::now;
18use crate::job::execution::GroupError;
19use crate::job::{self, execution, Job, Payload, StoreHandleMapping};
20use crate::message::base::{Audience, MessengerType};
21use crate::service::message::{Delegate, Messenger, Signature};
22use crate::trace_guard;
23use core::pin::Pin;
24use futures::lock::Mutex;
25use futures::{Stream, StreamExt};
26use std::collections::{HashMap, VecDeque};
27use std::convert::Infallible;
28use std::rc::Rc;
29use thiserror::Error as ThisError;
30use {fuchsia_async as fasync, fuchsia_trace as ftrace};
31
32#[derive(Clone)]
33/// [Seeder] properly packages and sends [Job] source streams to a [Job] manager.
34pub struct Seeder {
35    /// A [Messenger](crate::message::messenger::MessengerClient) to send Payloads to the manager.
36    messenger: Messenger,
37    /// The [Signature](crate::message::base::Signature) of the manager to receive the source
38    /// Payloads.
39    manager_signature: Signature,
40}
41
42impl Seeder {
43    pub(crate) async fn new(delegate: &Delegate, manager_signature: Signature) -> Self {
44        Self {
45            messenger: delegate
46                .create(MessengerType::Unbound)
47                .await
48                .expect("should create messenger")
49                .0,
50            manager_signature,
51        }
52    }
53
54    pub(crate) fn seed<J, E, E2, T>(&self, source: T)
55    where
56        Job: TryFrom<J, Error = E2>,
57        Error: From<E> + From<E2>,
58        T: Stream<Item = Result<J, E>> + 'static,
59    {
60        // Convert the incoming stream into the expected types for a Job source.
61        let mapped_stream: Pin<Box<dyn Stream<Item = Result<Job, Error>>>> = source
62            .map(|result| {
63                result
64                    // First convert the error type from the result so we can be compatible
65                    // with conversions done with try_from below.
66                    .map_err(Error::from)
67                    // Then map the job. Ideally try_from will return `Error` directly, but we
68                    // also need to handle the `Infallible` type. It should compile to a no-op,
69                    // but the types still need to align.
70                    .and_then(|j| Job::try_from(j).map_err(Error::from))
71            })
72            .boxed_local();
73
74        // Send the source stream to the manager.
75        let _ = self.messenger.message(
76            Payload::Source(Rc::new(Mutex::new(Some(mapped_stream)))).into(),
77            Audience::Messenger(self.manager_signature),
78        );
79    }
80}
81
82/// The types of errors for [Jobs](Job). This is a single, unified set over all Job source
83/// related-errors. This enumeration should be expanded to capture any future error variant.
84#[derive(ThisError)]
85pub enum Error {
86    #[error("Unexpected error")]
87    Unexpected(fidl::Error),
88    #[error("Invalid input")]
89    InvalidInput(Box<dyn ErrorResponder>),
90    #[error("Invalid policy input")]
91    InvalidPolicyInput(Box<dyn PolicyErrorResponder>),
92    #[error("Unsupported API call")]
93    Unsupported,
94}
95
96impl std::fmt::Debug for Error {
97    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
98        match self {
99            Error::Unexpected(_) => f.write_str("Unexpected"),
100            Error::InvalidInput(_) => f.write_str("InvalidInput(..)"),
101            Error::InvalidPolicyInput(_) => f.write_str("InvalidPolicyInput(..)"),
102            Error::Unsupported => f.write_str("Unsupported"),
103        }
104    }
105}
106
107/// Abstract over how to respond with a settings fidl error.
108pub trait ErrorResponder {
109    /// Unique identifier for the API this responder is responsible for.
110    fn id(&self) -> &'static str;
111
112    /// Respond with the supplied error. Returns any fidl errors that occur when
113    /// trying to send the response.
114    fn respond(self: Box<Self>, error: fidl_fuchsia_settings::Error) -> Result<(), fidl::Error>;
115}
116
117/// Abstract over how to respond with a settings policy fidl error.
118pub trait PolicyErrorResponder {
119    /// Unique identifier for the API this responder is responsible for.
120    fn id(&self) -> &'static str;
121
122    /// Respond with the supplied error. Returns any fidl errors that occur when
123    /// trying to send the response.
124    fn respond(
125        self: Box<Self>,
126        error: fidl_fuchsia_settings_policy::Error,
127    ) -> Result<(), fidl::Error>;
128}
129
130// This implementation is necessary when converting into a Job is infallible. This can happen if an
131// input to a job has no possible way to fail, or in tests when the streams a populated with Jobs
132// directly. This is used by the Seeder::seed fn above.
133impl From<Infallible> for Error {
134    fn from(_: Infallible) -> Self {
135        unreachable!()
136    }
137}
138
139impl From<fidl::Error> for Error {
140    fn from(error: fidl::Error) -> Self {
141        Error::Unexpected(error)
142    }
143}
144
145#[derive(Copy, Clone, Debug, PartialEq)]
146/// The current state of the source. This is used by the managing entity to understand how to handle
147/// pending and completed [Jobs](Job) from a source.
148pub(super) enum State {
149    /// The source is still available to produce new [Jobs](Job).
150    Active,
151    /// Completion has been requested, but [Jobs](Job) must complete before the source is considered
152    /// done.
153    PendingCompletion,
154    /// The source is no longer producing new [Jobs](Job).
155    Completed,
156}
157
158/// [Id] provides a unique identifier for a source within its parent space, most often a manager.
159#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
160pub struct Id {
161    _identifier: usize,
162}
163
164impl Id {
165    fn new(identifier: usize) -> Self {
166        Self { _identifier: identifier }
167    }
168}
169
170/// [IdGenerator] creates unique [Ids] to be associated with a source. This uniqueness is
171/// guaranteed for [Ids] generated by the same [IdGenerator].
172///
173/// [Ids]: Id
174pub(super) struct IdGenerator {
175    next_identifier: usize,
176}
177
178impl IdGenerator {
179    pub(super) fn new() -> Self {
180        Self { next_identifier: 0 }
181    }
182
183    pub(super) fn generate(&mut self) -> Id {
184        let return_id = Id::new(self.next_identifier);
185        self.next_identifier += 1;
186
187        return_id
188    }
189}
190
191/// [Handler] handles [Jobs](Job) within the scope of a single scope. It determines what [Job](Job)
192/// should be executed (if any). This responsibility includes managing any queueing that might be
193/// necessary based on the [Job] type.
194pub(super) struct Handler {
195    /// A [IdGenerator](job::IdGenerator) to assign unique ids to incoming jobs.
196    job_id_generator: job::IdGenerator,
197    /// A mapping from [execution types](execution::Type) to [groups](execution::Group). Each entry
198    /// enables tracking across [Jobs](Job) in the same group, such as storing persistent values.
199    /// The mapping is also consulted finding the next [Jobs](Job) to execute.
200    jobs: HashMap<execution::Type, execution::Group>,
201    /// A list of states. The element represents the most current [State]. We keep track of seen
202    /// states to allow post analysis, such as source duration.
203    states: VecDeque<(State, zx::MonotonicInstant)>,
204    /// This [HashMap] associates a given [Job] [Signature] with a [Data](job::data::Data) mapping.
205    /// [Signature] is used over [execution::Type] to allow storage to be shared across groups of
206    /// different [types](execution::Type) that share the same [Signature].
207    stores: StoreHandleMapping,
208}
209
210impl Handler {
211    pub(crate) fn new() -> Self {
212        let mut handler = Self {
213            job_id_generator: job::IdGenerator::new(),
214            jobs: HashMap::new(),
215            states: VecDeque::new(),
216            stores: HashMap::new(),
217        };
218
219        handler.set_state(State::Active);
220
221        handler
222    }
223
224    /// Marks the source as completed.
225    pub(crate) fn complete(&mut self) {
226        self.set_state(if self.is_active() { State::PendingCompletion } else { State::Completed });
227    }
228
229    /// Drops any job that has not yet been started and any watch jobs.
230    pub(crate) fn cancel(&mut self) {
231        for execution_group in self.jobs.values_mut() {
232            execution_group.cancel();
233        }
234        self.complete();
235    }
236
237    /// Returns whether the source has completed.
238    pub(crate) fn is_completed(&mut self) -> bool {
239        matches!(self.states.back(), Some(&(State::Completed, _)))
240    }
241
242    fn set_state(&mut self, state: State) {
243        // State should not be set after the source has been completed.
244        assert!(!self.is_completed());
245
246        // Do not try to set the state if it matches the last updated state.
247        if matches!(self.states.back(), Some(&(x,_)) if x == state) {
248            return;
249        }
250
251        self.states.push_back((state, now()));
252    }
253
254    /// Returns true if any job is executed, false otherwise.
255    pub(crate) async fn execute_next<F: FnOnce(job::Info) + 'static>(
256        &mut self,
257        delegate: &mut Delegate,
258        callback: F,
259        id: ftrace::Id,
260    ) -> bool {
261        for execution_group in self.jobs.values_mut() {
262            // If there are no jobs ready to become active, move to next group.
263            if let Some(job_info) = execution_group.promote_next_to_active() {
264                let guard = trace_guard!(id, c"prepare_execution");
265                let execution =
266                    job_info.prepare_execution(delegate, &mut self.stores, callback).await;
267                drop(guard);
268
269                fasync::Task::local(execution).detach();
270                return true;
271            }
272        }
273
274        false
275    }
276
277    /// Returns whether the source is active, defined as having at least one [Job] which is
278    /// currently active (running, not pending).
279    pub(crate) fn is_active(&self) -> bool {
280        self.jobs.iter().any(|(_, group)| group.is_active())
281    }
282
283    /// Adds a [Job] to be handled by this [Handler].
284    pub(crate) fn add_pending_job(&mut self, incoming_job: Job) -> Result<(), GroupError> {
285        let job_info = job::Info::new(self.job_id_generator.generate(), incoming_job);
286        let execution_type = *job_info.get_execution_type();
287
288        // Execution groups are based on matching execution::Type.
289        let execution_group = self
290            .jobs
291            .entry(execution_type)
292            .or_insert_with(move || execution::Group::new(execution_type));
293        execution_group.add(job_info)
294    }
295
296    /// Informs the [Handler] that a [Job] by the given [Id](job::Id) has completed.
297    pub(crate) fn handle_job_completion(&mut self, job: job::Info) {
298        self.jobs.get_mut(job.get_execution_type()).expect("group should be present").complete(job);
299
300        // When a source end is detected, the managing entity will try to complete the source. If
301        // there is active work, the source completion will be deferred. It is the source's
302        // responsibility after each subsequent completion to check whether completion can now
303        // proceed.
304        if matches!(self.states.back(), Some(&(State::PendingCompletion, _))) {
305            self.complete();
306        }
307    }
308}
309
310#[cfg(test)]
311mod tests {
312    use super::*;
313    use crate::service::{test, MessageHub};
314    use crate::tests::scaffold::workload::{Sequential, StubWorkload, Workload};
315    use rand::Rng;
316
317    use assert_matches::assert_matches;
318    use futures::FutureExt;
319
320    #[fuchsia::test]
321    fn test_id_generation() {
322        let mut generator = IdGenerator::new();
323        // Ensure generator is creating unique ids
324        assert!(generator.generate() != generator.generate());
325    }
326
327    #[fuchsia::test(allow_stalls = false)]
328    async fn test_seeding() {
329        // Create delegate for communication between components.
330        let message_hub_delegate = MessageHub::create_hub();
331
332        // Create a top-level receptor to receive sources.
333        let mut receptor = message_hub_delegate
334            .create(MessengerType::Unbound)
335            .await
336            .expect("should create receptor")
337            .1;
338
339        // Create seeder.
340        let seeder = Seeder::new(&message_hub_delegate, receptor.get_signature()).await;
341
342        let job_stream = async {
343            Ok(Job::new(job::work::Load::Independent(StubWorkload::new()))) as Result<Job, Error>
344        }
345        .into_stream();
346
347        seeder.seed(job_stream);
348
349        assert_matches!(receptor.next_of::<Payload>().await, Ok((Payload::Source(_), _)));
350    }
351
352    #[fuchsia::test(allow_stalls = false)]
353    async fn test_handling() {
354        // Create delegate for communication between components.
355        let mut message_hub_delegate = MessageHub::create_hub();
356
357        let results: Vec<i64> = (0..10).collect();
358
359        // Create a top-level receptor to receive job results from.
360        let mut receptor = message_hub_delegate
361            .create(MessengerType::Unbound)
362            .await
363            .expect("should create receptor")
364            .1;
365
366        let mut handler = Handler::new();
367
368        assert!(!handler.execute_next(&mut message_hub_delegate, |_| {}, 0.into()).await);
369
370        for result in &results {
371            let _ = handler.add_pending_job(Job::new(job::work::Load::Independent(Workload::new(
372                test::Payload::Integer(*result),
373                receptor.get_signature(),
374            ))));
375        }
376
377        for result in results {
378            let (execution_tx, mut execution_rx) = futures::channel::mpsc::unbounded::<job::Info>();
379
380            // Execute job.
381            assert!(
382                handler
383                    .execute_next(
384                        &mut message_hub_delegate,
385                        move |job| {
386                            execution_tx.unbounded_send(job).expect("send should succeed");
387                        },
388                        0.into()
389                    )
390                    .await
391            );
392
393            // Confirm received value matches the value sent from workload.
394            if let test::Payload::Integer(value) =
395                receptor.next_of::<test::Payload>().await.expect("should have payload").0
396            {
397                assert_eq!(value, result);
398            }
399
400            handler
401                .handle_job_completion(execution_rx.next().await.expect("should have gotten job"));
402        }
403    }
404
405    #[fuchsia::test(allow_stalls = false)]
406    async fn test_drop_pending() {
407        // Create delegate for communication between components.
408        let mut message_hub_delegate = MessageHub::create_hub();
409
410        let mut results: Vec<i64> = (0..10).collect();
411
412        // Create a top-level receptor to receive job results from.
413        let mut receptor = message_hub_delegate
414            .create(MessengerType::Unbound)
415            .await
416            .expect("should create receptor")
417            .1;
418
419        let mut handler = Handler::new();
420
421        assert!(!handler.execute_next(&mut message_hub_delegate, |_| {}, 0.into()).await);
422
423        for result in &results {
424            let _ = handler.add_pending_job(Job::new(job::work::Load::Independent(Workload::new(
425                test::Payload::Integer(*result),
426                receptor.get_signature(),
427            ))));
428        }
429
430        let result = results.remove(0);
431        let (execution_tx, mut execution_rx) = futures::channel::mpsc::unbounded::<job::Info>();
432
433        // Execute job concurrently.
434        assert!(
435            handler
436                .execute_next(
437                    &mut message_hub_delegate,
438                    move |job| {
439                        execution_tx.unbounded_send(job).expect("send should succeed");
440                    },
441                    0.into(),
442                )
443                .await
444        );
445
446        handler.cancel();
447
448        // Confirm received value matches the value sent from workload.
449        if let test::Payload::Integer(value) =
450            receptor.next_of::<test::Payload>().await.expect("should have payload").0
451        {
452            assert_eq!(value, result);
453        }
454
455        handler.handle_job_completion(execution_rx.next().await.expect("should have gotten job"));
456
457        // Validate there are no more jobs to execute.
458        let (execution_tx, _execution_rx) = futures::channel::mpsc::unbounded::<job::Info>();
459        assert!(
460            !handler
461                .execute_next(
462                    &mut message_hub_delegate,
463                    move |job| {
464                        execution_tx.unbounded_send(job).expect("send should succeed");
465                    },
466                    0.into(),
467                )
468                .await
469        );
470    }
471
472    // Ensures that proper queueing happens amongst Jobs within Execution Groups.
473    #[fuchsia::test(allow_stalls = false)]
474    async fn test_execution_order() {
475        let (execution_tx, mut execution_rx) = futures::channel::mpsc::unbounded::<job::Info>();
476
477        // Create delegate for communication between components.
478        let mut message_hub_delegate = MessageHub::create_hub();
479
480        let mut handler = Handler::new();
481
482        // Create a top-level receptor to receive job results from.
483        let mut receptor = message_hub_delegate
484            .create(MessengerType::Unbound)
485            .await
486            .expect("should create receptor")
487            .1;
488
489        // Create 2 jobs of the same sequential type.
490        let results: Vec<i64> = (0..=1).collect();
491
492        assert!(!handler.execute_next(&mut message_hub_delegate, |_| {}, 0.into()).await);
493
494        for result in &results {
495            let _ = handler.add_pending_job(Job::new(job::work::Load::Sequential(
496                Workload::new(test::Payload::Integer(*result), receptor.get_signature()),
497                job::Signature::new::<usize>(),
498            )));
499        }
500
501        // Execute first job, ensuring handler has a job to execute.
502        {
503            let execution_tx = execution_tx.clone();
504            assert!(
505                handler
506                    .execute_next(
507                        &mut message_hub_delegate,
508                        move |job| {
509                            execution_tx.unbounded_send(job).expect("send should succeed");
510                        },
511                        0.into()
512                    )
513                    .await
514            );
515        }
516
517        // Verify we receive result back for the first job.
518        assert_eq!(
519            test::Payload::Integer(0),
520            receptor.next_of::<test::Payload>().await.expect("should have payload").0
521        );
522
523        // Capture first completed job, do not handle yet.
524        let first_job_info = execution_rx.next().await.expect("should have gotten job");
525
526        // Ensure no job is ready to execute.
527        assert!(!handler.execute_next(&mut message_hub_delegate, move |_| {}, 0.into()).await);
528
529        // Add an independent job.
530        let _ =
531            handler.add_pending_job(Job::new(job::work::Load::Independent(StubWorkload::new())));
532
533        // Execute independent job.
534        {
535            let execution_tx = execution_tx.clone();
536            // Execute next job and ensure that the response max
537            assert!(
538                handler
539                    .execute_next(
540                        &mut message_hub_delegate,
541                        move |job| {
542                            execution_tx.unbounded_send(job).expect("send should succeed");
543                        },
544                        0.into()
545                    )
546                    .await
547            );
548        }
549
550        let independent_job_info = execution_rx.next().await.expect("should have gotten job");
551        assert_matches!(*independent_job_info.get_execution_type(), execution::Type::Independent);
552
553        // Handle independent job completion.
554        handler.handle_job_completion(independent_job_info);
555
556        // Handle first job completion.
557        handler.handle_job_completion(first_job_info);
558
559        {
560            let execution_tx = execution_tx.clone();
561            // Execute next job. Assert job is ready to execute
562            assert!(
563                handler
564                    .execute_next(
565                        &mut message_hub_delegate,
566                        move |job| {
567                            execution_tx.unbounded_send(job).expect("send should succeed");
568                        },
569                        0.into()
570                    )
571                    .await
572            );
573        }
574
575        // Verify we receive result from the second job back.
576        assert_eq!(
577            test::Payload::Integer(1),
578            receptor.next_of::<test::Payload>().await.expect("should have payload").0
579        );
580    }
581
582    // Ensures that proper queueing happens amongst Jobs within Execution Groups.
583    #[fuchsia::test(allow_stalls = false)]
584    async fn test_data() {
585        let mut rng = rand::thread_rng();
586
587        let (result_tx, mut result_rx) = futures::channel::mpsc::unbounded::<usize>();
588
589        // Create delegate for communication between components.
590        let mut message_hub_delegate = MessageHub::create_hub();
591
592        let mut handler = Handler::new();
593
594        let data_key = job::data::Key::TestInteger(rng.gen());
595        let initial_value = rng.gen_range(0..9);
596        let signature = job::Signature::new::<usize>();
597
598        // Each result is the square of the previous result,
599        let results: Vec<usize> = (0..5)
600            .map(move |val| {
601                let mut return_value: usize = initial_value;
602
603                for _ in 0..val {
604                    return_value = return_value.pow(2);
605                }
606
607                return_value
608            })
609            .collect();
610
611        for _ in &results {
612            let data_key = data_key.clone();
613            let result_tx = result_tx.clone();
614
615            // Add a job that writes the initial value and reads it back.
616            let _ = handler.add_pending_job(Job::new(job::work::Load::Sequential(
617                Sequential::boxed(move |_, store| {
618                    let result_tx = result_tx.clone();
619                    let data_key = data_key.clone();
620
621                    Box::pin(async move {
622                        let mut storage_lock = store.lock().await;
623                        let new_value = if let Some(job::data::Data::TestData(value)) =
624                            storage_lock.get(&data_key)
625                        {
626                            value.pow(2)
627                        } else {
628                            initial_value
629                        };
630
631                        // Store value.
632                        let _ = storage_lock.insert(data_key, job::data::Data::TestData(new_value));
633
634                        // Relay value back.
635                        result_tx.unbounded_send(new_value).expect("should send");
636                    })
637                }),
638                signature,
639            )));
640        }
641
642        for value in results {
643            let (completion_tx, mut completion_rx) =
644                futures::channel::mpsc::unbounded::<job::Info>();
645
646            // Execute next job.
647            assert!(
648                handler
649                    .execute_next(
650                        &mut message_hub_delegate,
651                        move |job| {
652                            completion_tx.unbounded_send(job).expect("should send job");
653                        },
654                        0.into()
655                    )
656                    .await
657            );
658
659            // Ensure the returned value matches the calculation
660            assert_eq!(value, result_rx.next().await.expect("value should be returned"));
661            handler.handle_job_completion(completion_rx.next().await.expect("should receive job"));
662        }
663    }
664}