shutdown_shim/
collaborative_reboot.rs

1// Copyright 2025 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5//! An implementation of collaborative reboot for Fuchsia.
6//!
7//! Collaborative reboot is a mechanism that allows multiple actors to work
8//! together to schedule a device reboot at a time that avoids user disruption.
9//!
10//! Scheduler actors express their desire to reboot the device by issuing
11//! `ScheduleReboot` requests. While the Initiator actor expresses when it is an
12//! appropriate time to reboot the device by issuing a `PerformPendingReboot`
13//! request.
14
15use fidl_fuchsia_power::{
16    CollaborativeRebootInitiatorPerformPendingRebootResponse as PerformPendingRebootResponse,
17    CollaborativeRebootInitiatorRequest as InitiatorRequest,
18    CollaborativeRebootInitiatorRequestStream as InitiatorRequestStream,
19};
20use fidl_fuchsia_power_internal::{
21    CollaborativeRebootReason as Reason, CollaborativeRebootSchedulerRequest as SchedulerRequest,
22    CollaborativeRebootSchedulerRequestStream as SchedulerRequestStream,
23};
24use fuchsia_async::OnSignals;
25use fuchsia_inspect::NumericProperty;
26use fuchsia_sync::Mutex;
27use futures::{StreamExt, TryStreamExt};
28use std::sync::Arc;
29
30/// The signals that can be used to cancel a scheduled collaborative reboot.
31const CANCELLATION_SIGNALS: zx::Signals =
32    zx::Signals::USER_ALL.union(zx::Signals::OBJECT_PEER_CLOSED);
33
34/// Construct a new [`State`] and it's paired [`Cancellations`].
35pub(super) fn new(inspector: &fuchsia_inspect::Inspector) -> (State, Cancellations) {
36    let (cancellation_sender, cancellation_receiver) = futures::channel::mpsc::unbounded();
37    let cr_node = inspector.root().create_child("CollaborativeReboot");
38    let scheduled_requests_node = cr_node.create_child("ScheduledRequests");
39
40    let scheduled_requests: Arc<Mutex<ScheduledRequests>> =
41        Arc::new(Mutex::new(ScheduledRequests::new(&scheduled_requests_node)));
42    (
43        State {
44            scheduled_requests: scheduled_requests.clone(),
45            cancellation_sender,
46            _inspect_nodes: [cr_node, scheduled_requests_node],
47        },
48        Cancellations { scheduled_requests, cancellation_receiver },
49    )
50}
51
52#[derive(Debug)]
53/// Collaborative Reboot State.
54pub(super) struct State {
55    /// The current outstanding requests for collaborative reboot.
56    ///
57    /// Also held by [`Cancellations`].
58    scheduled_requests: Arc<Mutex<ScheduledRequests>>,
59
60    /// The sender of cancellation signals.
61    ///
62    /// The receiver half is held by [`Cancellations`].
63    cancellation_sender: futures::channel::mpsc::UnboundedSender<Cancel>,
64
65    /// A container to hold the Inspect nodes for collaborative reboot.
66    ///
67    /// These nodes need to retained, as dropping them would delete the
68    /// underlying Inspect data.
69    _inspect_nodes: [fuchsia_inspect::types::Node; 2],
70}
71
72impl State {
73    /// Handle requests from a [`CollaborativeRebootScheduler`].
74    pub(super) async fn handle_scheduler_requests(&self, mut stream: SchedulerRequestStream) {
75        while let Ok(Some(request)) = stream.try_next().await {
76            match request {
77                SchedulerRequest::ScheduleReboot { reason, cancel, responder } => {
78                    {
79                        let mut scheduled_requests = self.scheduled_requests.lock();
80                        scheduled_requests.schedule(reason);
81                        println!(
82                            "[shutdown-shim] Collaborative reboot scheduled for reason: \
83                            {reason:?}. Current scheduled requests: {scheduled_requests}"
84                        );
85                    }
86                    if let Some(cancel) = cancel {
87                        self.cancellation_sender
88                            .unbounded_send(Cancel { signal: cancel, reason })
89                            .expect("receiver should not close");
90                    }
91                    match responder.send() {
92                        Ok(()) => {}
93                        Err(e) => {
94                            eprintln!(
95                                "[shutdown-shim] Failed to respond to 'ScheduleReboot': {e:?}"
96                            );
97                            // Returning closes the connection.
98                            return;
99                        }
100                    }
101                }
102            }
103        }
104    }
105
106    /// Handle requests from a [`CollaborativeRebootInitiator`].
107    pub(super) async fn handle_initiator_requests<R: RebootActuator>(
108        &self,
109        mut stream: InitiatorRequestStream,
110        actuator: &R,
111    ) {
112        while let Ok(Some(request)) = stream.try_next().await {
113            match request {
114                InitiatorRequest::PerformPendingReboot { responder } => {
115                    let reboot_reasons = {
116                        let scheduled_requests = self.scheduled_requests.lock();
117                        println!(
118                            "[shutdown-shim] Asked to perform collaborative reboot. \
119                            Current scheduled requests: {scheduled_requests}"
120                        );
121                        scheduled_requests.list_reasons()
122                    };
123
124                    let rebooting = !reboot_reasons.is_empty();
125
126                    if rebooting {
127                        println!("[shutdown-shim] Performing collaborative reboot ...");
128                        match actuator.perform_reboot(reboot_reasons).await {
129                            Ok(()) | Err(zx::Status::ALREADY_EXISTS) => {}
130                            Err(status) => {
131                                eprintln!("[shutdown-shim] Failed to perform reboot: {status:?}");
132                                // Returning closes the connection.
133                                return;
134                            }
135                        }
136                    }
137
138                    match responder.send(&PerformPendingRebootResponse {
139                        rebooting: Some(rebooting),
140                        __source_breaking: fidl::marker::SourceBreaking,
141                    }) {
142                        Ok(()) => {}
143                        Err(e) => {
144                            eprintln!(
145                                "[shutdown-shim] Failed to respond to 'PerformPendingReboot': {e:?}"
146                            );
147                            // Returning closes the connection.
148                            return;
149                        }
150                    }
151                }
152            }
153        }
154    }
155}
156
157/// Holds cancellation signals for collaborative reboot.
158pub(super) struct Cancellations {
159    /// The current outstanding requests for collaborative reboot.
160    ///
161    /// Also held by [`State`].
162    scheduled_requests: Arc<Mutex<ScheduledRequests>>,
163
164    /// The receiver of cancellation signals.
165    ///
166    /// The sender half is held by [`State`].
167    cancellation_receiver: futures::channel::mpsc::UnboundedReceiver<Cancel>,
168}
169
170impl Cancellations {
171    /// Drives the cancellation of scheduled requests.
172    ///
173    /// The returned future will only exit if the corresponding [`State`] is
174    /// dropped.
175    pub(super) async fn run(self) -> () {
176        let Self { scheduled_requests, cancellation_receiver } = self;
177        cancellation_receiver
178            .for_each_concurrent(None, |Cancel { reason, signal }| {
179                let scheduled_requests = scheduled_requests.clone();
180                async move {
181                    // Note: Panic if we observe an error, as all `Err` statuses
182                    // represent programming errors. See the documentation for
183                    // `zx_object_wait_one`:
184                    // https://fuchsia.dev/reference/syscalls/object_wait_one.
185                    let _signals = OnSignals::new(signal, CANCELLATION_SIGNALS)
186                        .await
187                        .expect("failed to wait for signals on eventpair");
188                    {
189                        let mut scheduled_requests = scheduled_requests.lock();
190                        scheduled_requests.cancel(reason);
191                        println!(
192                            "[shutdown-shim] Collaborative reboot canceled for reason: {reason:?}. \
193                            Current scheduled requests: {scheduled_requests}"
194                        );
195                    }
196                }
197            })
198            .await
199    }
200}
201
202// A tuple of the cancellation signal, and the reason being canceled.
203struct Cancel {
204    reason: Reason,
205    signal: zx::EventPair,
206}
207
208/// A reference count of the scheduled requests, broken down by reason.
209#[derive(Debug)]
210struct ScheduledRequests {
211    system_update: usize,
212    netstack_migration: usize,
213    // Inspect properties to report the counters from above.
214    system_update_inspect_prop: fuchsia_inspect::types::UintProperty,
215    netstack_migration_inspect_prop: fuchsia_inspect::types::UintProperty,
216}
217
218impl ScheduledRequests {
219    fn new(inspect_node: &fuchsia_inspect::types::Node) -> Self {
220        Self {
221            system_update: 0,
222            netstack_migration: 0,
223            system_update_inspect_prop: inspect_node.create_uint("SystemUpdate", 0),
224            netstack_migration_inspect_prop: inspect_node.create_uint("NetstackMigration", 0),
225        }
226    }
227
228    fn schedule(&mut self, reason: Reason) {
229        let (rc, inspect_prop) = match reason {
230            Reason::SystemUpdate => (&mut self.system_update, &self.system_update_inspect_prop),
231            Reason::NetstackMigration => {
232                (&mut self.netstack_migration, &self.netstack_migration_inspect_prop)
233            }
234        };
235        *rc = rc.saturating_add(1);
236        inspect_prop.add(1);
237    }
238
239    fn cancel(&mut self, reason: Reason) {
240        let (rc, inspect_prop) = match reason {
241            Reason::SystemUpdate => (&mut self.system_update, &self.system_update_inspect_prop),
242            Reason::NetstackMigration => {
243                (&mut self.netstack_migration, &self.netstack_migration_inspect_prop)
244            }
245        };
246        *rc = rc.saturating_sub(1);
247        inspect_prop.subtract(1);
248    }
249
250    fn list_reasons(&self) -> Vec<Reason> {
251        let Self {
252            system_update,
253            netstack_migration,
254            system_update_inspect_prop: _,
255            netstack_migration_inspect_prop: _,
256        } = self;
257        let mut reasons = Vec::new();
258        if *system_update != 0 {
259            reasons.push(Reason::SystemUpdate);
260        }
261        if *netstack_migration != 0 {
262            reasons.push(Reason::NetstackMigration);
263        }
264        reasons
265    }
266}
267
268impl std::fmt::Display for ScheduledRequests {
269    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
270        let Self {
271            system_update,
272            netstack_migration,
273            system_update_inspect_prop: _,
274            netstack_migration_inspect_prop: _,
275        } = self;
276        write!(f, "SystemUpdate:{system_update}, NetstackMigration:{netstack_migration}")
277    }
278}
279
280/// A type capable of actuating reboots on behalf of collaborative reboot.
281pub(super) trait RebootActuator {
282    /// Perform the reboot with the given reasons.
283    async fn perform_reboot(&self, reasons: Vec<Reason>) -> Result<(), zx::Status>;
284}
285
286#[cfg(test)]
287mod tests {
288    use super::*;
289
290    use std::cell::RefCell;
291
292    use assert_matches::assert_matches;
293    use diagnostics_assertions::assert_data_tree;
294    use fidl::Peered;
295    use fidl_fuchsia_power::CollaborativeRebootInitiatorMarker;
296    use fidl_fuchsia_power_internal::CollaborativeRebootSchedulerMarker;
297    use futures::future::Either;
298    use futures::stream::FuturesUnordered;
299    use futures::FutureExt;
300    use test_case::test_case;
301
302    #[derive(Default)]
303    struct MockRebooter {
304        /// Stores if `perform_reboot` has been called, and with what arguments.
305        ///
306        /// Wrapped in a `RefCell` to allow interior mutability. Because the
307        /// tests are single threaded, this isn't problematic.
308        reasons: RefCell<Option<Vec<Reason>>>,
309    }
310
311    impl RebootActuator for MockRebooter {
312        async fn perform_reboot(&self, reasons: Vec<Reason>) -> Result<(), zx::Status> {
313            let original_reasons = self.reasons.borrow_mut().replace(reasons);
314            // The mock only expects to have `perform_reboot` called once.
315            assert_eq!(None, original_reasons);
316            Ok(())
317        }
318    }
319
320    #[test_case(vec![] => false; "no_pending_reboot")]
321    #[test_case(vec![Reason::SystemUpdate] => true; "system_update")]
322    #[test_case(vec![Reason::NetstackMigration] => true; "netstack_migration")]
323    #[test_case(vec![Reason::SystemUpdate, Reason::NetstackMigration] => true; "different_reasons")]
324    #[test_case(vec![Reason::SystemUpdate, Reason::SystemUpdate] => true; "same_reasons")]
325    #[fuchsia_async::run_singlethreaded(test)]
326    async fn collaborative_reboot(mut reasons: Vec<Reason>) -> bool {
327        // Note: this test doesn't exercise cancellations.
328        let inspector =
329            fuchsia_inspect::Inspector::new(fuchsia_inspect::InspectorConfig::default());
330        let (state, _cancellations) = new(&inspector);
331
332        let (scheduler_client, scheduler_request_stream) =
333            fidl::endpoints::create_request_stream::<CollaborativeRebootSchedulerMarker>();
334        let scheduler = scheduler_client.into_proxy();
335        let (initiator_client, initiator_request_stream) =
336            fidl::endpoints::create_request_stream::<CollaborativeRebootInitiatorMarker>();
337        let initiator = initiator_client.into_proxy();
338
339        // Schedule reboots for each reason (and drive the server
340        // implementation).
341        let schedule_server_fut = state.handle_scheduler_requests(scheduler_request_stream).fuse();
342        futures::pin_mut!(schedule_server_fut);
343        for reason in &reasons {
344            futures::select!(
345                result = scheduler.schedule_reboot(*reason, None).fuse() => {
346                    result.expect("failed to schedule reboot");
347                },
348                () = schedule_server_fut => {
349                    unreachable!("The `Scheduler` protocol worker shouldn't exit");
350                }
351            );
352        }
353
354        // Initiate the reboot (and drive the server implementation).
355        let mock = MockRebooter::default();
356        let PerformPendingRebootResponse { rebooting, __source_breaking } = futures::select!(
357            result = initiator.perform_pending_reboot().fuse() => {
358                result.expect("failed to initate reboot")
359            },
360            () = state.handle_initiator_requests(initiator_request_stream, &mock).fuse() => {
361                unreachable!("The `Initiator` protocol worker shouldn't exit.");
362            }
363        );
364
365        // Verify that the correct reasons were reported
366        let expected_reasons = if reasons.is_empty() {
367            None
368        } else {
369            // De-duplicate the list. If the same reason is scheduled multiple
370            // times, it's only expected to be reported once.
371            reasons.sort();
372            reasons.dedup();
373            Some(reasons)
374        };
375        assert_eq!(*mock.reasons.borrow(), expected_reasons);
376
377        rebooting.expect("rebooting should be present")
378    }
379
380    // Specifies how a request should be cancelled, if at all.
381    enum Cancel {
382        None,
383        UserSignal,
384        Drop,
385    }
386
387    #[test_case(vec![(Reason::SystemUpdate, Cancel::None)] => true; "not_canceled")]
388    #[test_case(vec![(Reason::SystemUpdate, Cancel::UserSignal)] => false; "canceled")]
389    #[test_case(vec![(Reason::SystemUpdate, Cancel::Drop)] => false; "canceled_with_drop")]
390    #[test_case(vec![
391        (Reason::SystemUpdate, Cancel::UserSignal), (Reason::SystemUpdate, Cancel::None)
392        ] => true; "same_reasons_only_one_canceled")]
393    #[test_case(vec![
394        (Reason::SystemUpdate, Cancel::UserSignal), (Reason::SystemUpdate, Cancel::UserSignal)
395        ] => false; "same_reasons_both_canceled")]
396    #[test_case(vec![
397        (Reason::SystemUpdate, Cancel::UserSignal), (Reason::NetstackMigration, Cancel::None)
398        ] => true; "different_reasons_only_one_canceled")]
399    #[test_case(vec![
400        (Reason::SystemUpdate, Cancel::UserSignal), (Reason::NetstackMigration, Cancel::UserSignal)
401        ] => false; "different_reasons_both_canceled")]
402    #[fuchsia::test]
403    fn cancellation(requests: Vec<(Reason, Cancel)>) -> bool {
404        // Note: This test requires partially driving the cancellation worker,
405        // so we need direct access to the executor.
406        let mut exec = fuchsia_async::TestExecutor::new();
407
408        let inspector =
409            fuchsia_inspect::Inspector::new(fuchsia_inspect::InspectorConfig::default());
410        let (state, cancellations_worker) = new(&inspector);
411
412        let (scheduler_client, scheduler_request_stream) =
413            fidl::endpoints::create_request_stream::<CollaborativeRebootSchedulerMarker>();
414        let scheduler = scheduler_client.into_proxy();
415        let (initiator_client, initiator_request_stream) =
416            fidl::endpoints::create_request_stream::<CollaborativeRebootInitiatorMarker>();
417        let initiator = initiator_client.into_proxy();
418
419        let mut cancellation_signals = vec![];
420        // Note: We need to hold onto the uncancelled_signals, as dropping them
421        // would result in the request being canceled.
422        let mut uncancelled_signals = vec![];
423        let mut uncancelled_reasons = vec![];
424
425        // Schedule reboots for each reason (and drive the server
426        // implementation).
427        let schedule_client_fut = FuturesUnordered::new();
428        for (reason, should_cancel) in requests {
429            let (ep1, ep2) = zx::EventPair::create();
430            schedule_client_fut.push(scheduler.schedule_reboot(reason, Some(ep2)));
431            match should_cancel {
432                Cancel::None => {
433                    uncancelled_signals.push(ep1);
434                    uncancelled_reasons.push(reason);
435                }
436                Cancel::UserSignal => cancellation_signals.push(ep1),
437                Cancel::Drop => std::mem::drop(ep1),
438            }
439        }
440        let schedule_client_fut = schedule_client_fut
441            .for_each(|result| futures::future::ready(result.expect("failed to schedule_reboot")));
442        futures::pin_mut!(schedule_client_fut);
443        let schedule_server_fut = state.handle_scheduler_requests(scheduler_request_stream);
444        futures::pin_mut!(schedule_server_fut);
445        let schedule_fut =
446            futures::future::select(schedule_client_fut, schedule_server_fut).map(|result| {
447                match result {
448                    Either::Left(((), _server_fut)) => {}
449                    Either::Right(((), _client_fut)) => {
450                        unreachable!("The `Scheduler` protocol worker shouldn't exit")
451                    }
452                }
453            });
454        futures::pin_mut!(schedule_fut);
455        exec.run_singlethreaded(&mut schedule_fut);
456
457        // Cancel some of the requests, and drive the cancellations worker
458        // until it stalls (indicating it's processed all cancellations).
459        for cancellation in cancellation_signals {
460            cancellation
461                .signal_peer(zx::Signals::NONE, zx::Signals::USER_0)
462                .expect("failed to cancel reboot");
463        }
464        let cancellation_fut = cancellations_worker.run();
465        futures::pin_mut!(cancellation_fut);
466        assert_eq!(futures::task::Poll::Pending, exec.run_until_stalled(&mut cancellation_fut));
467
468        // Initiate the reboot (and drive the server implementation).
469        let mock = MockRebooter::default();
470        let initiate_client_fut = initiator.perform_pending_reboot();
471        futures::pin_mut!(initiate_client_fut);
472        let initiate_server_fut = state.handle_initiator_requests(initiator_request_stream, &mock);
473        futures::pin_mut!(initiate_server_fut);
474        let initiate_fut =
475            futures::future::select(initiate_client_fut, initiate_server_fut).map(|result| {
476                match result {
477                    Either::Left((result, _server_fut)) => {
478                        result.expect("failed to initate reboot")
479                    }
480                    Either::Right(((), _client_fut)) => {
481                        unreachable!("The `Initiator` protocol worker shouldn't exit")
482                    }
483                }
484            });
485        futures::pin_mut!(initiate_fut);
486        let PerformPendingRebootResponse { rebooting, __source_breaking } =
487            exec.run_singlethreaded(&mut initiate_fut);
488
489        // Verify that the correct reasons were reported
490        let expected_reasons = if uncancelled_reasons.is_empty() {
491            None
492        } else {
493            // De-duplicate the list. If the same reason is scheduled multiple
494            // times, it's only expected to be reported once.
495            uncancelled_reasons.sort();
496            uncancelled_reasons.dedup();
497            Some(uncancelled_reasons)
498        };
499        assert_eq!(*mock.reasons.borrow(), expected_reasons);
500
501        rebooting.expect("rebooting should be present")
502    }
503
504    struct MockRebooterWithError {
505        /// The error status to return for each call to `perform_reboot`.
506        error: zx::Status,
507    }
508
509    impl RebootActuator for MockRebooterWithError {
510        async fn perform_reboot(&self, _reasons: Vec<Reason>) -> Result<(), zx::Status> {
511            Err(self.error)
512        }
513    }
514
515    #[test_case(zx::Status::ALREADY_EXISTS, true; "already_exists")]
516    #[test_case(zx::Status::INTERNAL, false; "internal")]
517    #[fuchsia::test]
518    async fn reboot_error(error: zx::Status, should_succeed: bool) {
519        let inspector =
520            fuchsia_inspect::Inspector::new(fuchsia_inspect::InspectorConfig::default());
521        let (state, _cancellations) = new(&inspector);
522
523        let (scheduler_client, scheduler_request_stream) =
524            fidl::endpoints::create_request_stream::<CollaborativeRebootSchedulerMarker>();
525        let scheduler = scheduler_client.into_proxy();
526        let (initiator_client, initiator_request_stream) =
527            fidl::endpoints::create_request_stream::<CollaborativeRebootInitiatorMarker>();
528        let initiator = initiator_client.into_proxy();
529
530        // Schedule a reboot with an arbitrary reason (and drive the server
531        // implementation).
532        let schedule_server_fut = state.handle_scheduler_requests(scheduler_request_stream).fuse();
533        futures::pin_mut!(schedule_server_fut);
534        futures::select!(
535            result = scheduler.schedule_reboot(Reason::SystemUpdate, None).fuse() => {
536                result.expect("failed to schedule reboot");
537            },
538            () = schedule_server_fut => {
539                unreachable!("The `Scheduler` protocol worker shouldn't exit");
540            }
541        );
542
543        // Initiate the reboot (and drive the server implementation).
544        let mock = MockRebooterWithError { error };
545        if should_succeed {
546            let result = futures::select!(
547                result = initiator.perform_pending_reboot().fuse() => result,
548                () = state.handle_initiator_requests(initiator_request_stream, &mock).fuse() => {
549                    unreachable!("The `Initiator` protocol worker shouldn't exit.");
550                }
551            );
552            assert_matches!(result, Ok(_))
553        } else {
554            let (result, ()) = futures::join!(
555                initiator.perform_pending_reboot().fuse(),
556                state.handle_initiator_requests(initiator_request_stream, &mock).fuse()
557            );
558            assert_matches!(result, Err(_))
559        }
560    }
561
562    #[fuchsia_async::run_singlethreaded(test)]
563    async fn inspect() {
564        let inspector =
565            fuchsia_inspect::Inspector::new(fuchsia_inspect::InspectorConfig::default());
566        let (state, cancellations_worker) = new(&inspector);
567
568        let mut cancellation_signals = vec![];
569        let (scheduler_client, scheduler_request_stream) =
570            fidl::endpoints::create_request_stream::<CollaborativeRebootSchedulerMarker>();
571        let scheduler = scheduler_client.into_proxy();
572        let schedule_server_fut = state.handle_scheduler_requests(scheduler_request_stream).fuse();
573        futures::pin_mut!(schedule_server_fut);
574
575        // At the start, expect the tree to be initialized with 0 values.
576        assert_data_tree!(inspector, root: {
577            "CollaborativeReboot": {
578                "ScheduledRequests": {
579                    "SystemUpdate": 0u64,
580                    "NetstackMigration": 0u64,
581                }
582            }
583        });
584
585        // Schedule a `SystemUpdate` reboot, twice, and verify the inspect data.
586        for count in [1u64, 2u64] {
587            let (ep1, ep2) = zx::EventPair::create();
588            cancellation_signals.push(ep1);
589            futures::select!(
590                result = scheduler.schedule_reboot(
591                    Reason::SystemUpdate, Some(ep2)).fuse() => {
592                        result.expect("failed to schedule reboot");
593                    },
594                () = schedule_server_fut => {
595                    unreachable!("The `Scheduler` protocol worker shouldn't exit");
596                }
597            );
598            assert_data_tree!(inspector, root: {
599                "CollaborativeReboot": {
600                    "ScheduledRequests": {
601                        "SystemUpdate": count,
602                        "NetstackMigration": 0u64,
603                    }
604                }
605            });
606        }
607
608        // Schedule a `NetstackMigration` reboot, and verify the inspect data.
609        futures::select!(
610            result = scheduler.schedule_reboot(Reason::NetstackMigration, None).fuse() => {
611                    result.expect("failed to schedule reboot");
612                },
613            () = schedule_server_fut => {
614                unreachable!("The `Scheduler` protocol worker shouldn't exit");
615            }
616        );
617        assert_data_tree!(inspector, root: {
618            "CollaborativeReboot": {
619                "ScheduledRequests": {
620                    "SystemUpdate": 2u64,
621                    "NetstackMigration": 1u64,
622                }
623            }
624        });
625
626        // Cancel the `SystemUpdate` requests, and verify the inspect data.
627        std::mem::drop(cancellation_signals);
628        state.cancellation_sender.close_channel();
629        cancellations_worker.run().await;
630        assert_data_tree!(inspector, root: {
631            "CollaborativeReboot": {
632                "ScheduledRequests": {
633                    "SystemUpdate": 0u64,
634                    "NetstackMigration": 1u64,
635                }
636            }
637        });
638    }
639}