shutdown_shim/collaborative_reboot.rs

// Copyright 2025 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

//! An implementation of collaborative reboot for Fuchsia.
//!
//! Collaborative reboot is a mechanism that allows multiple actors to work
//! together to schedule a device reboot at a time that avoids user disruption.
//!
//! Scheduler actors express their desire to reboot the device by issuing
//! `ScheduleReboot` requests, while the Initiator actor expresses when it is
//! an appropriate time to reboot the device by issuing a
//! `PerformPendingReboot` request.
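//!
//! For example, a client might schedule a reboot and later have it performed
//! roughly as follows (a hypothetical sketch; `scheduler` and `initiator` are
//! assumed to be connected proxies for the
//! `fuchsia.power.internal/CollaborativeRebootScheduler` and
//! `fuchsia.power/CollaborativeRebootInitiator` protocols):
//!
//! ```ignore
//! // Schedule a reboot, keeping `cancel` to optionally cancel it later.
//! let (cancel, remote) = zx::EventPair::create();
//! scheduler.schedule_reboot(Reason::SystemUpdate, Some(remote)).await?;
//! // Later, when rebooting won't disrupt the user:
//! let response = initiator.perform_pending_reboot().await?;
//! assert_eq!(response.rebooting, Some(true));
//! ```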

use fidl_fuchsia_power::{
    CollaborativeRebootInitiatorPerformPendingRebootResponse as PerformPendingRebootResponse,
    CollaborativeRebootInitiatorRequest as InitiatorRequest,
    CollaborativeRebootInitiatorRequestStream as InitiatorRequestStream,
};
use fidl_fuchsia_power_internal::{
    CollaborativeRebootReason as Reason, CollaborativeRebootSchedulerRequest as SchedulerRequest,
    CollaborativeRebootSchedulerRequestStream as SchedulerRequestStream,
};
use fuchsia_async::OnSignals;
use fuchsia_inspect::NumericProperty;
use fuchsia_sync::Mutex;
use futures::{StreamExt, TryStreamExt};
use std::sync::Arc;

/// The signals that can be used to cancel a scheduled collaborative reboot.
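///
/// A scheduler can cancel its request either by asserting any `USER_*` signal
/// on its end of the eventpair (e.g.
/// `cancel.signal_peer(zx::Signals::NONE, zx::Signals::USER_0)`), or by
/// dropping that end, which raises `OBJECT_PEER_CLOSED` on the peer held here.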
const CANCELLATION_SIGNALS: zx::Signals =
    zx::Signals::USER_ALL.union(zx::Signals::OBJECT_PEER_CLOSED);

/// Construct a new [`State`] and its paired [`Cancellations`].
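///
/// A minimal wiring sketch (the inspector and the request stream are assumed
/// to come from the surrounding component; names are illustrative):
///
/// ```ignore
/// let (state, cancellations) = new(&inspector);
/// // Process cancellations concurrently with incoming requests.
/// futures::join!(
///     cancellations.run(),
///     state.handle_scheduler_requests(scheduler_stream),
/// );
/// ```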
pub(super) fn new(inspector: &fuchsia_inspect::Inspector) -> (State, Cancellations) {
    let (cancellation_sender, cancellation_receiver) = futures::channel::mpsc::unbounded();
    let cr_node = inspector.root().create_child("CollaborativeReboot");
    let scheduled_requests_node = cr_node.create_child("ScheduledRequests");

    let scheduled_requests: Arc<Mutex<ScheduledRequests>> =
        Arc::new(Mutex::new(ScheduledRequests::new(&scheduled_requests_node)));
    (
        State {
            scheduled_requests: scheduled_requests.clone(),
            cancellation_sender,
            _inspect_nodes: [cr_node, scheduled_requests_node],
        },
        Cancellations { scheduled_requests, cancellation_receiver },
    )
}

/// Collaborative Reboot State.
#[derive(Debug)]
pub(super) struct State {
    /// The current outstanding requests for collaborative reboot.
    ///
    /// Also held by [`Cancellations`].
    scheduled_requests: Arc<Mutex<ScheduledRequests>>,

    /// The sender of cancellation signals.
    ///
    /// The receiver half is held by [`Cancellations`].
    cancellation_sender: futures::channel::mpsc::UnboundedSender<Cancel>,

    /// A container to hold the Inspect nodes for collaborative reboot.
    ///
    /// These nodes need to be retained, as dropping them would delete the
    /// underlying Inspect data.
    _inspect_nodes: [fuchsia_inspect::types::Node; 2],
}

impl State {
    /// Handle requests from a [`CollaborativeRebootScheduler`].
    pub(super) async fn handle_scheduler_requests(&self, mut stream: SchedulerRequestStream) {
        while let Ok(Some(request)) = stream.try_next().await {
            match request {
                SchedulerRequest::ScheduleReboot { reason, cancel, responder } => {
                    {
                        let mut scheduled_requests = self.scheduled_requests.lock();
                        scheduled_requests.schedule(reason);
                        println!(
                            "[shutdown-shim] Collaborative reboot scheduled for reason: \
                            {reason:?}. Current scheduled requests: {scheduled_requests}"
                        );
                    }
                    if let Some(cancel) = cancel {
                        self.cancellation_sender
                            .unbounded_send(Cancel { signal: cancel, reason })
                            .expect("receiver should not close");
                    }
                    match responder.send() {
                        Ok(()) => {}
                        Err(e) => {
                            eprintln!(
                                "[shutdown-shim] Failed to respond to 'ScheduleReboot': {e:?}"
                            );
                            // Returning closes the connection.
                            return;
                        }
                    }
                }
            }
        }
    }

    /// Handle requests from a [`CollaborativeRebootInitiator`].
    pub(super) async fn handle_initiator_requests<R: RebootActuator>(
        &self,
        mut stream: InitiatorRequestStream,
        actuator: &R,
    ) {
        while let Ok(Some(request)) = stream.try_next().await {
            match request {
                InitiatorRequest::PerformPendingReboot { responder } => {
                    let reboot_reasons = {
                        let scheduled_requests = self.scheduled_requests.lock();
                        println!(
                            "[shutdown-shim] Asked to perform collaborative reboot. \
                            Current scheduled requests: {scheduled_requests}"
                        );
                        scheduled_requests.list_reasons()
                    };

                    let rebooting = !reboot_reasons.is_empty();

                    if rebooting {
                        println!("[shutdown-shim] Performing collaborative reboot ...");
                        match actuator.perform_reboot(reboot_reasons).await {
                            Ok(()) => {}
                            Err(status) => {
                                eprintln!("[shutdown-shim] Failed to perform reboot: {status:?}");
                                // Returning closes the connection.
                                return;
                            }
                        }
                    }

                    match responder.send(&PerformPendingRebootResponse {
                        rebooting: Some(rebooting),
                        __source_breaking: fidl::marker::SourceBreaking,
                    }) {
                        Ok(()) => {}
                        Err(e) => {
                            eprintln!(
                                "[shutdown-shim] Failed to respond to 'PerformPendingReboot': {e:?}"
                            );
                            // Returning closes the connection.
                            return;
                        }
                    }
                }
            }
        }
    }
}

/// Holds cancellation signals for collaborative reboot.
pub(super) struct Cancellations {
    /// The current outstanding requests for collaborative reboot.
    ///
    /// Also held by [`State`].
    scheduled_requests: Arc<Mutex<ScheduledRequests>>,

    /// The receiver of cancellation signals.
    ///
    /// The sender half is held by [`State`].
    cancellation_receiver: futures::channel::mpsc::UnboundedReceiver<Cancel>,
}

impl Cancellations {
    /// Drives the cancellation of scheduled requests.
    ///
    /// The returned future will only exit if the corresponding [`State`] is
    /// dropped.
    pub(super) async fn run(self) {
        let Self { scheduled_requests, cancellation_receiver } = self;
        cancellation_receiver
            .for_each_concurrent(None, |Cancel { reason, signal }| {
                let scheduled_requests = scheduled_requests.clone();
                async move {
                    // Note: Panic if we observe an error, as all `Err` statuses
                    // represent programming errors. See the documentation for
                    // `zx_object_wait_one`:
                    // https://fuchsia.dev/reference/syscalls/object_wait_one.
                    let _signals = OnSignals::new(signal, CANCELLATION_SIGNALS)
                        .await
                        .expect("failed to wait for signals on eventpair");
                    {
                        let mut scheduled_requests = scheduled_requests.lock();
                        scheduled_requests.cancel(reason);
                        println!(
                            "[shutdown-shim] Collaborative reboot canceled for reason: {reason:?}. \
                            Current scheduled requests: {scheduled_requests}"
                        );
                    }
                }
            })
            .await
    }
}

/// A pairing of a cancellation signal and the reason being canceled.
struct Cancel {
    reason: Reason,
    signal: zx::EventPair,
}

/// A reference count of the scheduled requests, broken down by reason.
#[derive(Debug)]
struct ScheduledRequests {
    system_update: usize,
    netstack_migration: usize,
    // Inspect properties to report the counters from above.
    system_update_inspect_prop: fuchsia_inspect::types::UintProperty,
    netstack_migration_inspect_prop: fuchsia_inspect::types::UintProperty,
}

impl ScheduledRequests {
    fn new(inspect_node: &fuchsia_inspect::types::Node) -> Self {
        Self {
            system_update: 0,
            netstack_migration: 0,
            system_update_inspect_prop: inspect_node.create_uint("SystemUpdate", 0),
            netstack_migration_inspect_prop: inspect_node.create_uint("NetstackMigration", 0),
        }
    }

    fn schedule(&mut self, reason: Reason) {
        let (rc, inspect_prop) = match reason {
            Reason::SystemUpdate => (&mut self.system_update, &self.system_update_inspect_prop),
            Reason::NetstackMigration => {
                (&mut self.netstack_migration, &self.netstack_migration_inspect_prop)
            }
        };
        *rc = rc.saturating_add(1);
        inspect_prop.add(1);
    }

    fn cancel(&mut self, reason: Reason) {
        let (rc, inspect_prop) = match reason {
            Reason::SystemUpdate => (&mut self.system_update, &self.system_update_inspect_prop),
            Reason::NetstackMigration => {
                (&mut self.netstack_migration, &self.netstack_migration_inspect_prop)
            }
        };
        *rc = rc.saturating_sub(1);
        inspect_prop.subtract(1);
    }

    fn list_reasons(&self) -> Vec<Reason> {
        let Self {
            system_update,
            netstack_migration,
            system_update_inspect_prop: _,
            netstack_migration_inspect_prop: _,
        } = self;
        let mut reasons = Vec::new();
        if *system_update != 0 {
            reasons.push(Reason::SystemUpdate);
        }
        if *netstack_migration != 0 {
            reasons.push(Reason::NetstackMigration);
        }
        reasons
    }
}

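// Renders the scheduled request counts for log messages, e.g.
// "SystemUpdate:2, NetstackMigration:1".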
impl std::fmt::Display for ScheduledRequests {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self {
            system_update,
            netstack_migration,
            system_update_inspect_prop: _,
            netstack_migration_inspect_prop: _,
        } = self;
        write!(f, "SystemUpdate:{system_update}, NetstackMigration:{netstack_migration}")
    }
}

/// A type capable of actuating reboots on behalf of collaborative reboot.
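///
/// An implementation would typically delegate to the system's shutdown
/// machinery; a hypothetical sketch (the type and its behavior are
/// illustrative, not part of this module):
///
/// ```ignore
/// struct LoggingActuator;
///
/// impl RebootActuator for LoggingActuator {
///     async fn perform_reboot(&self, reasons: Vec<Reason>) -> Result<(), zx::Status> {
///         println!("rebooting for reasons: {reasons:?}");
///         Ok(())
///     }
/// }
/// ```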
pub(super) trait RebootActuator {
    /// Perform the reboot with the given reasons.
    async fn perform_reboot(&self, reasons: Vec<Reason>) -> Result<(), zx::Status>;
}

#[cfg(test)]
mod tests {
    use super::*;

    use std::cell::RefCell;

    use diagnostics_assertions::assert_data_tree;
    use fidl::Peered;
    use fidl_fuchsia_power::CollaborativeRebootInitiatorMarker;
    use fidl_fuchsia_power_internal::CollaborativeRebootSchedulerMarker;
    use futures::future::Either;
    use futures::stream::FuturesUnordered;
    use futures::FutureExt;
    use test_case::test_case;

    #[derive(Default)]
    struct MockRebooter {
        /// Stores whether `perform_reboot` has been called, and with what
        /// arguments.
        ///
        /// Wrapped in a `RefCell` to allow interior mutability. Because the
        /// tests are single threaded, this isn't problematic.
        reasons: RefCell<Option<Vec<Reason>>>,
    }

    impl RebootActuator for MockRebooter {
        async fn perform_reboot(&self, reasons: Vec<Reason>) -> Result<(), zx::Status> {
            let original_reasons = self.reasons.borrow_mut().replace(reasons);
            // The mock only expects to have `perform_reboot` called once.
            assert_eq!(None, original_reasons);
            Ok(())
        }
    }

    #[test_case(vec![] => false; "no_pending_reboot")]
    #[test_case(vec![Reason::SystemUpdate] => true; "system_update")]
    #[test_case(vec![Reason::NetstackMigration] => true; "netstack_migration")]
    #[test_case(vec![Reason::SystemUpdate, Reason::NetstackMigration] => true; "different_reasons")]
    #[test_case(vec![Reason::SystemUpdate, Reason::SystemUpdate] => true; "same_reasons")]
    #[fuchsia_async::run_singlethreaded(test)]
    async fn collaborative_reboot(mut reasons: Vec<Reason>) -> bool {
        // Note: this test doesn't exercise cancellations.
        let inspector =
            fuchsia_inspect::Inspector::new(fuchsia_inspect::InspectorConfig::default());
        let (state, _cancellations) = new(&inspector);

        let (scheduler_client, scheduler_request_stream) =
            fidl::endpoints::create_request_stream::<CollaborativeRebootSchedulerMarker>();
        let scheduler = scheduler_client.into_proxy();
        let (initiator_client, initiator_request_stream) =
            fidl::endpoints::create_request_stream::<CollaborativeRebootInitiatorMarker>();
        let initiator = initiator_client.into_proxy();

        // Schedule reboots for each reason (and drive the server
        // implementation).
        let schedule_server_fut = state.handle_scheduler_requests(scheduler_request_stream).fuse();
        futures::pin_mut!(schedule_server_fut);
        for reason in &reasons {
            futures::select!(
                result = scheduler.schedule_reboot(*reason, None).fuse() => {
                    result.expect("failed to schedule reboot");
                },
                () = schedule_server_fut => {
                    unreachable!("The `Scheduler` protocol worker shouldn't exit");
                }
            );
        }

        // Initiate the reboot (and drive the server implementation).
        let mock = MockRebooter::default();
        let PerformPendingRebootResponse { rebooting, __source_breaking } = futures::select!(
            result = initiator.perform_pending_reboot().fuse() => {
                result.expect("failed to initiate reboot")
            },
            () = state.handle_initiator_requests(initiator_request_stream, &mock).fuse() => {
                unreachable!("The `Initiator` protocol worker shouldn't exit.");
            }
        );

        // Verify that the correct reasons were reported.
        let expected_reasons = if reasons.is_empty() {
            None
        } else {
            // De-duplicate the list. If the same reason is scheduled multiple
            // times, it's only expected to be reported once.
            reasons.sort();
            reasons.dedup();
            Some(reasons)
        };
        assert_eq!(*mock.reasons.borrow(), expected_reasons);

        rebooting.expect("rebooting should be present")
    }

    // Specifies how a request should be canceled, if at all.
    enum Cancel {
        None,
        UserSignal,
        Drop,
    }

    #[test_case(vec![(Reason::SystemUpdate, Cancel::None)] => true; "not_canceled")]
    #[test_case(vec![(Reason::SystemUpdate, Cancel::UserSignal)] => false; "canceled")]
    #[test_case(vec![(Reason::SystemUpdate, Cancel::Drop)] => false; "canceled_with_drop")]
    #[test_case(vec![
        (Reason::SystemUpdate, Cancel::UserSignal), (Reason::SystemUpdate, Cancel::None)
        ] => true; "same_reasons_only_one_canceled")]
    #[test_case(vec![
        (Reason::SystemUpdate, Cancel::UserSignal), (Reason::SystemUpdate, Cancel::UserSignal)
        ] => false; "same_reasons_both_canceled")]
    #[test_case(vec![
        (Reason::SystemUpdate, Cancel::UserSignal), (Reason::NetstackMigration, Cancel::None)
        ] => true; "different_reasons_only_one_canceled")]
    #[test_case(vec![
        (Reason::SystemUpdate, Cancel::UserSignal), (Reason::NetstackMigration, Cancel::UserSignal)
        ] => false; "different_reasons_both_canceled")]
    #[fuchsia::test]
    fn cancellation(requests: Vec<(Reason, Cancel)>) -> bool {
        // Note: This test requires partially driving the cancellation worker,
        // so we need direct access to the executor.
        let mut exec = fuchsia_async::TestExecutor::new();

        let inspector =
            fuchsia_inspect::Inspector::new(fuchsia_inspect::InspectorConfig::default());
        let (state, cancellations_worker) = new(&inspector);

        let (scheduler_client, scheduler_request_stream) =
            fidl::endpoints::create_request_stream::<CollaborativeRebootSchedulerMarker>();
        let scheduler = scheduler_client.into_proxy();
        let (initiator_client, initiator_request_stream) =
            fidl::endpoints::create_request_stream::<CollaborativeRebootInitiatorMarker>();
        let initiator = initiator_client.into_proxy();

        let mut cancellation_signals = vec![];
        // Note: We need to hold onto `uncancelled_signals`, as dropping them
        // would result in the request being canceled.
        let mut uncancelled_signals = vec![];
        let mut uncancelled_reasons = vec![];

        // Schedule reboots for each reason (and drive the server
        // implementation).
        let schedule_client_fut = FuturesUnordered::new();
        for (reason, should_cancel) in requests {
            let (ep1, ep2) = zx::EventPair::create();
            schedule_client_fut.push(scheduler.schedule_reboot(reason, Some(ep2)));
            match should_cancel {
                Cancel::None => {
                    uncancelled_signals.push(ep1);
                    uncancelled_reasons.push(reason);
                }
                Cancel::UserSignal => cancellation_signals.push(ep1),
                Cancel::Drop => std::mem::drop(ep1),
            }
        }
        let schedule_client_fut = schedule_client_fut
            .for_each(|result| futures::future::ready(result.expect("failed to schedule_reboot")));
        futures::pin_mut!(schedule_client_fut);
        let schedule_server_fut = state.handle_scheduler_requests(scheduler_request_stream);
        futures::pin_mut!(schedule_server_fut);
        let schedule_fut =
            futures::future::select(schedule_client_fut, schedule_server_fut).map(|result| {
                match result {
                    Either::Left(((), _server_fut)) => {}
                    Either::Right(((), _client_fut)) => {
                        unreachable!("The `Scheduler` protocol worker shouldn't exit")
                    }
                }
            });
        futures::pin_mut!(schedule_fut);
        exec.run_singlethreaded(&mut schedule_fut);

        // Cancel some of the requests, and drive the cancellations worker
        // until it stalls (indicating it's processed all cancellations).
        for cancellation in cancellation_signals {
            cancellation
                .signal_peer(zx::Signals::NONE, zx::Signals::USER_0)
                .expect("failed to cancel reboot");
        }
        let cancellation_fut = cancellations_worker.run();
        futures::pin_mut!(cancellation_fut);
        assert_eq!(futures::task::Poll::Pending, exec.run_until_stalled(&mut cancellation_fut));

        // Initiate the reboot (and drive the server implementation).
        let mock = MockRebooter::default();
        let initiate_client_fut = initiator.perform_pending_reboot();
        futures::pin_mut!(initiate_client_fut);
        let initiate_server_fut = state.handle_initiator_requests(initiator_request_stream, &mock);
        futures::pin_mut!(initiate_server_fut);
        let initiate_fut =
            futures::future::select(initiate_client_fut, initiate_server_fut).map(|result| {
                match result {
                    Either::Left((result, _server_fut)) => {
                        result.expect("failed to initiate reboot")
                    }
                    Either::Right(((), _client_fut)) => {
                        unreachable!("The `Initiator` protocol worker shouldn't exit")
                    }
                }
            });
        futures::pin_mut!(initiate_fut);
        let PerformPendingRebootResponse { rebooting, __source_breaking } =
            exec.run_singlethreaded(&mut initiate_fut);

        // Verify that the correct reasons were reported.
        let expected_reasons = if uncancelled_reasons.is_empty() {
            None
        } else {
            // De-duplicate the list. If the same reason is scheduled multiple
            // times, it's only expected to be reported once.
            uncancelled_reasons.sort();
            uncancelled_reasons.dedup();
            Some(uncancelled_reasons)
        };
        assert_eq!(*mock.reasons.borrow(), expected_reasons);

        rebooting.expect("rebooting should be present")
    }

    #[fuchsia_async::run_singlethreaded(test)]
    async fn inspect() {
        let inspector =
            fuchsia_inspect::Inspector::new(fuchsia_inspect::InspectorConfig::default());
        let (state, cancellations_worker) = new(&inspector);

        let mut cancellation_signals = vec![];
        let (scheduler_client, scheduler_request_stream) =
            fidl::endpoints::create_request_stream::<CollaborativeRebootSchedulerMarker>();
        let scheduler = scheduler_client.into_proxy();
        let schedule_server_fut = state.handle_scheduler_requests(scheduler_request_stream).fuse();
        futures::pin_mut!(schedule_server_fut);

        // At the start, expect the tree to be initialized with 0 values.
        assert_data_tree!(inspector, root: {
            "CollaborativeReboot": {
                "ScheduledRequests": {
                    "SystemUpdate": 0u64,
                    "NetstackMigration": 0u64,
                }
            }
        });

        // Schedule a `SystemUpdate` reboot, twice, and verify the inspect data.
        for count in [1u64, 2u64] {
            let (ep1, ep2) = zx::EventPair::create();
            cancellation_signals.push(ep1);
            futures::select!(
                result = scheduler.schedule_reboot(
                    Reason::SystemUpdate, Some(ep2)).fuse() => {
                        result.expect("failed to schedule reboot");
                    },
                () = schedule_server_fut => {
                    unreachable!("The `Scheduler` protocol worker shouldn't exit");
                }
            );
            assert_data_tree!(inspector, root: {
                "CollaborativeReboot": {
                    "ScheduledRequests": {
                        "SystemUpdate": count,
                        "NetstackMigration": 0u64,
                    }
                }
            });
        }

        // Schedule a `NetstackMigration` reboot, and verify the inspect data.
        futures::select!(
            result = scheduler.schedule_reboot(Reason::NetstackMigration, None).fuse() => {
                    result.expect("failed to schedule reboot");
                },
            () = schedule_server_fut => {
                unreachable!("The `Scheduler` protocol worker shouldn't exit");
            }
        );
        assert_data_tree!(inspector, root: {
            "CollaborativeReboot": {
                "ScheduledRequests": {
                    "SystemUpdate": 2u64,
                    "NetstackMigration": 1u64,
                }
            }
        });

        // Cancel the `SystemUpdate` requests, and verify the inspect data.
        std::mem::drop(cancellation_signals);
        state.cancellation_sender.close_channel();
        cancellations_worker.run().await;
        assert_data_tree!(inspector, root: {
            "CollaborativeReboot": {
                "ScheduledRequests": {
                    "SystemUpdate": 0u64,
                    "NetstackMigration": 1u64,
                }
            }
        });
    }
}