1use crate::service::message;
23use crate::{payload_convert, trace};
24
25use core::fmt::{Debug, Formatter};
26use core::pin::Pin;
27use futures::channel::oneshot;
28use futures::lock::Mutex;
29use futures::stream::Stream;
30use std::any::TypeId;
31use std::collections::HashMap;
32use std::future::Future;
33use std::rc::Rc;
34
35pub mod manager;
36pub mod source;
37
38payload_convert!(Job, Payload);
39
40pub(super) type StoreHandleMapping = HashMap<Signature, data::StoreHandle>;
45type PinStream<T> = Pin<Box<dyn Stream<Item = T>>>;
46type SourceStreamHandle = Rc<Mutex<Option<PinStream<Result<Job, source::Error>>>>>;
47
48#[derive(Clone)]
50pub enum Payload {
51 Source(SourceStreamHandle),
53}
54
55impl Debug for Payload {
56 fn fmt(&self, f: &mut Formatter<'_>) -> core::fmt::Result {
57 write!(f, "Job Payload")
58 }
59}
60
61impl PartialEq for Payload {
62 fn eq(&self, _other: &Self) -> bool {
63 false
64 }
65}
66
67pub mod data {
68 use crate::base::SettingInfo;
73 use futures::lock::Mutex;
74 use std::collections::HashMap;
75 use std::rc::Rc;
76
77 pub type StoreHandle = Rc<Mutex<HashMap<Key, Data>>>;
79
80 #[derive(Clone, PartialEq, Eq, Hash)]
81 pub enum Key {
82 Identifier(&'static str),
83 #[cfg(test)]
84 TestInteger(usize),
85 }
86
87 #[derive(Clone, PartialEq)]
88 pub enum Data {
89 SettingInfo(SettingInfo),
90 #[cfg(test)]
91 TestData(usize),
92 }
93}
94
95pub mod work {
96 use super::{data, Signature};
97 use crate::service::message;
98 use async_trait::async_trait;
99 use fuchsia_trace as ftrace;
100
101 pub enum Load {
102 Sequential(Box<dyn Sequential>, Signature),
106 Independent(Box<dyn Independent>),
109 }
110
111 pub enum Error {
113 Canceled,
115 }
116
117 impl Load {
118 pub(super) async fn execute(
124 self,
125 messenger: message::Messenger,
126 store: Option<data::StoreHandle>,
127 id: ftrace::Id,
128 ) -> Result<(), Error> {
129 match self {
130 Load::Sequential(load, _) => {
131 load.execute(
132 messenger,
133 store.expect("all sequential loads should have store"),
134 id,
135 )
136 .await
137 }
138 Load::Independent(load) => {
139 load.execute(messenger, id).await;
140 Ok(())
141 }
142 }
143 }
144 }
145
146 impl std::fmt::Debug for Load {
147 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
148 match self {
149 Load::Sequential(_, _) => f.debug_tuple("Sequential"),
150 Load::Independent(_) => f.debug_tuple("Independent"),
151 }
152 .field(&"..")
153 .finish()
154 }
155 }
156
157 #[async_trait(?Send)]
158 pub trait Sequential {
159 async fn execute(
163 self: Box<Self>,
164 messenger: message::Messenger,
165 store: data::StoreHandle,
166 id: ftrace::Id,
167 ) -> Result<(), Error>;
168 }
169
170 #[async_trait(?Send)]
171 pub trait Independent {
172 async fn execute(self: Box<Self>, messenger: message::Messenger, id: ftrace::Id);
175 }
176}
177
178#[derive(PartialEq, Eq, Copy, Clone, Debug, Hash)]
179enum Either<A, B> {
180 A(A),
181 B(B),
182}
183
184#[derive(PartialEq, Copy, Clone, Debug, Eq, Hash)]
188pub struct Signature {
189 key: Either<TypeId, (TypeId, u64)>,
190}
191
192impl Signature {
193 pub(crate) fn new<T>() -> Self
197 where
198 T: 'static + ?Sized,
199 {
200 Self { key: Either::A(TypeId::of::<T>()) }
201 }
202
203 pub(crate) fn with<T>(key: u64) -> Self
204 where
205 T: 'static + ?Sized,
206 {
207 Self { key: Either::B((TypeId::of::<T>(), key)) }
208 }
209}
210
211#[derive(Debug)]
214pub struct Job {
215 workload: work::Load,
217 execution_type: execution::Type,
219 cancelation_tx: Option<oneshot::Sender<()>>,
221}
222
223impl Job {
224 pub(crate) fn new(workload: work::Load) -> Self {
225 let execution_type = match &workload {
226 work::Load::Sequential(_, signature) => execution::Type::Sequential(*signature),
227 _ => execution::Type::Independent,
228 };
229
230 Self { workload, execution_type, cancelation_tx: None }
231 }
232
233 pub(crate) fn new_with_cancellation(
234 workload: work::Load,
235 cancelation_tx: oneshot::Sender<()>,
236 ) -> Self {
237 let execution_type = match &workload {
238 work::Load::Sequential(_, signature) => execution::Type::Sequential(*signature),
239 _ => execution::Type::Independent,
240 };
241
242 Self { workload, execution_type, cancelation_tx: Some(cancelation_tx) }
243 }
244
245 #[cfg(test)]
246 pub(crate) fn workload(&self) -> &work::Load {
247 &self.workload
248 }
249
250 #[cfg(test)]
251 pub(crate) fn execution_type(&self) -> execution::Type {
252 self.execution_type
253 }
254}
255
256#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
260pub(super) struct Id {
261 _identifier: usize,
262}
263
264impl Id {
265 fn new(identifier: usize) -> Self {
266 Self { _identifier: identifier }
267 }
268}
269
270pub(super) struct IdGenerator {
272 next_identifier: usize,
273}
274
275impl IdGenerator {
276 pub(super) fn new() -> Self {
277 Self { next_identifier: 0 }
278 }
279
280 pub(super) fn generate(&mut self) -> Id {
283 let return_id = Id::new(self.next_identifier);
284 self.next_identifier += 1;
285
286 return_id
287 }
288}
289
290enum State {
292 Ready(Job),
294 Executing,
296 Executed,
298 Canceled,
300}
301
302struct Info {
305 id: Id,
306 state: State,
307 execution_type: execution::Type,
308 cancelation_tx: Option<oneshot::Sender<()>>,
309}
310
311impl Info {
312 fn new(id: Id, mut job: Job) -> Self {
313 let execution_type = job.execution_type;
314 let cancelation_tx = job.cancelation_tx.take();
315 Self { id, state: State::Ready(job), execution_type, cancelation_tx }
316 }
317
318 fn get_execution_type(&self) -> &execution::Type {
320 &self.execution_type
321 }
322
323 async fn prepare_execution<F: FnOnce(Self)>(
327 mut self,
328 delegate: &mut message::Delegate,
329 stores: &mut StoreHandleMapping,
330 callback: F,
331 ) -> impl Future<Output = ()> {
332 let messenger = delegate
335 .create(message::MessengerType::Unbound)
336 .await
337 .expect("messenger should be available")
338 .0;
339
340 let store = self
341 .execution_type
342 .get_signature()
343 .map(|signature| stores.entry(*signature).or_default().clone());
344
345 async move {
346 let id = fuchsia_trace::Id::new();
347 trace!(id, c"job execution");
348 let mut state = State::Executing;
349 std::mem::swap(&mut state, &mut self.state);
350
351 if let State::Ready(job) = state {
352 self.state = if let Err(work::Error::Canceled) =
353 job.workload.execute(messenger, store, id).await
354 {
355 State::Canceled
356 } else {
357 State::Executed
358 };
359 callback(self);
360 } else {
361 panic!("job not in the ready state");
362 }
363 }
364 }
365}
366
367pub(super) mod execution {
368 use super::Signature;
369 use crate::job;
370 use futures::channel::oneshot;
371 use std::collections::{HashMap, VecDeque};
372
373 #[derive(PartialEq, Clone, Copy, Debug, Eq, Hash)]
376 pub enum Type {
377 Independent,
380 Sequential(Signature),
383 }
384
385 impl Type {
386 pub(super) fn get_signature(&self) -> Option<&Signature> {
387 if let Type::Sequential(signature) = self {
388 Some(signature)
389 } else {
390 None
391 }
392 }
393 }
394
395 #[derive(thiserror::Error, Debug, Clone, Copy)]
396 pub(super) enum GroupError {
397 #[error("The group is closed, so no new jobs can be added")]
398 Closed,
399 }
400
401 pub(super) struct Group {
404 group_type: Type,
405 active: HashMap<job::Id, Option<oneshot::Sender<()>>>,
406 pending: VecDeque<job::Info>,
407 canceled: bool,
408 }
409
410 impl Group {
411 pub(super) fn new(group_type: Type) -> Self {
413 Self { group_type, active: HashMap::new(), pending: VecDeque::new(), canceled: false }
414 }
415
416 pub(super) fn is_active(&self) -> bool {
419 !self.active.is_empty()
420 }
421
422 pub(super) fn has_available_jobs(&self) -> bool {
424 if self.pending.is_empty() {
425 return false;
426 }
427
428 match self.group_type {
429 Type::Independent => true,
430 Type::Sequential(_) => self.active.is_empty(),
431 }
432 }
433
434 pub(super) fn add(&mut self, job_info: job::Info) -> Result<(), GroupError> {
435 if self.canceled {
436 Err(GroupError::Closed)
437 } else {
438 self.pending.push_back(job_info);
439 Ok(())
440 }
441 }
442
443 pub(super) fn promote_next_to_active(&mut self) -> Option<job::Info> {
448 if !self.has_available_jobs() {
449 return None;
450 }
451
452 let mut active_job = self.pending.pop_front();
453
454 if let Some(ref mut job) = active_job {
455 let _ = self.active.insert(job.id, job.cancelation_tx.take());
456 }
457
458 active_job
459 }
460
461 pub(super) fn complete(&mut self, job_info: job::Info) {
462 let _ = self.active.remove(&job_info.id);
463 }
464
465 pub(super) fn cancel(&mut self) {
466 self.canceled = true;
467 self.pending.clear();
468 for (_, cancelation_tx) in self.active.iter_mut() {
469 if let Some(cancelation_tx) = cancelation_tx.take() {
470 let _ = cancelation_tx.send(());
471 }
472 }
473 }
474 }
475}
476
477#[cfg(test)]
478mod tests {
479 use super::*;
480 use crate::message::base::MessengerType;
481 use crate::service::test::Payload;
482 use crate::service::MessageHub;
483 use crate::tests::scaffold::workload::Workload;
484
485 use assert_matches::assert_matches;
486 use rand::Rng;
487
488 #[fuchsia::test]
489 fn test_id_generation() {
490 let mut generator = IdGenerator::new();
491 assert!(generator.generate() != generator.generate());
493 }
494
495 #[fuchsia::test(allow_stalls = false)]
496 async fn test_job_functionality() {
497 let message_hub_delegate = MessageHub::create_hub();
499
500 let mut receptor = message_hub_delegate
502 .create(MessengerType::Unbound)
503 .await
504 .expect("should create receptor")
505 .1;
506
507 let messenger = message_hub_delegate
509 .create(MessengerType::Unbound)
510 .await
511 .expect("should create messenger")
512 .0;
513
514 let mut rng = rand::thread_rng();
515 let val = rng.gen();
517
518 let job = Job::new(work::Load::Independent(Workload::new(
520 Payload::Integer(val),
521 receptor.get_signature(),
522 )));
523
524 let _ = job
525 .workload
526 .execute(messenger, Some(Rc::new(Mutex::new(HashMap::new()))), 0.into())
527 .await;
528
529 assert_matches!(
531 receptor.next_of::<Payload>().await.expect("should return result, not error").0,
532 Payload::Integer(value) if value == val);
533 }
534}