settings/
job.rs

1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5//! Job Handling Support
6//!
7//! # Summary
8//!
9//! [Jobs] are basic units of work that interfaces in the setting service can specify. In addition
10//! to the workload, [Job] definitions also capture information about how the work should be
11//! handled. For example, a Job can specify that it would like to run in sequence within a set of
12//! similar [Jobs]. This behavior is captured by the [Job]'s execution [Type](execution::Type).
13//!
//! Sources are streams that supply [Jobs] to the job manager. The lifetime of the [Jobs] produced
//! by a source is bound to the source's lifetime: when a source stream ends, any of its in-flight
//! and pending jobs are cancelled.
17//!
18//! Job Manager is responsible for managing sources and the jobs they produce. The manager
19//! associates and maintains any supporting data for jobs, such as caches.
20//!
21//! [Jobs]: Job
22use crate::service::message;
23use crate::{payload_convert, trace};
24
25use core::fmt::{Debug, Formatter};
26use core::pin::Pin;
27use futures::channel::oneshot;
28use futures::lock::Mutex;
29use futures::stream::Stream;
30use std::any::TypeId;
31use std::collections::HashMap;
32use std::future::Future;
33use std::rc::Rc;
34
35pub mod manager;
36pub mod source;
37
// NOTE(review): presumably wires `Payload` into the service-wide payload type under a `Job`
// variant — confirm against the `payload_convert!` macro definition.
payload_convert!(Job, Payload);
39
/// [StoreHandleMapping] represents the mapping from a [Job]'s [Signature] to the [data::Data]
/// store. This store is shared by all [Jobs] with the same [Signature].
///
/// [Jobs]: Job
pub(super) type StoreHandleMapping = HashMap<Signature, data::StoreHandle>;
/// A pinned, heap-allocated, type-erased [Stream] yielding items of type `T`.
type PinStream<T> = Pin<Box<dyn Stream<Item = T>>>;
/// A reference-counted, mutex-guarded slot holding a stream whose items are either a [Job] or a
/// [source::Error].
///
/// NOTE(review): the `Option` presumably lets the consumer take the stream out of the shared
/// slot — confirm against the job manager.
type SourceStreamHandle = Rc<Mutex<Option<PinStream<Result<Job, source::Error>>>>>;
47
/// The data payload that can be sent to the [Job Manager](crate::job::manager::Manager).
///
/// Cloning a [Payload] clones the reference-counted handle it carries, so every clone refers to
/// the same underlying stream slot rather than to a copy of the stream.
#[derive(Clone)]
pub enum Payload {
    /// `Source` represents a new source of [Jobs](Job).
    Source(SourceStreamHandle),
}
54
55impl Debug for Payload {
56    fn fmt(&self, f: &mut Formatter<'_>) -> core::fmt::Result {
57        write!(f, "Job Payload")
58    }
59}
60
impl PartialEq for Payload {
    /// Always reports inequality, so a [Payload] never compares equal to anything, itself
    /// included.
    ///
    /// NOTE(review): presumably deliberate because the wrapped stream handle has no meaningful
    /// notion of equality — confirm before relying on `==` for [Payload] values.
    fn eq(&self, _other: &Self) -> bool {
        false
    }
}
66
pub mod data {
    //! The data mod provides the components for interacting with information that can be stored and
    //! retrieved by a [Job](super::Job) during its execution. [Keys](Key) provide a way to address
    //! this data while [Data] defines the type of data that can be stored per entry.

    use crate::base::SettingInfo;
    use futures::lock::Mutex;
    use std::collections::HashMap;
    use std::rc::Rc;

    /// A shared handle to the [Data] mapping.
    pub type StoreHandle = Rc<Mutex<HashMap<Key, Data>>>;

    /// The address of a single entry inside a store. It derives `Eq` and `Hash` because it serves
    /// as the map key of [StoreHandle].
    #[derive(Clone, PartialEq, Eq, Hash)]
    pub enum Key {
        /// An entry addressed by a static string.
        Identifier(&'static str),
        /// An entry addressed by an integer; only compiled into test builds.
        #[cfg(test)]
        TestInteger(usize),
    }

    /// The value stored under a [Key].
    #[derive(Clone, PartialEq)]
    pub enum Data {
        /// A stored `SettingInfo` value.
        SettingInfo(SettingInfo),
        /// A stored integer; only compiled into test builds.
        #[cfg(test)]
        TestData(usize),
    }
}
94
95pub mod work {
96    use super::{data, Signature};
97    use crate::service::message;
98    use async_trait::async_trait;
99    use fuchsia_trace as ftrace;
100
    /// The workload carried by a [Job](crate::job::Job). The variant determines both which trait
    /// object performs the work and which arguments it is handed when executed.
    pub enum Load {
        /// [Sequential] loads are run in order after [Loads](Load) from [Jobs](crate::job::Job)
        /// of the same [Signature](crate::job::Signature) that preceded them. These [Loads](Load)
        /// share a common data store, which can be used to share information.
        Sequential(Box<dyn Sequential>, Signature),
        /// [Independent] loads are run as soon as there is availability to run as dictated by the
        /// containing [Job's](crate::job::Job) handler.
        Independent(Box<dyn Independent>),
    }
110
111    /// Possible error conditions that can be encountered during work execution.
112    pub enum Error {
113        /// The work was canceled.
114        Canceled,
115    }
116
117    impl Load {
118        /// Executes the contained workload, providing the individualized parameters based on type.
119        /// This function is asynchronous and is meant to be waited upon for workload completion.
120        /// Workloads are expected to wait and complete all work within the scope of this execution.
121        /// Therefore, invoking code can safely presume all work has completed after this function
122        /// returns.
123        pub(super) async fn execute(
124            self,
125            messenger: message::Messenger,
126            store: Option<data::StoreHandle>,
127            id: ftrace::Id,
128        ) -> Result<(), Error> {
129            match self {
130                Load::Sequential(load, _) => {
131                    load.execute(
132                        messenger,
133                        store.expect("all sequential loads should have store"),
134                        id,
135                    )
136                    .await
137                }
138                Load::Independent(load) => {
139                    load.execute(messenger, id).await;
140                    Ok(())
141                }
142            }
143        }
144    }
145
146    impl std::fmt::Debug for Load {
147        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
148            match self {
149                Load::Sequential(_, _) => f.debug_tuple("Sequential"),
150                Load::Independent(_) => f.debug_tuple("Independent"),
151            }
152            .field(&"..")
153            .finish()
154        }
155    }
156
    /// Work that is executed with access to a data store, as wrapped by [Load::Sequential].
    #[async_trait(?Send)]
    pub trait Sequential {
        /// Called when the [Job](super::Job) processing is ready for the encapsulated
        /// [work::Load](super::work::Load) to be executed. The provided
        /// [StoreHandle](data::StoreHandle) is specific to the parent [Job](super::Job) group.
        ///
        /// The implementation is consumed by the call. Returning [Error::Canceled] reports that
        /// the work was canceled.
        async fn execute(
            self: Box<Self>,
            messenger: message::Messenger,
            store: data::StoreHandle,
            id: ftrace::Id,
        ) -> Result<(), Error>;
    }
169
    /// Work that is executed without a data store, as wrapped by [Load::Independent].
    #[async_trait(?Send)]
    pub trait Independent {
        /// Called when a [work::Load](super::work::Load) should run. All workload specific logic should
        /// be encompassed in this method.
        ///
        /// The implementation is consumed by the call and has no way to report a result.
        async fn execute(self: Box<Self>, messenger: message::Messenger, id: ftrace::Id);
    }
176}
177
/// A value that is exactly one of two alternatives. Used by [Signature] to hold one of its two
/// key shapes in a single field.
#[derive(PartialEq, Eq, Copy, Clone, Debug, Hash)]
enum Either<A, B> {
    /// The first alternative.
    A(A),
    /// The second alternative.
    B(B),
}
183
/// An identifier specified by [Jobs](Job) to group related workflows. This is useful for
/// [work::Loads](work::Load) that need to be run sequentially. The [Signature] is used by the job
/// infrastructure to associate resources such as caches.
#[derive(PartialEq, Copy, Clone, Debug, Eq, Hash)]
pub struct Signature {
    /// Either a bare type id (as built by [Signature::new]) or a type id paired with a `u64`
    /// (as built by [Signature::with]). Two signatures are equal only when both the shape and
    /// the contents match.
    key: Either<TypeId, (TypeId, u64)>,
}
191
192impl Signature {
193    /// Constructs a new [Signature]. The key provided will group the associated [Job] with other
194    /// [Jobs](Job) of the same key. The association is scoped to other [Jobs](Job) in the same
195    /// parent source.
196    pub(crate) fn new<T>() -> Self
197    where
198        T: 'static + ?Sized,
199    {
200        Self { key: Either::A(TypeId::of::<T>()) }
201    }
202
203    pub(crate) fn with<T>(key: u64) -> Self
204    where
205        T: 'static + ?Sized,
206    {
207        Self { key: Either::B((TypeId::of::<T>(), key)) }
208    }
209}
210
/// A [Job] is a simple data container that associates a [work::Load] with an [execution::Type]
/// and, optionally, a trigger that can be used to cancel the work.
#[derive(Debug)]
pub struct Job {
    /// The [work::Load] to be run.
    workload: work::Load,
    /// The [execution::Type] determining how the [work::Load] will be run.
    execution_type: execution::Type,
    /// The trigger that can be used to cancel this job. Not all jobs are cancelable.
    cancelation_tx: Option<oneshot::Sender<()>>,
}
222
223impl Job {
224    pub(crate) fn new(workload: work::Load) -> Self {
225        let execution_type = match &workload {
226            work::Load::Sequential(_, signature) => execution::Type::Sequential(*signature),
227            _ => execution::Type::Independent,
228        };
229
230        Self { workload, execution_type, cancelation_tx: None }
231    }
232
233    pub(crate) fn new_with_cancellation(
234        workload: work::Load,
235        cancelation_tx: oneshot::Sender<()>,
236    ) -> Self {
237        let execution_type = match &workload {
238            work::Load::Sequential(_, signature) => execution::Type::Sequential(*signature),
239            _ => execution::Type::Independent,
240        };
241
242        Self { workload, execution_type, cancelation_tx: Some(cancelation_tx) }
243    }
244
245    #[cfg(test)]
246    pub(crate) fn workload(&self) -> &work::Load {
247        &self.workload
248    }
249
250    #[cfg(test)]
251    pub(crate) fn execution_type(&self) -> execution::Type {
252        self.execution_type
253    }
254}
255
/// [Id] provides a unique identifier for a job within its parent space. Unlike
/// [Signatures](Signature), all [Job Ids](Id) will be unique per [Job]. [Ids](Id) should never be
/// directly constructed. An [IdGenerator] should be used instead.
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
pub(super) struct Id {
    /// The raw value backing this [Id].
    ///
    /// NOTE(review): the leading underscore presumably silences the unused-field lint, since
    /// the value is only consumed through the derived traits — confirm.
    _identifier: usize,
}
263
impl Id {
    /// Wraps the raw `identifier`. The constructor is private, so it is reachable only from this
    /// module and its descendants; [IdGenerator::generate] is the intended entry point.
    fn new(identifier: usize) -> Self {
        Self { _identifier: identifier }
    }
}
269
/// [`IdGenerator`] is responsible for generating unique [Ids](Id).
pub(super) struct IdGenerator {
    /// The raw value the next generated [Id] will carry. It starts at zero and is advanced by
    /// one on every [generate](IdGenerator::generate) call.
    next_identifier: usize,
}
274
275impl IdGenerator {
276    pub(super) fn new() -> Self {
277        Self { next_identifier: 0 }
278    }
279
280    /// Produces a [`Id`] that is unique from any [`Id`] that has or will be generated by this
281    /// [`IdGenerator`] instance.
282    pub(super) fn generate(&mut self) -> Id {
283        let return_id = Id::new(self.next_identifier);
284        self.next_identifier += 1;
285
286        return_id
287    }
288}
289
/// An enumeration of stages a [Job] can be in.
enum State {
    /// The workload associated with the [Job] has not been executed yet. The [Job] itself is
    /// held here until execution begins.
    Ready(Job),
    /// The workload is executing.
    Executing,
    /// The workload execution has completed.
    Executed,
    /// The workload was canceled before it completed.
    Canceled,
}
301
/// [Info] is used to capture details about a [Job] once it has been accepted by an entity that will
/// process it. This includes the assigned [Id], the job's current [State], its
/// [execution::Type], and the cancelation trigger taken from the [Job].
struct Info {
    /// The identifier assigned to the accepted [Job].
    id: Id,
    /// The stage the [Job] is currently in; holds the [Job] itself while it is ready to run.
    state: State,
    /// A copy of the [Job]'s [execution::Type], kept here so it stays available after the
    /// [Job] has been moved out of `state`.
    execution_type: execution::Type,
    /// The cancelation trigger moved out of the [Job] on acceptance, if it had one.
    cancelation_tx: Option<oneshot::Sender<()>>,
}
310
311impl Info {
312    fn new(id: Id, mut job: Job) -> Self {
313        let execution_type = job.execution_type;
314        let cancelation_tx = job.cancelation_tx.take();
315        Self { id, state: State::Ready(job), execution_type, cancelation_tx }
316    }
317
318    /// Retrieves the [execution::Type] of the underlying [Job].
319    fn get_execution_type(&self) -> &execution::Type {
320        &self.execution_type
321    }
322
323    /// Prepares the components necessary for a [Job] to execute and then returns a future to
324    /// execute the [Job] workload with them. These components include a messenger for communicating
325    /// with the system and the store associated with the [Job's](Job) group if applicable.
326    async fn prepare_execution<F: FnOnce(Self)>(
327        mut self,
328        delegate: &mut message::Delegate,
329        stores: &mut StoreHandleMapping,
330        callback: F,
331    ) -> impl Future<Output = ()> {
332        // Create a messenger for the workload to communicate with the rest of the setting
333        // service.
334        let messenger = delegate
335            .create(message::MessengerType::Unbound)
336            .await
337            .expect("messenger should be available")
338            .0;
339
340        let store = self
341            .execution_type
342            .get_signature()
343            .map(|signature| stores.entry(*signature).or_default().clone());
344
345        async move {
346            let id = fuchsia_trace::Id::new();
347            trace!(id, c"job execution");
348            let mut state = State::Executing;
349            std::mem::swap(&mut state, &mut self.state);
350
351            if let State::Ready(job) = state {
352                self.state = if let Err(work::Error::Canceled) =
353                    job.workload.execute(messenger, store, id).await
354                {
355                    State::Canceled
356                } else {
357                    State::Executed
358                };
359                callback(self);
360            } else {
361                panic!("job not in the ready state");
362            }
363        }
364    }
365}
366
367pub(super) mod execution {
368    use super::Signature;
369    use crate::job;
370    use futures::channel::oneshot;
371    use std::collections::{HashMap, VecDeque};
372
    /// The workload types of a [job::Job]. This enumeration is used to define how a [job::Job] will
    /// be treated in relation to other jobs from the same source.
    #[derive(PartialEq, Clone, Copy, Debug, Eq, Hash)]
    pub enum Type {
        /// Independent jobs are executed in isolation from other [Jobs](job::Job). Some
        /// functionality is unavailable for Independent jobs, such as caches.
        Independent,
        /// Sequential [Jobs](job::Job) wait until all pre-existing [Jobs](job::Job) of the same
        /// [Signature] are completed. The carried [Signature] is also the key under which the
        /// shared data store for these jobs is looked up.
        Sequential(Signature),
    }
384
385    impl Type {
386        pub(super) fn get_signature(&self) -> Option<&Signature> {
387            if let Type::Sequential(signature) = self {
388                Some(signature)
389            } else {
390                None
391            }
392        }
393    }
394
    /// Errors reported when modifying a [Group].
    #[derive(thiserror::Error, Debug, Clone, Copy)]
    pub(super) enum GroupError {
        /// Returned by [Group::add] once the [Group] has been canceled.
        #[error("The group is closed, so no new jobs can be added")]
        Closed,
    }
400
    /// A collection of [Jobs](job::Job) which have matching execution types. [Groups](Group)
    /// determine how similar [Jobs](job::Job) are executed.
    pub(super) struct Group {
        /// The execution [Type] shared by the group; decides whether pending jobs may start
        /// while other jobs are still active.
        group_type: Type,
        /// Jobs that have been promoted but not yet completed, keyed by id. The value is the
        /// job's cancelation trigger, if it has one that has not been fired yet.
        active: HashMap<job::Id, Option<oneshot::Sender<()>>>,
        /// Jobs waiting to be promoted, in arrival order.
        pending: VecDeque<job::Info>,
        /// Set once the group has been canceled; a canceled group rejects new jobs.
        canceled: bool,
    }
409
410    impl Group {
411        /// Creates a new [Group] based on the execution [Type].
412        pub(super) fn new(group_type: Type) -> Self {
413            Self { group_type, active: HashMap::new(), pending: VecDeque::new(), canceled: false }
414        }
415
416        /// Returns whether any [Jobs](job::Job) are currently active. Pending [Jobs](job::Job) do
417        /// not count towards this total.
418        pub(super) fn is_active(&self) -> bool {
419            !self.active.is_empty()
420        }
421
422        /// Returns whether there are any [Jobs](job::Job) waiting to be executed.
423        pub(super) fn has_available_jobs(&self) -> bool {
424            if self.pending.is_empty() {
425                return false;
426            }
427
428            match self.group_type {
429                Type::Independent => true,
430                Type::Sequential(_) => self.active.is_empty(),
431            }
432        }
433
434        pub(super) fn add(&mut self, job_info: job::Info) -> Result<(), GroupError> {
435            if self.canceled {
436                Err(GroupError::Closed)
437            } else {
438                self.pending.push_back(job_info);
439                Ok(())
440            }
441        }
442
443        /// Invoked by [Job](super::Job) processing code to retrieve the next [Job](super::Job) to
444        /// run from this group. If there are no [Jobs](super::Job) ready for execution, None is
445        /// return. Otherwise, the selected [Job](super::Job) is added to the list of active
446        /// [Jobs](super::Job) for the group and handed back to the caller.
447        pub(super) fn promote_next_to_active(&mut self) -> Option<job::Info> {
448            if !self.has_available_jobs() {
449                return None;
450            }
451
452            let mut active_job = self.pending.pop_front();
453
454            if let Some(ref mut job) = active_job {
455                let _ = self.active.insert(job.id, job.cancelation_tx.take());
456            }
457
458            active_job
459        }
460
461        pub(super) fn complete(&mut self, job_info: job::Info) {
462            let _ = self.active.remove(&job_info.id);
463        }
464
465        pub(super) fn cancel(&mut self) {
466            self.canceled = true;
467            self.pending.clear();
468            for (_, cancelation_tx) in self.active.iter_mut() {
469                if let Some(cancelation_tx) = cancelation_tx.take() {
470                    let _ = cancelation_tx.send(());
471                }
472            }
473        }
474    }
475}
476
477#[cfg(test)]
478mod tests {
479    use super::*;
480    use crate::message::base::MessengerType;
481    use crate::service::test::Payload;
482    use crate::service::MessageHub;
483    use crate::tests::scaffold::workload::Workload;
484
485    use assert_matches::assert_matches;
486    use rand::Rng;
487
    // Verifies that two consecutive ids produced by a single generator differ.
    #[fuchsia::test]
    fn test_id_generation() {
        let mut generator = IdGenerator::new();
        // Ensure generator creates subsequent ids that don't match.
        assert!(generator.generate() != generator.generate());
    }
494
    // Verifies that executing a job's workload delivers the workload's payload to the receptor
    // whose signature the workload was given.
    #[fuchsia::test(allow_stalls = false)]
    async fn test_job_functionality() {
        // Create delegate for communication between components.
        let message_hub_delegate = MessageHub::create_hub();

        // Create a top-level receptor to receive communication from the workload.
        let mut receptor = message_hub_delegate
            .create(MessengerType::Unbound)
            .await
            .expect("should create receptor")
            .1;

        // Create a messenger to send communication from the workload.
        let messenger = message_hub_delegate
            .create(MessengerType::Unbound)
            .await
            .expect("should create messenger")
            .0;

        let mut rng = rand::thread_rng();
        // The value expected to be conveyed from workload to receptor.
        let val = rng.gen();

        // Create job from workload scaffolding.
        let job = Job::new(work::Load::Independent(Workload::new(
            Payload::Integer(val),
            receptor.get_signature(),
        )));

        // The workload is executed directly rather than through a manager. A store is supplied
        // even though independent loads ignore it.
        let _ = job
            .workload
            .execute(messenger, Some(Rc::new(Mutex::new(HashMap::new()))), 0.into())
            .await;

        // Confirm received value matches the value sent from workload.
        assert_matches!(
            receptor.next_of::<Payload>().await.expect("should return result, not error").0,
            Payload::Integer(value) if value == val);
    }
534}