settings/job/
manager.rs

1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5//! Job Management Support
6//!
7//! # Summary
8//!
9//! The manager mod defines entities for managing [Job] sources and controlling the execution of
10//! pending [workloads](crate::job::work::Load) contained in those [Jobs](Job). [Manager] provides a
11//! concrete implementation of a [Job] processor. Outside clients send [Job] sources to the
12//! [Manager] over the [MessageHub](crate::message::message_hub::MessageHub). In turn, the [Manager]
13//! will process each received source for new [Jobs](Job) and provide the necessary backing, such as
14//! caches, to support executing the [Job].
15
16use crate::event::source::Event as SourceEvent;
17use crate::event::{self, Event};
18use crate::job::source::{self, Error};
19use crate::job::{self, Job, Payload, PinStream};
20use crate::message::base::MessengerType;
21use crate::service::{self, message};
22use crate::trace;
23use futures::stream::{FuturesUnordered, StreamFuture};
24use futures::{FutureExt, StreamExt};
25use std::collections::HashMap;
26use {fuchsia_async as fasync, fuchsia_trace as ftrace};
27
28type JobStreamItem = (source::Id, Option<Result<Job, Error>>);
29
30/// [Manager] processes incoming streams for new [Job]s. [Job]s are handled and executed by the
31/// [Manager] based on the [Job] definitions.
32pub(crate) struct Manager {
33    /// A mapping from [source id](source::Id) to [handler](source::Handler). This mapping is used
34    /// to retrieve the [handler](source::Handler) for job updates (inserting, retrieving,
35    /// completing) and source maintenance (cleaning up on exit).
36    sources: HashMap<source::Id, source::Handler>,
37    /// A collection of sources given to this manager. Each source is associated with a stream of
38    /// requests. Each item produced by streaming this collection represents the next request from
39    /// some particular source. It will produce a tuple of the intended item and the rest of
40    /// the stream for the corresponding source. The intended item is another tuple that contains a
41    /// [source id](source::Id) and a [Job]. Once the stream has been closed, `None` will be passed
42    /// as the [Job] portion of the tuple.
43    job_futures: FuturesUnordered<StreamFuture<PinStream<JobStreamItem>>>,
44    /// A [Id generator](source::IdGenerator) responsible for producing unique [Ids](source::Id) for
45    /// the received sources.
46    source_id_generator: source::IdGenerator,
47    /// A Sender used to communicate back to the [Manager] that the execution of a [Job] has
48    /// completed.
49    execution_completion_sender: futures::channel::mpsc::UnboundedSender<(source::Id, job::Info)>,
50    /// A [delegate](message::Delegate) used to generate the necessary messaging components for
51    /// [Jobs](Job) to use.
52    message_hub_delegate: message::Delegate,
53    /// An event publisher used to signal when a source has begun and ended.
54    event_publisher: event::Publisher,
55}
56
57impl Manager {
58    /// Creates a new [Manager] with the given MessageHub. A reference to the service MessageHub is
59    /// provided so that it can be passed to [Jobs](Job) for communicating with the rest of the
60    /// service.
61    pub(crate) async fn spawn(message_hub_delegate: &message::Delegate) -> message::Signature {
62        // Create a top-level receptor in the MessageHub to accept new sources from.
63        let receptor = message_hub_delegate
64            .create(MessengerType::Unbound)
65            .await
66            .expect("messenger should be available")
67            .1;
68
69        // Create a channel for execution tasks to communicate when a Job has been completed.
70        let (execution_completion_sender, execution_completion_receiver) =
71            futures::channel::mpsc::unbounded::<(source::Id, job::Info)>();
72
73        // Capture the top-level receptor's signature so it can be passed back
74        // to the caller for sending new sources.
75        let signature = receptor.get_signature();
76        let event_publisher =
77            event::Publisher::create(message_hub_delegate, MessengerType::Unbound).await;
78
79        let mut manager = Self {
80            sources: HashMap::new(),
81            job_futures: FuturesUnordered::new(),
82            source_id_generator: source::IdGenerator::new(),
83            execution_completion_sender,
84            message_hub_delegate: message_hub_delegate.clone(),
85            event_publisher,
86        };
87
88        // Spawn a task to run the main event loop, which handles the following events:
89        // 1) Receiving new sources to process
90        // 2) Accepting and processing new jobs from sources
91        // 3) Executing jobs and handling the their results
92        fasync::Task::local(async move {
93            let id = ftrace::Id::new();
94            trace!(id, c"job_manager");
95            let source_fuse = receptor.fuse();
96            let execution_fuse = execution_completion_receiver.fuse();
97
98            futures::pin_mut!(source_fuse, execution_fuse);
99            loop {
100                futures::select! {
101                    source_event = source_fuse.select_next_some() => {
102                        trace!(id, c"process_source_event");
103                        manager.process_source_event(source_event).await;
104                    },
105                    (source_id, job_info) = execution_fuse.select_next_some() => {
106                        trace!(id, c"process_completed_execution");
107                        manager.process_completed_execution(source_id, job_info, id).await;
108                    },
109                    (job_info, stream) = manager.job_futures.select_next_some() => {
110                        trace!(id, c"process_job");
111                        // Since the manager owns job_futures, we should never reach the end of
112                        // the stream.
113                        let (source_id, job) = job_info.expect("job should be present");
114                        manager.process_job(source_id, job, stream, id).await;
115                    }
116                }
117            }
118        })
119        .detach();
120
121        signature
122    }
123
124    // Propagates results of a completed job by cleaning up references, informing the parent source
125    // of the job completion, and checking if another job can be processed.
126    async fn process_completed_execution(
127        &mut self,
128        source_id: source::Id,
129        job_info: job::Info,
130        id: ftrace::Id,
131    ) {
132        // Fetch the source and inform it that its child Job has completed.
133        let source_handler = &mut self.sources.get_mut(&source_id).expect("should find source");
134        source_handler.handle_job_completion(job_info);
135        self.remove_source_if_necessary(source_id);
136
137        // Continue processing available jobs.
138        self.process_next_job(id).await;
139    }
140
141    // Executes the next job if conditions to run another job are met. If so, the manager consults
142    // available sources for a candidate job and then executes the first one found.
143    async fn process_next_job(&mut self, id: ftrace::Id) {
144        // Iterate through sources and see if any source has a pending job
145        for (source_id, source_handler) in &mut self.sources {
146            let source_id = *source_id;
147            let execution_tx = self.execution_completion_sender.clone();
148
149            // Ignore the executed status.
150            let _ = source_handler
151                .execute_next(
152                    &mut self.message_hub_delegate,
153                    move |job_info| {
154                        if let Err(error) = execution_tx.unbounded_send((source_id, job_info)) {
155                            panic!("Failed to send message. error: {error:?}");
156                        };
157                    },
158                    id,
159                )
160                .await;
161        }
162    }
163
164    // Processes a new source, generating the associated tracking data and inserting its job stream
165    // into the monitored job futures.
166    async fn process_source_event(&mut self, event: service::message::MessageEvent) {
167        // Manager only expects to receive new job streams from events passed into this method.
168        let Payload::Source(source) = Payload::try_from(event).expect("should convert to source");
169
170        // Extract job stream from payload.
171        let job_stream = source.lock().await.take().expect("should capture job stream");
172
173        // Associate stream with a new id.
174        let source_id = self.source_id_generator.generate();
175
176        // Create a handler to manage jobs produced by this stream.
177        let _ = self.sources.insert(source_id, source::Handler::new());
178
179        // Add the stream to the monitored pool. Associate jobs with the source id along with
180        // appending an empty value to the end for indicating when the stream has completed.
181        let stream_fut = job_stream
182            .map(move |val| (source_id, Some(val)))
183            .chain(async move { (source_id, None) }.into_stream())
184            .boxed_local()
185            .into_future();
186        self.job_futures.push(stream_fut);
187        self.event_publisher.send_event(Event::Source(SourceEvent::Start(source_id)));
188    }
189
190    async fn process_job(
191        &mut self,
192        source: source::Id,
193        job: Option<Result<Job, Error>>,
194        source_stream: PinStream<JobStreamItem>,
195        id: ftrace::Id,
196    ) {
197        match job {
198            Some(Ok(job)) => {
199                // When the stream produces a job, associate with the appropriate source. Then try
200                // to see if any job is available to run.
201                if let Err(e) = self
202                    .sources
203                    .get_mut(&source)
204                    .expect("source should be present")
205                    .add_pending_job(job)
206                {
207                    log::error!("Failed to add job: {:?}", e);
208                    return;
209                }
210            }
211            Some(Err(Error::InvalidInput(error_responder))) => {
212                // When the stream failed to produce a job due to bad input, report back the error
213                // through the APIs error responder.
214                let id = error_responder.id();
215                if let Err(e) = error_responder.respond(fidl_fuchsia_settings::Error::Failed) {
216                    log::warn!(
217                        "Failed to report invalid input error to caller on API {} with id {:?}: \
218                            {:?}",
219                        id,
220                        source,
221                        e
222                    );
223                }
224            }
225            Some(Err(Error::InvalidPolicyInput(error_responder))) => {
226                // When the stream failed to produce a job due to bad input, report back the error
227                // through the APIs error responder.
228                let id = error_responder.id();
229                if let Err(e) = error_responder.respond(fidl_fuchsia_settings_policy::Error::Failed)
230                {
231                    log::warn!(
232                        "Failed to report invalid policy input error to caller on policy API {} \
233                            with id {:?}: {:?}",
234                        id,
235                        source,
236                        e
237                    );
238                }
239            }
240            Some(Err(Error::Unexpected(err))) if !err.is_closed() => {
241                // No-op. If the error did not close the stream then just warn and allow the rest
242                // of the stream to continue processing.
243                log::warn!("Received an unexpected error on source {:?}: {:?}", source, err);
244            }
245            Some(Err(err @ (Error::Unexpected(_) | Error::Unsupported))) => {
246                // All other errors cause the source stream to close. Clean up the source and cancel
247                // any pending jobs. We still need to wait for any remaining jobs to finish.
248                log::warn!(
249                    "Unable to process anymore job requests for {:?} due to fatal error: {:?}",
250                    source,
251                    err
252                );
253                self.cancel_source(source);
254                self.event_publisher
255                    .send_event(Event::Source(SourceEvent::Complete(source, Err(err.into()))));
256                return;
257            }
258            None => {
259                // The end of the stream has been reached (None), so clean up the source.
260                self.cancel_source(source);
261                self.event_publisher
262                    .send_event(Event::Source(SourceEvent::Complete(source, Ok(()))));
263                return;
264            }
265        }
266
267        self.job_futures.push(source_stream.into_future());
268        self.process_next_job(id).await;
269    }
270
271    fn cancel_source(&mut self, source_id: source::Id) {
272        self.sources.get_mut(&source_id).expect("should find source").cancel();
273        self.remove_source_if_necessary(source_id);
274    }
275
276    fn remove_source_if_necessary(&mut self, source_id: source::Id) {
277        let source_info = self.sources.get_mut(&source_id).expect("should find source");
278
279        if source_info.is_completed() {
280            let _ = self.sources.remove(&source_id);
281        }
282    }
283}
284
285#[cfg(test)]
286mod tests {
287    use super::*;
288    use crate::event::source::CompleteError;
289    use crate::message::base::Audience;
290    use crate::service::{build_event_listener, test, MessageHub};
291    use crate::tests::scaffold::workload::Workload;
292    use assert_matches::assert_matches;
293    use async_trait::async_trait;
294
295    use futures::channel::mpsc;
296    use futures::channel::oneshot::{self, Receiver, Sender};
297    use futures::lock::Mutex;
298    use std::rc::Rc;
299
300    // Validates that multiple messages can be handled from a single source
301    #[fuchsia::test(allow_stalls = false)]
302    async fn test_manager_job_processing_multiple_jobs_one_source() {
303        // Create delegate for communication between components.
304        let message_hub_delegate = MessageHub::create_hub();
305
306        let results = 0..10;
307
308        // Create a top-level receptor to receive job results from.
309        let mut receptor = message_hub_delegate
310            .create(MessengerType::Unbound)
311            .await
312            .expect("should create receptor")
313            .1;
314
315        let manager_signature = Manager::spawn(&message_hub_delegate).await;
316
317        // Create a messenger to send job sources to the manager.
318        let messenger = message_hub_delegate
319            .create(MessengerType::Unbound)
320            .await
321            .expect("should create messenger")
322            .0;
323
324        let (requests_tx, requests_rx) = mpsc::unbounded();
325
326        // Send multiple jobs in one source.
327        for result in results.clone() {
328            let signature = receptor.get_signature();
329            requests_tx
330                .unbounded_send(Ok(Job::new(job::work::Load::Independent(Workload::new(
331                    test::Payload::Integer(result),
332                    signature,
333                )))))
334                .expect("Should be able to queue requests");
335        }
336
337        let _ = messenger.message(
338            Payload::Source(Rc::new(Mutex::new(Some(requests_rx.boxed_local())))).into(),
339            Audience::Messenger(manager_signature),
340        );
341
342        for result in results {
343            // Confirm received value matches the value sent from workload.
344            assert_matches!(receptor.next_of::<test::Payload>().await.expect("should have payload").0,
345                test::Payload::Integer(value) if value == result);
346        }
347    }
348
349    // Validates that a request that failed to convert to a job does not block the remaining jobs
350    // from running.
351    #[fuchsia::test(allow_stalls = false)]
352    async fn test_manager_job_processing_handles_errored_conversions() {
353        struct TestResponder;
354        impl source::ErrorResponder for TestResponder {
355            fn id(&self) -> &'static str {
356                "Test"
357            }
358
359            fn respond(
360                self: Box<Self>,
361                error: fidl_fuchsia_settings::Error,
362            ) -> Result<(), fidl::Error> {
363                assert_eq!(error, fidl_fuchsia_settings::Error::Failed);
364                Ok(())
365            }
366        }
367
368        // Create delegate for communication between components.
369        let message_hub_delegate = MessageHub::create_hub();
370
371        const RESULT: i64 = 1;
372
373        // Create a top-level receptor to receive job results from.
374        let mut receptor = message_hub_delegate
375            .create(MessengerType::Unbound)
376            .await
377            .expect("should create receptor")
378            .1;
379
380        let manager_signature = Manager::spawn(&message_hub_delegate).await;
381
382        // Create a messenger to send job sources to the manager.
383        let messenger = message_hub_delegate
384            .create(MessengerType::Unbound)
385            .await
386            .expect("should create messenger")
387            .0;
388
389        let (requests_tx, requests_rx) = mpsc::unbounded();
390
391        // Send an error (conversion failed) before a valid job.
392        requests_tx
393            .unbounded_send(Err(Error::InvalidInput(Box::new(TestResponder))))
394            .expect("Should be able to queue requests");
395
396        // Now send a valid job, which should be processed after the error.
397        let signature = receptor.get_signature();
398        requests_tx
399            .unbounded_send(Ok(Job::new(job::work::Load::Independent(Workload::new(
400                test::Payload::Integer(RESULT),
401                signature,
402            )))))
403            .expect("Should be able to queue requests");
404
405        let _ = messenger.message(
406            Payload::Source(Rc::new(Mutex::new(Some(requests_rx.boxed_local())))).into(),
407            Audience::Messenger(manager_signature),
408        );
409
410        // Confirm received value matches the value sent from the second job.
411        assert_matches!(receptor.next_of::<test::Payload>().await.expect("should have payload").0,
412            test::Payload::Integer(value) if value == RESULT);
413    }
414
415    // Validates that a request that failed to convert to a job does not block the remaining jobs
416    // from running.
417    #[fuchsia::test(allow_stalls = false)]
418    async fn test_manager_job_processing_handles_errored_fidl() {
419        // Create delegate for communication between components.
420        let message_hub_delegate = MessageHub::create_hub();
421
422        // Create a top-level receptor to receive job results from.
423        let mut receptor = message_hub_delegate
424            .create(MessengerType::Unbound)
425            .await
426            .expect("should create receptor")
427            .1;
428
429        let mut event_listener = build_event_listener(&message_hub_delegate).await;
430
431        let manager_signature = Manager::spawn(&message_hub_delegate).await;
432
433        // Create a messenger to send job sources to the manager.
434        let messenger = message_hub_delegate
435            .create(MessengerType::Unbound)
436            .await
437            .expect("should create messenger")
438            .0;
439
440        let (requests_tx, requests_rx) = mpsc::unbounded();
441
442        // Send a fidl error before a valid job.
443        requests_tx
444            .unbounded_send(Err(Error::Unexpected(fidl::Error::ClientChannelClosed {
445                status: zx::Status::PEER_CLOSED,
446                protocol_name: "",
447                epitaph: None,
448            })))
449            .expect("Should be able to queue requests");
450
451        // Now send a valid job, which should not be processed after the error.
452        let signature = receptor.get_signature();
453        requests_tx
454            .unbounded_send(Ok(Job::new(job::work::Load::Independent(Workload::new(
455                test::Payload::Integer(1),
456                signature,
457            )))))
458            .expect("Should be able to queue requests");
459
460        let _ = messenger.message(
461            Payload::Source(Rc::new(Mutex::new(Some(requests_rx.boxed_local())))).into(),
462            Audience::Messenger(manager_signature),
463        );
464
465        // Ensure the source started and completed before moving on.
466        assert_matches!(
467            event_listener.next_of::<event::Payload>().await,
468            Ok((event::Payload::Event(Event::Source(SourceEvent::Start(_))), _))
469        );
470        assert_matches!(
471            event_listener.next_of::<event::Payload>().await,
472            Ok((
473                event::Payload::Event(Event::Source(SourceEvent::Complete(
474                    _,
475                    Err(CompleteError::Unexpected)
476                ))),
477                _
478            ))
479        );
480
481        // Now we can delete the receptor signature so we don't hang the test on the next assertion.
482        message_hub_delegate.delete(signature);
483
484        // Confirm we never get the result from the request.
485        assert!(receptor.next_of::<test::Payload>().await.is_err());
486    }
487
488    // Validates that an InvalidPolicyInput error causes the stream to close and not run further
489    // jobs.
490    #[fuchsia::test(allow_stalls = false)]
491    async fn test_invalid_policy_input_returns_error() {
492        struct TestPolicyResponder;
493        impl source::PolicyErrorResponder for TestPolicyResponder {
494            fn id(&self) -> &'static str {
495                "Test"
496            }
497
498            fn respond(
499                self: Box<Self>,
500                error: fidl_fuchsia_settings_policy::Error,
501            ) -> Result<(), fidl::Error> {
502                assert_eq!(error, fidl_fuchsia_settings_policy::Error::Failed);
503                Ok(())
504            }
505        }
506
507        // Create delegate for communication between components.
508        let message_hub_delegate = MessageHub::create_hub();
509
510        const RESULT: i64 = 1;
511
512        // Create a top-level receptor to receive job results from.
513        let mut receptor = message_hub_delegate
514            .create(MessengerType::Unbound)
515            .await
516            .expect("should create receptor")
517            .1;
518
519        let manager_signature = Manager::spawn(&message_hub_delegate).await;
520
521        // Create a messenger to send job sources to the manager.
522        let messenger = message_hub_delegate
523            .create(MessengerType::Unbound)
524            .await
525            .expect("should create messenger")
526            .0;
527
528        let (requests_tx, requests_rx) = mpsc::unbounded();
529
530        // Send a fidl error before a valid job.
531        requests_tx
532            .unbounded_send(Err(Error::InvalidPolicyInput(Box::new(TestPolicyResponder))))
533            .expect("Should be able to queue requests");
534
535        // Now send a valid job, which should not be processed after the error.
536        let signature = receptor.get_signature();
537        requests_tx
538            .unbounded_send(Ok(Job::new(job::work::Load::Independent(Workload::new(
539                test::Payload::Integer(RESULT),
540                signature,
541            )))))
542            .expect("Should be able to queue requests");
543
544        let _ = messenger.message(
545            Payload::Source(Rc::new(Mutex::new(Some(requests_rx.boxed_local())))).into(),
546            Audience::Messenger(manager_signature),
547        );
548
549        // Confirm received value matches the value sent from the second job.
550        assert_matches!(receptor.next_of::<test::Payload>().await.expect("should have payload").0,
551            test::Payload::Integer(value) if value == RESULT);
552    }
553
554    struct WaitingWorkload {
555        rx: Receiver<()>,
556        execute_tx: Sender<()>,
557    }
558
559    impl WaitingWorkload {
560        fn new(rx: Receiver<()>, tx: Sender<()>) -> Self {
561            Self { rx, execute_tx: tx }
562        }
563    }
564
565    // This implementation can be used to imitate a hanging get by delaying or never sending a
566    // message across its channel.
567    #[async_trait(?Send)]
568    impl job::work::Sequential for WaitingWorkload {
569        async fn execute(
570            self: Box<Self>,
571            _: message::Messenger,
572            _: job::data::StoreHandle,
573            _id: ftrace::Id,
574        ) -> Result<(), job::work::Error> {
575            self.execute_tx.send(()).expect("Should be able to signal start of execution");
576            let _ = self.rx.await;
577            Ok(())
578        }
579    }
580
581    // Validates that a hanging get on one source does not block jobs from being processed on
582    // another source.
583    #[fuchsia::test(allow_stalls = false)]
584    async fn test_manager_job_processing_multiple_sources() {
585        // Create delegate for communication between components.
586        let message_hub_delegate = MessageHub::create_hub();
587
588        let manager_signature = Manager::spawn(&message_hub_delegate).await;
589
590        // Create a messenger to send job sources to the manager.
591        let messenger = message_hub_delegate
592            .create(MessengerType::Unbound)
593            .await
594            .expect("should create messenger")
595            .0;
596
597        // Send each job as a separate source.
598
599        // The first one should hang (hence the _tx) and never complete, to mimic a hanging get.
600        let (_tx, rx) = oneshot::channel();
601        let (execute_tx, execute_rx) = oneshot::channel();
602        let (requests_tx, requests_rx) = mpsc::unbounded();
603        requests_tx
604            .unbounded_send(Ok(Job::new(job::work::Load::Sequential(
605                Box::new(WaitingWorkload::new(rx, execute_tx)),
606                job::Signature::new::<usize>(),
607            ))))
608            .expect("Should be able to send queue");
609        let _ = messenger.message(
610            Payload::Source(Rc::new(Mutex::new(Some(requests_rx.boxed_local())))).into(),
611            Audience::Messenger(manager_signature),
612        );
613
614        // Ensure the requests is in the hanging portion of execute.
615        execute_rx.await.expect("Should have started hung execution");
616
617        // Then send the second request as a new source.
618        let result = 1;
619        let mut receptor = message_hub_delegate
620            .create(MessengerType::Unbound)
621            .await
622            .expect("should create receptor")
623            .1;
624        let signature = receptor.get_signature();
625        let (requests_tx, requests_rx) = mpsc::unbounded();
626        requests_tx
627            .unbounded_send(Ok(Job::new(job::work::Load::Sequential(
628                Workload::new(test::Payload::Integer(result), signature),
629                job::Signature::new::<usize>(),
630            ))))
631            .expect("Should be able to send queue");
632
633        let _ = messenger.message(
634            Payload::Source(Rc::new(Mutex::new(Some(requests_rx.boxed_local())))).into(),
635            Audience::Messenger(manager_signature),
636        );
637
638        // Confirm received value matches the value sent from workload.
639        assert_matches!(receptor.next_of::<test::Payload>().await.expect("should have payload").0,
640            test::Payload::Integer(value) if value == result);
641    }
642
643    // Validates that sequential jobs like hanging gets are canceled when the source stream ends,
644    // which corresponds to a client closing their connection.
645    #[fuchsia::test(allow_stalls = false)]
646    async fn test_manager_cancels_jobs_on_stream_end() {
647        // Create delegate for communication between components.
648        let message_hub_delegate = MessageHub::create_hub();
649
650        let manager_signature = Manager::spawn(&message_hub_delegate).await;
651
652        // Create a messenger to send job sources to the manager.
653        let messenger = message_hub_delegate
654            .create(MessengerType::Unbound)
655            .await
656            .expect("should create messenger")
657            .0;
658
659        // Send a job source with one job that hangs forever, to mimic a hanging get.
660        let (_tx, rx) = oneshot::channel();
661        let (execute_tx, execute_rx) = oneshot::channel();
662        let (cancelation_tx, cancelation_rx) = oneshot::channel();
663        let (requests_tx, requests_rx) = mpsc::unbounded();
664        requests_tx
665            .unbounded_send(Ok(Job::new_with_cancellation(
666                job::work::Load::Sequential(
667                    Box::new(WaitingWorkload::new(rx, execute_tx)),
668                    job::Signature::new::<usize>(),
669                ),
670                cancelation_tx,
671            )))
672            .expect("Should be able to send queue");
673        let _ = messenger.message(
674            Payload::Source(Rc::new(Mutex::new(Some(requests_rx.boxed_local())))).into(),
675            Audience::Messenger(manager_signature),
676        );
677
678        // Ensure the request is in the hanging portion of execute.
679        execute_rx.await.expect("Should have started hung execution");
680
681        // Send the end of the source stream, to mimic a client closing its connection.
682        requests_tx.close_channel();
683
684        // Expect that the job received the cancelation signal.
685        cancelation_rx.await.expect("Hanging is cancelled");
686    }
687}