1use fidl_fuchsia_power::{
16 CollaborativeRebootInitiatorPerformPendingRebootResponse as PerformPendingRebootResponse,
17 CollaborativeRebootInitiatorRequest as InitiatorRequest,
18 CollaborativeRebootInitiatorRequestStream as InitiatorRequestStream,
19};
20use fidl_fuchsia_power_internal::{
21 CollaborativeRebootReason as Reason, CollaborativeRebootSchedulerRequest as SchedulerRequest,
22 CollaborativeRebootSchedulerRequestStream as SchedulerRequestStream,
23};
24use fuchsia_async::OnSignals;
25use fuchsia_inspect::NumericProperty;
26use fuchsia_sync::Mutex;
27use futures::{StreamExt, TryStreamExt};
28use std::sync::Arc;
29
/// Signals waited for on a request's cancellation eventpair: any of the user
/// signals, or the peer endpoint being closed. Observing one of them cancels
/// the corresponding scheduled reboot request.
const CANCELLATION_SIGNALS: zx::Signals =
    zx::Signals::USER_ALL.union(zx::Signals::OBJECT_PEER_CLOSED);
33
34pub(super) fn new(inspector: &fuchsia_inspect::Inspector) -> (State, Cancellations) {
36 let (cancellation_sender, cancellation_receiver) = futures::channel::mpsc::unbounded();
37 let cr_node = inspector.root().create_child("CollaborativeReboot");
38 let scheduled_requests_node = cr_node.create_child("ScheduledRequests");
39
40 let scheduled_requests: Arc<Mutex<ScheduledRequests>> =
41 Arc::new(Mutex::new(ScheduledRequests::new(&scheduled_requests_node)));
42 (
43 State {
44 scheduled_requests: scheduled_requests.clone(),
45 cancellation_sender,
46 _inspect_nodes: [cr_node, scheduled_requests_node],
47 },
48 Cancellations { scheduled_requests, cancellation_receiver },
49 )
50}
51
/// State used by the FIDL request handlers of the collaborative reboot
/// protocols (see `handle_scheduler_requests` and `handle_initiator_requests`).
#[derive(Debug)]
pub(super) struct State {
    /// The currently scheduled reboot requests. The same `Arc` is also held by
    /// [`Cancellations`], which decrements the counters on cancellation.
    scheduled_requests: Arc<Mutex<ScheduledRequests>>,

    /// Sending half of the channel over which cancellation eventpairs are
    /// handed to the [`Cancellations`] worker.
    cancellation_sender: futures::channel::mpsc::UnboundedSender<Cancel>,

    /// The "CollaborativeReboot" and "ScheduledRequests" Inspect nodes. They are
    /// stored here but never read.
    // NOTE(review): presumably held so the nodes stay in the Inspect hierarchy
    // for as long as `State` is alive — confirm.
    _inspect_nodes: [fuchsia_inspect::types::Node; 2],
}
71
72impl State {
73 pub(super) async fn handle_scheduler_requests(&self, mut stream: SchedulerRequestStream) {
75 while let Ok(Some(request)) = stream.try_next().await {
76 match request {
77 SchedulerRequest::ScheduleReboot { reason, cancel, responder } => {
78 {
79 let mut scheduled_requests = self.scheduled_requests.lock();
80 scheduled_requests.schedule(reason);
81 println!(
82 "[shutdown-shim] Collaborative reboot scheduled for reason: \
83 {reason:?}. Current scheduled requests: {scheduled_requests}"
84 );
85 }
86 if let Some(cancel) = cancel {
87 self.cancellation_sender
88 .unbounded_send(Cancel { signal: cancel, reason })
89 .expect("receiver should not close");
90 }
91 match responder.send() {
92 Ok(()) => {}
93 Err(e) => {
94 eprintln!(
95 "[shutdown-shim] Failed to respond to 'ScheduleReboot': {e:?}"
96 );
97 return;
99 }
100 }
101 }
102 }
103 }
104 }
105
106 pub(super) async fn handle_initiator_requests<R: RebootActuator>(
108 &self,
109 mut stream: InitiatorRequestStream,
110 actuator: &R,
111 ) {
112 while let Ok(Some(request)) = stream.try_next().await {
113 match request {
114 InitiatorRequest::PerformPendingReboot { responder } => {
115 let reboot_reasons = {
116 let scheduled_requests = self.scheduled_requests.lock();
117 println!(
118 "[shutdown-shim] Asked to perform collaborative reboot. \
119 Current scheduled requests: {scheduled_requests}"
120 );
121 scheduled_requests.list_reasons()
122 };
123
124 let rebooting = !reboot_reasons.is_empty();
125
126 if rebooting {
127 println!("[shutdown-shim] Performing collaborative reboot ...");
128 match actuator.perform_reboot(reboot_reasons).await {
129 Ok(()) | Err(zx::Status::ALREADY_EXISTS) => {}
130 Err(status) => {
131 eprintln!("[shutdown-shim] Failed to perform reboot: {status:?}");
132 return;
134 }
135 }
136 }
137
138 match responder.send(&PerformPendingRebootResponse {
139 rebooting: Some(rebooting),
140 __source_breaking: fidl::marker::SourceBreaking,
141 }) {
142 Ok(()) => {}
143 Err(e) => {
144 eprintln!(
145 "[shutdown-shim] Failed to respond to 'PerformPendingReboot': {e:?}"
146 );
147 return;
149 }
150 }
151 }
152 }
153 }
154 }
155}
156
/// Worker that waits on cancellation eventpairs and removes the corresponding
/// scheduled reboot requests (see `Cancellations::run`).
pub(super) struct Cancellations {
    /// The currently scheduled reboot requests; the same `Arc` is also held by
    /// [`State`].
    scheduled_requests: Arc<Mutex<ScheduledRequests>>,

    /// Receiving half of the channel over which [`State`] sends the
    /// cancellation eventpairs to wait on.
    cancellation_receiver: futures::channel::mpsc::UnboundedReceiver<Cancel>,
}
169
170impl Cancellations {
171 pub(super) async fn run(self) -> () {
176 let Self { scheduled_requests, cancellation_receiver } = self;
177 cancellation_receiver
178 .for_each_concurrent(None, |Cancel { reason, signal }| {
179 let scheduled_requests = scheduled_requests.clone();
180 async move {
181 let _signals = OnSignals::new(signal, CANCELLATION_SIGNALS)
186 .await
187 .expect("failed to wait for signals on eventpair");
188 {
189 let mut scheduled_requests = scheduled_requests.lock();
190 scheduled_requests.cancel(reason);
191 println!(
192 "[shutdown-shim] Collaborative reboot canceled for reason: {reason:?}. \
193 Current scheduled requests: {scheduled_requests}"
194 );
195 }
196 }
197 })
198 .await
199 }
200}
201
/// A cancellation hand-off from [`State`] to the [`Cancellations`] worker.
struct Cancel {
    /// The reason of the scheduled request that gets canceled.
    reason: Reason,
    /// The eventpair endpoint on which [`CANCELLATION_SIGNALS`] are awaited.
    signal: zx::EventPair,
}
207
/// Per-reason counts of the currently scheduled reboot requests, together with
/// the Inspect properties that are updated alongside them.
#[derive(Debug)]
struct ScheduledRequests {
    /// Number of scheduled requests with `Reason::SystemUpdate`.
    system_update: usize,
    /// Number of scheduled requests with `Reason::NetstackMigration`.
    netstack_migration: usize,
    /// Inspect property ("SystemUpdate") updated together with `system_update`.
    system_update_inspect_prop: fuchsia_inspect::types::UintProperty,
    /// Inspect property ("NetstackMigration") updated together with
    /// `netstack_migration`.
    netstack_migration_inspect_prop: fuchsia_inspect::types::UintProperty,
}
217
218impl ScheduledRequests {
219 fn new(inspect_node: &fuchsia_inspect::types::Node) -> Self {
220 Self {
221 system_update: 0,
222 netstack_migration: 0,
223 system_update_inspect_prop: inspect_node.create_uint("SystemUpdate", 0),
224 netstack_migration_inspect_prop: inspect_node.create_uint("NetstackMigration", 0),
225 }
226 }
227
228 fn schedule(&mut self, reason: Reason) {
229 let (rc, inspect_prop) = match reason {
230 Reason::SystemUpdate => (&mut self.system_update, &self.system_update_inspect_prop),
231 Reason::NetstackMigration => {
232 (&mut self.netstack_migration, &self.netstack_migration_inspect_prop)
233 }
234 };
235 *rc = rc.saturating_add(1);
236 inspect_prop.add(1);
237 }
238
239 fn cancel(&mut self, reason: Reason) {
240 let (rc, inspect_prop) = match reason {
241 Reason::SystemUpdate => (&mut self.system_update, &self.system_update_inspect_prop),
242 Reason::NetstackMigration => {
243 (&mut self.netstack_migration, &self.netstack_migration_inspect_prop)
244 }
245 };
246 *rc = rc.saturating_sub(1);
247 inspect_prop.subtract(1);
248 }
249
250 fn list_reasons(&self) -> Vec<Reason> {
251 let Self {
252 system_update,
253 netstack_migration,
254 system_update_inspect_prop: _,
255 netstack_migration_inspect_prop: _,
256 } = self;
257 let mut reasons = Vec::new();
258 if *system_update != 0 {
259 reasons.push(Reason::SystemUpdate);
260 }
261 if *netstack_migration != 0 {
262 reasons.push(Reason::NetstackMigration);
263 }
264 reasons
265 }
266}
267
268impl std::fmt::Display for ScheduledRequests {
269 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
270 let Self {
271 system_update,
272 netstack_migration,
273 system_update_inspect_prop: _,
274 netstack_migration_inspect_prop: _,
275 } = self;
276 write!(f, "SystemUpdate:{system_update}, NetstackMigration:{netstack_migration}")
277 }
278}
279
/// The operation `State::handle_initiator_requests` uses to carry out a
/// reboot. Being a trait lets the handler be generic over the implementation
/// (the tests in this file substitute mocks).
pub(super) trait RebootActuator {
    /// Performs a reboot for the given `reasons`.
    ///
    /// `State::handle_initiator_requests` treats `Ok(())` and
    /// `Err(zx::Status::ALREADY_EXISTS)` as success and any other error as a
    /// failure.
    async fn perform_reboot(&self, reasons: Vec<Reason>) -> Result<(), zx::Status>;
}
285
#[cfg(test)]
mod tests {
    use super::*;

    use std::cell::RefCell;

    use assert_matches::assert_matches;
    use diagnostics_assertions::assert_data_tree;
    use fidl::Peered;
    use fidl_fuchsia_power::CollaborativeRebootInitiatorMarker;
    use fidl_fuchsia_power_internal::CollaborativeRebootSchedulerMarker;
    use futures::future::Either;
    use futures::stream::FuturesUnordered;
    use futures::FutureExt;
    use test_case::test_case;

    /// A [`RebootActuator`] that records the reasons it was asked to reboot
    /// for and always succeeds.
    #[derive(Default)]
    struct MockRebooter {
        /// `None` until `perform_reboot` is called; afterwards the reasons
        /// that were passed to it.
        reasons: RefCell<Option<Vec<Reason>>>,
    }

    impl RebootActuator for MockRebooter {
        async fn perform_reboot(&self, reasons: Vec<Reason>) -> Result<(), zx::Status> {
            let original_reasons = self.reasons.borrow_mut().replace(reasons);
            // The mock must not be asked to reboot more than once.
            assert_eq!(None, original_reasons);
            Ok(())
        }
    }

    /// Schedules a reboot for each of `reasons` (without cancellation
    /// eventpairs), then asks to perform the pending reboot. Returns the
    /// `rebooting` value of the response.
    #[test_case(vec![] => false; "no_pending_reboot")]
    #[test_case(vec![Reason::SystemUpdate] => true; "system_update")]
    #[test_case(vec![Reason::NetstackMigration] => true; "netstack_migration")]
    #[test_case(vec![Reason::SystemUpdate, Reason::NetstackMigration] => true; "different_reasons")]
    #[test_case(vec![Reason::SystemUpdate, Reason::SystemUpdate] => true; "same_reasons")]
    #[fuchsia_async::run_singlethreaded(test)]
    async fn collaborative_reboot(mut reasons: Vec<Reason>) -> bool {
        let inspector =
            fuchsia_inspect::Inspector::new(fuchsia_inspect::InspectorConfig::default());
        // Keep the cancellations half alive; it is not run in this test.
        let (state, _cancellations) = new(&inspector);

        let (scheduler_client, scheduler_request_stream) =
            fidl::endpoints::create_request_stream::<CollaborativeRebootSchedulerMarker>();
        let scheduler = scheduler_client.into_proxy();
        let (initiator_client, initiator_request_stream) =
            fidl::endpoints::create_request_stream::<CollaborativeRebootInitiatorMarker>();
        let initiator = initiator_client.into_proxy();

        // The server future is fused and pinned so that it can be polled again
        // by the `select!` of every loop iteration.
        let schedule_server_fut = state.handle_scheduler_requests(scheduler_request_stream).fuse();
        futures::pin_mut!(schedule_server_fut);
        for reason in &reasons {
            futures::select!(
                result = scheduler.schedule_reboot(*reason, None).fuse() => {
                    result.expect("failed to schedule reboot");
                },
                () = schedule_server_fut => {
                    unreachable!("The `Scheduler` protocol worker shouldn't exit");
                }
            );
        }

        let mock = MockRebooter::default();
        // Drive the client call and the server side by side; the client call
        // is expected to finish first.
        let PerformPendingRebootResponse { rebooting, __source_breaking } = futures::select!(
            result = initiator.perform_pending_reboot().fuse() => {
                result.expect("failed to initate reboot")
            },
            () = state.handle_initiator_requests(initiator_request_stream, &mock).fuse() => {
                unreachable!("The `Initiator` protocol worker shouldn't exit.");
            }
        );

        let expected_reasons = if reasons.is_empty() {
            // Without scheduled requests the actuator must not be invoked.
            None
        } else {
            // The server reports each reason at most once.
            // NOTE(review): sorting presumably matches the order in which the
            // server lists reasons — confirm against `Reason`'s `Ord`.
            reasons.sort();
            reasons.dedup();
            Some(reasons)
        };
        assert_eq!(*mock.reasons.borrow(), expected_reasons);

        rebooting.expect("rebooting should be present")
    }

    /// How (if at all) a scheduled request gets canceled in the
    /// `cancellation` test.
    enum Cancel {
        /// The request is not canceled.
        None,
        /// The request is canceled by raising a user signal on the eventpair.
        UserSignal,
        /// The request is canceled by dropping the client end of the
        /// eventpair.
        Drop,
    }

    /// Schedules the given requests, cancels those that ask for it, then asks
    /// to perform the pending reboot. Returns the `rebooting` value of the
    /// response.
    #[test_case(vec![(Reason::SystemUpdate, Cancel::None)] => true; "not_canceled")]
    #[test_case(vec![(Reason::SystemUpdate, Cancel::UserSignal)] => false; "canceled")]
    #[test_case(vec![(Reason::SystemUpdate, Cancel::Drop)] => false; "canceled_with_drop")]
    #[test_case(vec![
        (Reason::SystemUpdate, Cancel::UserSignal), (Reason::SystemUpdate, Cancel::None)
    ] => true; "same_reasons_only_one_canceled")]
    #[test_case(vec![
        (Reason::SystemUpdate, Cancel::UserSignal), (Reason::SystemUpdate, Cancel::UserSignal)
    ] => false; "same_reasons_both_canceled")]
    #[test_case(vec![
        (Reason::SystemUpdate, Cancel::UserSignal), (Reason::NetstackMigration, Cancel::None)
    ] => true; "different_reasons_only_one_canceled")]
    #[test_case(vec![
        (Reason::SystemUpdate, Cancel::UserSignal), (Reason::NetstackMigration, Cancel::UserSignal)
    ] => false; "different_reasons_both_canceled")]
    #[fuchsia::test]
    fn cancellation(requests: Vec<(Reason, Cancel)>) -> bool {
        // An explicit executor is used so the cancellation worker can be run
        // until it stalls (see below).
        let mut exec = fuchsia_async::TestExecutor::new();

        let inspector =
            fuchsia_inspect::Inspector::new(fuchsia_inspect::InspectorConfig::default());
        let (state, cancellations_worker) = new(&inspector);

        let (scheduler_client, scheduler_request_stream) =
            fidl::endpoints::create_request_stream::<CollaborativeRebootSchedulerMarker>();
        let scheduler = scheduler_client.into_proxy();
        let (initiator_client, initiator_request_stream) =
            fidl::endpoints::create_request_stream::<CollaborativeRebootInitiatorMarker>();
        let initiator = initiator_client.into_proxy();

        // Client ends of the eventpairs that will be signaled later.
        let mut cancellation_signals = vec![];
        // Client ends of requests that are not canceled; stored so they are
        // not dropped (which would close the peer) before the test finishes.
        let mut uncancelled_signals = vec![];
        let mut uncancelled_reasons = vec![];

        // Issue all schedule calls up front; the responses are collected below.
        let schedule_client_fut = FuturesUnordered::new();
        for (reason, should_cancel) in requests {
            let (ep1, ep2) = zx::EventPair::create();
            schedule_client_fut.push(scheduler.schedule_reboot(reason, Some(ep2)));
            match should_cancel {
                Cancel::None => {
                    uncancelled_signals.push(ep1);
                    uncancelled_reasons.push(reason);
                }
                Cancel::UserSignal => cancellation_signals.push(ep1),
                Cancel::Drop => std::mem::drop(ep1),
            }
        }
        let schedule_client_fut = schedule_client_fut
            .for_each(|result| futures::future::ready(result.expect("failed to schedule_reboot")));
        futures::pin_mut!(schedule_client_fut);
        let schedule_server_fut = state.handle_scheduler_requests(scheduler_request_stream);
        futures::pin_mut!(schedule_server_fut);
        // Run client and server together until all schedule calls have been
        // answered; the server is not expected to finish first.
        let schedule_fut =
            futures::future::select(schedule_client_fut, schedule_server_fut).map(|result| {
                match result {
                    Either::Left(((), _server_fut)) => {}
                    Either::Right(((), _client_fut)) => {
                        unreachable!("The `Scheduler` protocol worker shouldn't exit")
                    }
                }
            });
        futures::pin_mut!(schedule_fut);
        exec.run_singlethreaded(&mut schedule_fut);

        for cancellation in cancellation_signals {
            // Raise `USER_0` on the peer, i.e. on the endpoint that was sent
            // along with the schedule request.
            cancellation
                .signal_peer(zx::Signals::NONE, zx::Signals::USER_0)
                .expect("failed to cancel reboot");
        }
        // Let the worker process everything it can. It stays pending because
        // `state` still holds the sending half of the cancellation channel.
        let cancellation_fut = cancellations_worker.run();
        futures::pin_mut!(cancellation_fut);
        assert_eq!(futures::task::Poll::Pending, exec.run_until_stalled(&mut cancellation_fut));

        let mock = MockRebooter::default();
        let initiate_client_fut = initiator.perform_pending_reboot();
        futures::pin_mut!(initiate_client_fut);
        let initiate_server_fut = state.handle_initiator_requests(initiator_request_stream, &mock);
        futures::pin_mut!(initiate_server_fut);
        let initiate_fut =
            futures::future::select(initiate_client_fut, initiate_server_fut).map(|result| {
                match result {
                    Either::Left((result, _server_fut)) => {
                        result.expect("failed to initate reboot")
                    }
                    Either::Right(((), _client_fut)) => {
                        unreachable!("The `Initiator` protocol worker shouldn't exit")
                    }
                }
            });
        futures::pin_mut!(initiate_fut);
        let PerformPendingRebootResponse { rebooting, __source_breaking } =
            exec.run_singlethreaded(&mut initiate_fut);

        let expected_reasons = if uncancelled_reasons.is_empty() {
            // Everything was canceled, so the actuator must not be invoked.
            None
        } else {
            // The server reports each reason at most once.
            // NOTE(review): sorting presumably matches the order in which the
            // server lists reasons — confirm against `Reason`'s `Ord`.
            uncancelled_reasons.sort();
            uncancelled_reasons.dedup();
            Some(uncancelled_reasons)
        };
        assert_eq!(*mock.reasons.borrow(), expected_reasons);

        rebooting.expect("rebooting should be present")
    }

    /// A [`RebootActuator`] that always fails with a fixed error.
    struct MockRebooterWithError {
        /// The status returned by every `perform_reboot` call.
        error: zx::Status,
    }

    impl RebootActuator for MockRebooterWithError {
        async fn perform_reboot(&self, _reasons: Vec<Reason>) -> Result<(), zx::Status> {
            Err(self.error)
        }
    }

    /// Verifies how actuator errors surface to the client of the initiator
    /// protocol: `ALREADY_EXISTS` still yields a response, other errors make
    /// the client call fail.
    #[test_case(zx::Status::ALREADY_EXISTS, true; "already_exists")]
    #[test_case(zx::Status::INTERNAL, false; "internal")]
    #[fuchsia::test]
    async fn reboot_error(error: zx::Status, should_succeed: bool) {
        let inspector =
            fuchsia_inspect::Inspector::new(fuchsia_inspect::InspectorConfig::default());
        // Keep the cancellations half alive; it is not run in this test.
        let (state, _cancellations) = new(&inspector);

        let (scheduler_client, scheduler_request_stream) =
            fidl::endpoints::create_request_stream::<CollaborativeRebootSchedulerMarker>();
        let scheduler = scheduler_client.into_proxy();
        let (initiator_client, initiator_request_stream) =
            fidl::endpoints::create_request_stream::<CollaborativeRebootInitiatorMarker>();
        let initiator = initiator_client.into_proxy();

        // Schedule one request so that there is a pending reboot to perform.
        let schedule_server_fut = state.handle_scheduler_requests(scheduler_request_stream).fuse();
        futures::pin_mut!(schedule_server_fut);
        futures::select!(
            result = scheduler.schedule_reboot(Reason::SystemUpdate, None).fuse() => {
                result.expect("failed to schedule reboot");
            },
            () = schedule_server_fut => {
                unreachable!("The `Scheduler` protocol worker shouldn't exit");
            }
        );

        let mock = MockRebooterWithError { error };
        if should_succeed {
            // The server keeps serving, so the client call must finish first.
            let result = futures::select!(
                result = initiator.perform_pending_reboot().fuse() => result,
                () = state.handle_initiator_requests(initiator_request_stream, &mock).fuse() => {
                    unreachable!("The `Initiator` protocol worker shouldn't exit.");
                }
            );
            assert_matches!(result, Ok(_))
        } else {
            // The server returns without responding, so both futures complete
            // and the client call observes an error.
            let (result, ()) = futures::join!(
                initiator.perform_pending_reboot().fuse(),
                state.handle_initiator_requests(initiator_request_stream, &mock).fuse()
            );
            assert_matches!(result, Err(_))
        }
    }

    /// Verifies that the Inspect data follows scheduling and cancellation.
    #[fuchsia_async::run_singlethreaded(test)]
    async fn inspect() {
        let inspector =
            fuchsia_inspect::Inspector::new(fuchsia_inspect::InspectorConfig::default());
        let (state, cancellations_worker) = new(&inspector);

        let mut cancellation_signals = vec![];
        let (scheduler_client, scheduler_request_stream) =
            fidl::endpoints::create_request_stream::<CollaborativeRebootSchedulerMarker>();
        let scheduler = scheduler_client.into_proxy();
        let schedule_server_fut = state.handle_scheduler_requests(scheduler_request_stream).fuse();
        futures::pin_mut!(schedule_server_fut);

        // Nothing has been scheduled yet.
        assert_data_tree!(inspector, root: {
            "CollaborativeReboot": {
                "ScheduledRequests": {
                    "SystemUpdate": 0u64,
                    "NetstackMigration": 0u64,
                }
            }
        });

        // Schedule two cancelable `SystemUpdate` requests; the count follows.
        for count in [1u64, 2u64] {
            let (ep1, ep2) = zx::EventPair::create();
            cancellation_signals.push(ep1);
            futures::select!(
                result = scheduler.schedule_reboot(
                    Reason::SystemUpdate, Some(ep2)).fuse() => {
                    result.expect("failed to schedule reboot");
                },
                () = schedule_server_fut => {
                    unreachable!("The `Scheduler` protocol worker shouldn't exit");
                }
            );
            assert_data_tree!(inspector, root: {
                "CollaborativeReboot": {
                    "ScheduledRequests": {
                        "SystemUpdate": count,
                        "NetstackMigration": 0u64,
                    }
                }
            });
        }

        // Schedule one `NetstackMigration` request without a cancellation
        // eventpair.
        futures::select!(
            result = scheduler.schedule_reboot(Reason::NetstackMigration, None).fuse() => {
                result.expect("failed to schedule reboot");
            },
            () = schedule_server_fut => {
                unreachable!("The `Scheduler` protocol worker shouldn't exit");
            }
        );
        assert_data_tree!(inspector, root: {
            "CollaborativeReboot": {
                "ScheduledRequests": {
                    "SystemUpdate": 2u64,
                    "NetstackMigration": 1u64,
                }
            }
        });

        // Dropping the client ends closes the peers of the eventpairs held by
        // the server, which cancels both `SystemUpdate` requests.
        std::mem::drop(cancellation_signals);
        // Close the channel so that the worker's `run` future can complete.
        state.cancellation_sender.close_channel();
        cancellations_worker.run().await;
        assert_data_tree!(inspector, root: {
            "CollaborativeReboot": {
                "ScheduledRequests": {
                    "SystemUpdate": 0u64,
                    "NetstackMigration": 1u64,
                }
            }
        });
    }
}