1use fidl_fuchsia_power::{
16 CollaborativeRebootInitiatorPerformPendingRebootResponse as PerformPendingRebootResponse,
17 CollaborativeRebootInitiatorRequest as InitiatorRequest,
18 CollaborativeRebootInitiatorRequestStream as InitiatorRequestStream,
19};
20use fidl_fuchsia_power_internal::{
21 CollaborativeRebootReason as Reason, CollaborativeRebootSchedulerRequest as SchedulerRequest,
22 CollaborativeRebootSchedulerRequestStream as SchedulerRequestStream,
23};
24use fuchsia_async::OnSignals;
25use fuchsia_inspect::NumericProperty;
26use fuchsia_sync::Mutex;
27use futures::{StreamExt, TryStreamExt};
28use std::sync::Arc;
29
/// The signals on a cancellation eventpair that are treated as "cancel this request":
/// any user signal, or the peer end of the eventpair being closed.
///
/// This is the signal set [`Cancellations::run`] waits on for each scheduled request.
const CANCELLATION_SIGNALS: zx::Signals =
    zx::Signals::USER_ALL.union(zx::Signals::OBJECT_PEER_CLOSED);
33
34pub(super) fn new(inspector: &fuchsia_inspect::Inspector) -> (State, Cancellations) {
36 let (cancellation_sender, cancellation_receiver) = futures::channel::mpsc::unbounded();
37 let cr_node = inspector.root().create_child("CollaborativeReboot");
38 let scheduled_requests_node = cr_node.create_child("ScheduledRequests");
39
40 let scheduled_requests: Arc<Mutex<ScheduledRequests>> =
41 Arc::new(Mutex::new(ScheduledRequests::new(&scheduled_requests_node)));
42 (
43 State {
44 scheduled_requests: scheduled_requests.clone(),
45 cancellation_sender,
46 _inspect_nodes: [cr_node, scheduled_requests_node],
47 },
48 Cancellations { scheduled_requests, cancellation_receiver },
49 )
50}
51
/// State used by the scheduler and initiator request handlers.
///
/// Created by [`new`] together with the matching [`Cancellations`] worker.
#[derive(Debug)]
pub(super) struct State {
    /// The currently scheduled reboot requests. The same `Arc` is also held by the
    /// [`Cancellations`] worker, which decrements the counts on cancellation.
    scheduled_requests: Arc<Mutex<ScheduledRequests>>,

    /// Sending half of the channel used to hand a newly scheduled request's cancellation
    /// eventpair (and its reason) to the [`Cancellations`] worker.
    cancellation_sender: futures::channel::mpsc::UnboundedSender<Cancel>,

    /// The Inspect nodes created in [`new`]. Never read.
    // NOTE(review): presumably held only so the nodes stay alive as long as this state
    // does -- confirm against `fuchsia_inspect`'s node drop semantics.
    _inspect_nodes: [fuchsia_inspect::types::Node; 2],
}
71
72impl State {
73 pub(super) async fn handle_scheduler_requests(&self, mut stream: SchedulerRequestStream) {
75 while let Ok(Some(request)) = stream.try_next().await {
76 match request {
77 SchedulerRequest::ScheduleReboot { reason, cancel, responder } => {
78 {
79 let mut scheduled_requests = self.scheduled_requests.lock();
80 scheduled_requests.schedule(reason);
81 println!(
82 "[shutdown-shim] Collaborative reboot scheduled for reason: \
83 {reason:?}. Current scheduled requests: {scheduled_requests}"
84 );
85 }
86 if let Some(cancel) = cancel {
87 self.cancellation_sender
88 .unbounded_send(Cancel { signal: cancel, reason })
89 .expect("receiver should not close");
90 }
91 match responder.send() {
92 Ok(()) => {}
93 Err(e) => {
94 eprintln!(
95 "[shutdown-shim] Failed to respond to 'ScheduleReboot': {e:?}"
96 );
97 return;
99 }
100 }
101 }
102 }
103 }
104 }
105
106 pub(super) async fn handle_initiator_requests<R: RebootActuator>(
108 &self,
109 mut stream: InitiatorRequestStream,
110 actuator: &R,
111 ) {
112 while let Ok(Some(request)) = stream.try_next().await {
113 match request {
114 InitiatorRequest::PerformPendingReboot { responder } => {
115 let reboot_reasons = {
116 let scheduled_requests = self.scheduled_requests.lock();
117 println!(
118 "[shutdown-shim] Asked to perform collaborative reboot. \
119 Current scheduled requests: {scheduled_requests}"
120 );
121 scheduled_requests.list_reasons()
122 };
123
124 let rebooting = !reboot_reasons.is_empty();
125
126 if rebooting {
127 println!("[shutdown-shim] Performing collaborative reboot ...");
128 match actuator.perform_reboot(reboot_reasons).await {
129 Ok(()) => {}
130 Err(status) => {
131 eprintln!("[shutdown-shim] Failed to perform reboot: {status:?}");
132 return;
134 }
135 }
136 }
137
138 match responder.send(&PerformPendingRebootResponse {
139 rebooting: Some(rebooting),
140 __source_breaking: fidl::marker::SourceBreaking,
141 }) {
142 Ok(()) => {}
143 Err(e) => {
144 eprintln!(
145 "[shutdown-shim] Failed to respond to 'PerformPendingReboot': {e:?}"
146 );
147 return;
149 }
150 }
151 }
152 }
153 }
154 }
155}
156
/// The worker that turns signals on cancellation eventpairs into un-scheduled requests.
///
/// Created by [`new`] together with the matching [`State`]; driven by [`Cancellations::run`].
pub(super) struct Cancellations {
    /// The currently scheduled reboot requests; the same `Arc` is held by [`State`].
    scheduled_requests: Arc<Mutex<ScheduledRequests>>,

    /// Receiving half of the channel on which [`State`] sends one [`Cancel`] per scheduled
    /// request that came with a cancellation eventpair.
    cancellation_receiver: futures::channel::mpsc::UnboundedReceiver<Cancel>,
}
169
170impl Cancellations {
171 pub(super) async fn run(self) -> () {
176 let Self { scheduled_requests, cancellation_receiver } = self;
177 cancellation_receiver
178 .for_each_concurrent(None, |Cancel { reason, signal }| {
179 let scheduled_requests = scheduled_requests.clone();
180 async move {
181 let _signals = OnSignals::new(signal, CANCELLATION_SIGNALS)
186 .await
187 .expect("failed to wait for signals on eventpair");
188 {
189 let mut scheduled_requests = scheduled_requests.lock();
190 scheduled_requests.cancel(reason);
191 println!(
192 "[shutdown-shim] Collaborative reboot canceled for reason: {reason:?}. \
193 Current scheduled requests: {scheduled_requests}"
194 );
195 }
196 }
197 })
198 .await
199 }
200}
201
/// A pending cancellation, sent from [`State`] to the [`Cancellations`] worker.
struct Cancel {
    /// The reason of the scheduled request that this cancellation would undo.
    reason: Reason,
    /// The eventpair the worker waits on; observing any of [`CANCELLATION_SIGNALS`] on it
    /// cancels the request.
    signal: zx::EventPair,
}
207
/// Per-reason counts of the currently scheduled reboot requests, mirrored into Inspect.
#[derive(Debug)]
struct ScheduledRequests {
    /// Number of outstanding requests scheduled with `Reason::SystemUpdate`.
    system_update: usize,
    /// Number of outstanding requests scheduled with `Reason::NetstackMigration`.
    netstack_migration: usize,
    /// Inspect property (`SystemUpdate`) updated alongside `system_update`.
    system_update_inspect_prop: fuchsia_inspect::types::UintProperty,
    /// Inspect property (`NetstackMigration`) updated alongside `netstack_migration`.
    netstack_migration_inspect_prop: fuchsia_inspect::types::UintProperty,
}
217
218impl ScheduledRequests {
219 fn new(inspect_node: &fuchsia_inspect::types::Node) -> Self {
220 Self {
221 system_update: 0,
222 netstack_migration: 0,
223 system_update_inspect_prop: inspect_node.create_uint("SystemUpdate", 0),
224 netstack_migration_inspect_prop: inspect_node.create_uint("NetstackMigration", 0),
225 }
226 }
227
228 fn schedule(&mut self, reason: Reason) {
229 let (rc, inspect_prop) = match reason {
230 Reason::SystemUpdate => (&mut self.system_update, &self.system_update_inspect_prop),
231 Reason::NetstackMigration => {
232 (&mut self.netstack_migration, &self.netstack_migration_inspect_prop)
233 }
234 };
235 *rc = rc.saturating_add(1);
236 inspect_prop.add(1);
237 }
238
239 fn cancel(&mut self, reason: Reason) {
240 let (rc, inspect_prop) = match reason {
241 Reason::SystemUpdate => (&mut self.system_update, &self.system_update_inspect_prop),
242 Reason::NetstackMigration => {
243 (&mut self.netstack_migration, &self.netstack_migration_inspect_prop)
244 }
245 };
246 *rc = rc.saturating_sub(1);
247 inspect_prop.subtract(1);
248 }
249
250 fn list_reasons(&self) -> Vec<Reason> {
251 let Self {
252 system_update,
253 netstack_migration,
254 system_update_inspect_prop: _,
255 netstack_migration_inspect_prop: _,
256 } = self;
257 let mut reasons = Vec::new();
258 if *system_update != 0 {
259 reasons.push(Reason::SystemUpdate);
260 }
261 if *netstack_migration != 0 {
262 reasons.push(Reason::NetstackMigration);
263 }
264 reasons
265 }
266}
267
268impl std::fmt::Display for ScheduledRequests {
269 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
270 let Self {
271 system_update,
272 netstack_migration,
273 system_update_inspect_prop: _,
274 netstack_migration_inspect_prop: _,
275 } = self;
276 write!(f, "SystemUpdate:{system_update}, NetstackMigration:{netstack_migration}")
277 }
278}
279
/// Abstraction over the component that actually carries out a reboot.
///
/// `State::handle_initiator_requests` is generic over this trait; the tests in this file
/// supply a mock implementation.
pub(super) trait RebootActuator {
    /// Performs a reboot on behalf of the given `reasons`.
    ///
    /// An `Err` is logged by the caller, which then stops serving its request stream.
    async fn perform_reboot(&self, reasons: Vec<Reason>) -> Result<(), zx::Status>;
}
285
286#[cfg(test)]
287mod tests {
288 use super::*;
289
290 use std::cell::RefCell;
291
292 use diagnostics_assertions::assert_data_tree;
293 use fidl::Peered;
294 use fidl_fuchsia_power::CollaborativeRebootInitiatorMarker;
295 use fidl_fuchsia_power_internal::CollaborativeRebootSchedulerMarker;
296 use futures::future::Either;
297 use futures::stream::FuturesUnordered;
298 use futures::FutureExt;
299 use test_case::test_case;
300
    /// Test double for [`RebootActuator`] that records the reasons it was asked to reboot
    /// with instead of rebooting anything.
    #[derive(Default)]
    struct MockRebooter {
        /// `None` until `perform_reboot` is called; afterwards, the reasons passed to it.
        reasons: RefCell<Option<Vec<Reason>>>,
    }
309
310 impl RebootActuator for MockRebooter {
311 async fn perform_reboot(&self, reasons: Vec<Reason>) -> Result<(), zx::Status> {
312 let original_reasons = self.reasons.borrow_mut().replace(reasons);
313 assert_eq!(None, original_reasons);
315 Ok(())
316 }
317 }
318
319 #[test_case(vec![] => false; "no_pending_reboot")]
320 #[test_case(vec![Reason::SystemUpdate] => true; "system_update")]
321 #[test_case(vec![Reason::NetstackMigration] => true; "netstack_migration")]
322 #[test_case(vec![Reason::SystemUpdate, Reason::NetstackMigration] => true; "different_reasons")]
323 #[test_case(vec![Reason::SystemUpdate, Reason::SystemUpdate] => true; "same_reasons")]
324 #[fuchsia_async::run_singlethreaded(test)]
325 async fn collaborative_reboot(mut reasons: Vec<Reason>) -> bool {
326 let inspector =
328 fuchsia_inspect::Inspector::new(fuchsia_inspect::InspectorConfig::default());
329 let (state, _cancellations) = new(&inspector);
330
331 let (scheduler_client, scheduler_request_stream) =
332 fidl::endpoints::create_request_stream::<CollaborativeRebootSchedulerMarker>();
333 let scheduler = scheduler_client.into_proxy();
334 let (initiator_client, initiator_request_stream) =
335 fidl::endpoints::create_request_stream::<CollaborativeRebootInitiatorMarker>();
336 let initiator = initiator_client.into_proxy();
337
338 let schedule_server_fut = state.handle_scheduler_requests(scheduler_request_stream).fuse();
341 futures::pin_mut!(schedule_server_fut);
342 for reason in &reasons {
343 futures::select!(
344 result = scheduler.schedule_reboot(*reason, None).fuse() => {
345 result.expect("failed to schedule reboot");
346 },
347 () = schedule_server_fut => {
348 unreachable!("The `Scheduler` protocol worker shouldn't exit");
349 }
350 );
351 }
352
353 let mock = MockRebooter::default();
355 let PerformPendingRebootResponse { rebooting, __source_breaking } = futures::select!(
356 result = initiator.perform_pending_reboot().fuse() => {
357 result.expect("failed to initate reboot")
358 },
359 () = state.handle_initiator_requests(initiator_request_stream, &mock).fuse() => {
360 unreachable!("The `Initiator` protocol worker shouldn't exit.");
361 }
362 );
363
364 let expected_reasons = if reasons.is_empty() {
366 None
367 } else {
368 reasons.sort();
371 reasons.dedup();
372 Some(reasons)
373 };
374 assert_eq!(*mock.reasons.borrow(), expected_reasons);
375
376 rebooting.expect("rebooting should be present")
377 }
378
    /// How, if at all, the `cancellation` test cancels a scheduled reboot request.
    enum Cancel {
        /// Leave the request scheduled; the test keeps its end of the eventpair alive.
        None,
        /// Raise a user signal on the peer of the test's eventpair end.
        UserSignal,
        /// Drop the test's end of the eventpair.
        Drop,
    }
385
386 #[test_case(vec![(Reason::SystemUpdate, Cancel::None)] => true; "not_canceled")]
387 #[test_case(vec![(Reason::SystemUpdate, Cancel::UserSignal)] => false; "canceled")]
388 #[test_case(vec![(Reason::SystemUpdate, Cancel::Drop)] => false; "canceled_with_drop")]
389 #[test_case(vec![
390 (Reason::SystemUpdate, Cancel::UserSignal), (Reason::SystemUpdate, Cancel::None)
391 ] => true; "same_reasons_only_one_canceled")]
392 #[test_case(vec![
393 (Reason::SystemUpdate, Cancel::UserSignal), (Reason::SystemUpdate, Cancel::UserSignal)
394 ] => false; "same_reasons_both_canceled")]
395 #[test_case(vec![
396 (Reason::SystemUpdate, Cancel::UserSignal), (Reason::NetstackMigration, Cancel::None)
397 ] => true; "different_reasons_only_one_canceled")]
398 #[test_case(vec![
399 (Reason::SystemUpdate, Cancel::UserSignal), (Reason::NetstackMigration, Cancel::UserSignal)
400 ] => false; "different_reasons_both_canceled")]
401 #[fuchsia::test]
402 fn cancellation(requests: Vec<(Reason, Cancel)>) -> bool {
403 let mut exec = fuchsia_async::TestExecutor::new();
406
407 let inspector =
408 fuchsia_inspect::Inspector::new(fuchsia_inspect::InspectorConfig::default());
409 let (state, cancellations_worker) = new(&inspector);
410
411 let (scheduler_client, scheduler_request_stream) =
412 fidl::endpoints::create_request_stream::<CollaborativeRebootSchedulerMarker>();
413 let scheduler = scheduler_client.into_proxy();
414 let (initiator_client, initiator_request_stream) =
415 fidl::endpoints::create_request_stream::<CollaborativeRebootInitiatorMarker>();
416 let initiator = initiator_client.into_proxy();
417
418 let mut cancellation_signals = vec![];
419 let mut uncancelled_signals = vec![];
422 let mut uncancelled_reasons = vec![];
423
424 let schedule_client_fut = FuturesUnordered::new();
427 for (reason, should_cancel) in requests {
428 let (ep1, ep2) = zx::EventPair::create();
429 schedule_client_fut.push(scheduler.schedule_reboot(reason, Some(ep2)));
430 match should_cancel {
431 Cancel::None => {
432 uncancelled_signals.push(ep1);
433 uncancelled_reasons.push(reason);
434 }
435 Cancel::UserSignal => cancellation_signals.push(ep1),
436 Cancel::Drop => std::mem::drop(ep1),
437 }
438 }
439 let schedule_client_fut = schedule_client_fut
440 .for_each(|result| futures::future::ready(result.expect("failed to schedule_reboot")));
441 futures::pin_mut!(schedule_client_fut);
442 let schedule_server_fut = state.handle_scheduler_requests(scheduler_request_stream);
443 futures::pin_mut!(schedule_server_fut);
444 let schedule_fut =
445 futures::future::select(schedule_client_fut, schedule_server_fut).map(|result| {
446 match result {
447 Either::Left(((), _server_fut)) => {}
448 Either::Right(((), _client_fut)) => {
449 unreachable!("The `Scheduler` protocol worker shouldn't exit")
450 }
451 }
452 });
453 futures::pin_mut!(schedule_fut);
454 exec.run_singlethreaded(&mut schedule_fut);
455
456 for cancellation in cancellation_signals {
459 cancellation
460 .signal_peer(zx::Signals::NONE, zx::Signals::USER_0)
461 .expect("failed to cancel reboot");
462 }
463 let cancellation_fut = cancellations_worker.run();
464 futures::pin_mut!(cancellation_fut);
465 assert_eq!(futures::task::Poll::Pending, exec.run_until_stalled(&mut cancellation_fut));
466
467 let mock = MockRebooter::default();
469 let initiate_client_fut = initiator.perform_pending_reboot();
470 futures::pin_mut!(initiate_client_fut);
471 let initiate_server_fut = state.handle_initiator_requests(initiator_request_stream, &mock);
472 futures::pin_mut!(initiate_server_fut);
473 let initiate_fut =
474 futures::future::select(initiate_client_fut, initiate_server_fut).map(|result| {
475 match result {
476 Either::Left((result, _server_fut)) => {
477 result.expect("failed to initate reboot")
478 }
479 Either::Right(((), _client_fut)) => {
480 unreachable!("The `Initiator` protocol worker shouldn't exit")
481 }
482 }
483 });
484 futures::pin_mut!(initiate_fut);
485 let PerformPendingRebootResponse { rebooting, __source_breaking } =
486 exec.run_singlethreaded(&mut initiate_fut);
487
488 let expected_reasons = if uncancelled_reasons.is_empty() {
490 None
491 } else {
492 uncancelled_reasons.sort();
495 uncancelled_reasons.dedup();
496 Some(uncancelled_reasons)
497 };
498 assert_eq!(*mock.reasons.borrow(), expected_reasons);
499
500 rebooting.expect("rebooting should be present")
501 }
502
    // Verifies that the Inspect tree tracks the per-reason counts as requests are
    // scheduled and later cancelled.
    #[fuchsia_async::run_singlethreaded(test)]
    async fn inspect() {
        let inspector =
            fuchsia_inspect::Inspector::new(fuchsia_inspect::InspectorConfig::default());
        let (state, cancellations_worker) = new(&inspector);

        let mut cancellation_signals = vec![];
        let (scheduler_client, scheduler_request_stream) =
            fidl::endpoints::create_request_stream::<CollaborativeRebootSchedulerMarker>();
        let scheduler = scheduler_client.into_proxy();
        // Fused and pinned so the same server future can be polled by every `select!` below.
        let schedule_server_fut = state.handle_scheduler_requests(scheduler_request_stream).fuse();
        futures::pin_mut!(schedule_server_fut);

        // Before anything is scheduled both counters are published as zero.
        assert_data_tree!(inspector, root: {
            "CollaborativeReboot": {
                "ScheduledRequests": {
                    "SystemUpdate": 0u64,
                    "NetstackMigration": 0u64,
                }
            }
        });

        // Schedule two cancellable `SystemUpdate` requests; `count` is the value expected
        // for `SystemUpdate` after each one.
        for count in [1u64, 2u64] {
            let (ep1, ep2) = zx::EventPair::create();
            // Keep the local end alive; it is dropped later to cancel the request.
            cancellation_signals.push(ep1);
            futures::select!(
                result = scheduler.schedule_reboot(
                    Reason::SystemUpdate, Some(ep2)).fuse() => {
                    result.expect("failed to schedule reboot");
                },
                () = schedule_server_fut => {
                    unreachable!("The `Scheduler` protocol worker shouldn't exit");
                }
            );
            assert_data_tree!(inspector, root: {
                "CollaborativeReboot": {
                    "ScheduledRequests": {
                        "SystemUpdate": count,
                        "NetstackMigration": 0u64,
                    }
                }
            });
        }

        // Schedule one `NetstackMigration` request without a cancellation eventpair.
        futures::select!(
            result = scheduler.schedule_reboot(Reason::NetstackMigration, None).fuse() => {
                result.expect("failed to schedule reboot");
            },
            () = schedule_server_fut => {
                unreachable!("The `Scheduler` protocol worker shouldn't exit");
            }
        );
        assert_data_tree!(inspector, root: {
            "CollaborativeReboot": {
                "ScheduledRequests": {
                    "SystemUpdate": 2u64,
                    "NetstackMigration": 1u64,
                }
            }
        });

        // Cancel both `SystemUpdate` requests by dropping their eventpair ends, then close
        // the sender so the worker's input ends and `run()` can complete.
        std::mem::drop(cancellation_signals);
        state.cancellation_sender.close_channel();
        cancellations_worker.run().await;
        // `NetstackMigration` was scheduled without an eventpair, so it is still counted.
        assert_data_tree!(inspector, root: {
            "CollaborativeReboot": {
                "ScheduledRequests": {
                    "SystemUpdate": 0u64,
                    "NetstackMigration": 1u64,
                }
            }
        });
    }
580}