stack_migration/
main.rs

1// Copyright 2023 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5mod rollback;
6
7use std::pin::{pin, Pin};
8
9use cobalt_client::traits::AsEventCode as _;
10use fuchsia_async::Task;
11use fuchsia_component::server::{ServiceFs, ServiceFsDir};
12use fuchsia_inspect::Property as _;
13use futures::channel::mpsc;
14use futures::{Stream, StreamExt as _};
15use log::{error, info, warn};
16use networking_metrics_registry::networking_metrics_registry as metrics_registry;
17use {
18    fidl_fuchsia_metrics as fmetrics, fidl_fuchsia_net_stackmigrationdeprecated as fnet_migration,
19    fidl_fuchsia_power_internal as fpower,
20};
21
/// Netstack version selected when neither a user nor an automated preference
/// has been persisted (see `Persisted::desired_netstack_version`).
const DEFAULT_NETSTACK: NetstackVersion = NetstackVersion::Netstack3;
23
/// The network stack implementations this component chooses between.
///
/// Derives serde's `Serialize`/`Deserialize` because it is stored inside
/// `Persisted`, which is written to and read from storage as JSON.
#[derive(Debug, Clone, Copy, serde::Deserialize, serde::Serialize, Eq, PartialEq)]
enum NetstackVersion {
    Netstack2,
    Netstack3,
}
29
30impl NetstackVersion {
31    fn inspect_uint_value(&self) -> u64 {
32        match self {
33            Self::Netstack2 => 2,
34            Self::Netstack3 => 3,
35        }
36    }
37
38    fn optional_inspect_uint_value(o: &Option<Self>) -> u64 {
39        o.as_ref().map(Self::inspect_uint_value).unwrap_or(0)
40    }
41}
42
43impl From<fnet_migration::NetstackVersion> for NetstackVersion {
44    fn from(value: fnet_migration::NetstackVersion) -> Self {
45        match value {
46            fnet_migration::NetstackVersion::Netstack2 => NetstackVersion::Netstack2,
47            fnet_migration::NetstackVersion::Netstack3 => NetstackVersion::Netstack3,
48        }
49    }
50}
51
52impl From<NetstackVersion> for fnet_migration::NetstackVersion {
53    fn from(value: NetstackVersion) -> Self {
54        match value {
55            NetstackVersion::Netstack2 => fnet_migration::NetstackVersion::Netstack2,
56            NetstackVersion::Netstack3 => fnet_migration::NetstackVersion::Netstack3,
57        }
58    }
59}
60
61impl From<NetstackVersion> for Box<fnet_migration::VersionSetting> {
62    fn from(value: NetstackVersion) -> Self {
63        Box::new(fnet_migration::VersionSetting { version: value.into() })
64    }
65}
66
/// A netstack version selection that additionally records whether Netstack2
/// was chosen because the rollback mechanism overrode the automated setting.
#[derive(Debug, PartialEq)]
enum RollbackNetstackVersion {
    Netstack2,
    // The automated setting requested Netstack3, but the persisted state
    // indicates that the previous boot had too many health check failures.
    // Forcibly use Netstack2.
    ForceNetstack2,
    Netstack3,
}
76
77impl RollbackNetstackVersion {
78    // Convert into a `NetstackVersion`, while honoring the forced setting.
79    fn version(&self) -> NetstackVersion {
80        match self {
81            Self::Netstack2 | Self::ForceNetstack2 => NetstackVersion::Netstack2,
82            Self::Netstack3 => NetstackVersion::Netstack3,
83        }
84    }
85
86    // Convert into a `NetstackVersion`, while ignoring the forced setting.
87    fn version_ignoring_force(&self) -> NetstackVersion {
88        match self {
89            Self::Netstack2 => NetstackVersion::Netstack2,
90            Self::Netstack3 | Self::ForceNetstack2 => NetstackVersion::Netstack3,
91        }
92    }
93}
94
95impl From<NetstackVersion> for RollbackNetstackVersion {
96    fn from(version: NetstackVersion) -> Self {
97        match version {
98            NetstackVersion::Netstack2 => RollbackNetstackVersion::Netstack2,
99            NetstackVersion::Netstack3 => RollbackNetstackVersion::Netstack3,
100        }
101    }
102}
103
/// The settings that survive across boots, serialized as JSON.
///
/// `Default` (all fields `None`) is what gets used when nothing could be
/// loaded from storage.
#[derive(Default, Debug, serde::Deserialize, serde::Serialize)]
#[cfg_attr(test, derive(Eq, PartialEq))]
struct Persisted {
    /// Version requested through `SetAutomatedNetstackVersion`, if any.
    automated: Option<NetstackVersion>,
    /// Version requested through `SetUserNetstackVersion`, if any. When set,
    /// it takes precedence over `automated` and over rollback.
    user: Option<NetstackVersion>,
    /// State recorded by the rollback module, if any.
    rollback: Option<rollback::Persisted>,
}
111
112impl Persisted {
113    fn load<R: std::io::Read>(r: R) -> Self {
114        serde_json::from_reader(std::io::BufReader::new(r)).unwrap_or_else(|e| {
115            error!("error loading persisted config {e:?}, using defaults");
116            Persisted::default()
117        })
118    }
119
120    fn save<W: std::io::Write>(&self, w: W) {
121        serde_json::to_writer(w, self).unwrap_or_else(|e: serde_json::Error| {
122            error!("error persisting configuration {self:?}: {e:?}")
123        })
124    }
125
126    // Determine the desired NetstackVersion based on the persisted values
127    fn desired_netstack_version(&self) -> RollbackNetstackVersion {
128        match self {
129            Persisted { user: Some(user), automated: _, rollback: _ } => (*user).into(),
130            Persisted {
131                user: None,
132                rollback: Some(rollback::Persisted::HealthcheckFailures(failures)),
133                automated: Some(NetstackVersion::Netstack3),
134            } if *failures >= rollback::MAX_FAILED_HEALTHCHECKS => {
135                RollbackNetstackVersion::ForceNetstack2
136            }
137            Persisted { user: None, automated: Some(automated), rollback: _ } => {
138                (*automated).into()
139            }
140            // Use the default version if nothing is set.
141            Persisted { user: None, automated: None, rollback: _ } => DEFAULT_NETSTACK.into(),
142        }
143    }
144}
145
/// A request received on either of the two FIDL protocols this component
/// serves, so that both request streams can be merged into one.
enum ServiceRequest {
    /// A request on the `Control` protocol (changes settings).
    Control(fnet_migration::ControlRequest),
    /// A request on the `State` protocol (reads settings).
    State(fnet_migration::StateRequest),
}
150
/// The state of the migration service.
///
/// `P` provides access to persistent storage and `CR` schedules
/// collaborative reboots; both are generic so tests can substitute fakes.
struct Migration<P, CR> {
    /// The version chosen for this boot, computed once in `Migration::new`
    /// from the settings that were persisted at that time.
    current_boot: RollbackNetstackVersion,
    /// In-memory copy of the persisted settings.
    persisted: Persisted,
    /// Storage backend used to load and save `persisted`.
    persistence: P,
    /// Tracks the (at most one) outstanding collaborative reboot request.
    collaborative_reboot: CollaborativeReboot<CR>,
}
157
/// Abstraction over the storage that holds the serialized `Persisted`
/// settings.
trait PersistenceProvider {
    /// Sink that the serialized settings are written into.
    type Writer: std::io::Write;
    /// Source that previously saved settings are read back from.
    type Reader: std::io::Read;

    /// Opens the storage for writing a new copy of the settings.
    fn open_writer(&mut self) -> std::io::Result<Self::Writer>;
    /// Opens the storage for reading the settings back.
    fn open_reader(&self) -> std::io::Result<Self::Reader>;
}
165
/// `PersistenceProvider` backed by the file at `PERSISTED_FILE_PATH`.
struct DataPersistenceProvider {}
167
/// Path of the JSON file that holds the persisted settings.
const PERSISTED_FILE_PATH: &str = "/data/config.json";
169
170impl PersistenceProvider for DataPersistenceProvider {
171    type Writer = std::fs::File;
172    type Reader = std::fs::File;
173
174    fn open_writer(&mut self) -> std::io::Result<Self::Writer> {
175        std::fs::File::create(PERSISTED_FILE_PATH)
176    }
177
178    fn open_reader(&self) -> std::io::Result<Self::Reader> {
179        std::fs::File::open(PERSISTED_FILE_PATH)
180    }
181}
182
/// Tracks whether a collaborative reboot has been requested through the
/// wrapped scheduler, so that at most one request is outstanding at a time.
struct CollaborativeReboot<CR> {
    /// The backend used to actually schedule reboots.
    scheduler: CR,
    /// `Some(<cancellation_token>)` if there's an outstanding collaborative
    /// reboot scheduled.
    scheduled_req: Option<zx::EventPair>,
}
189
190impl<CR: CollaborativeRebootScheduler> CollaborativeReboot<CR> {
191    /// Schedules a collaborative reboot.
192    ///
193    /// No-Op if there's already a reboot scheduled.
194    async fn schedule(&mut self) {
195        let Self { scheduler, scheduled_req } = self;
196        if scheduled_req.is_some() {
197            // We already have an outstanding request.
198            return;
199        }
200
201        info!("Scheduling collaborative reboot");
202        let (mine, theirs) = zx::EventPair::create();
203        *scheduled_req = Some(mine);
204        scheduler
205            .schedule(fpower::CollaborativeRebootReason::NetstackMigration, Some(theirs))
206            .await;
207    }
208
209    /// Cancels the currently scheduled collaborative Reboot.
210    ///
211    /// No-Op if there's none scheduled.
212    fn cancel(&mut self) {
213        if let Some(cancel) = self.scheduled_req.take() {
214            info!("Canceling collaborative reboot request. It's no longer necessary.");
215            // Dropping the eventpair cancels the request.
216            std::mem::drop(cancel);
217        }
218    }
219}
220
/// An abstraction over the `fpower::CollaborativeRebootScheduler` FIDL API.
///
/// Exists so that tests can replace the real FIDL connection with a fake.
trait CollaborativeRebootScheduler {
    /// Requests a collaborative reboot for `reason`.
    ///
    /// `cancel` is the peer end of the caller's cancellation token; callers
    /// in this file cancel a request by dropping their own end. The method
    /// returns nothing, so implementations cannot report failure to the
    /// caller.
    async fn schedule(
        &mut self,
        reason: fpower::CollaborativeRebootReason,
        cancel: Option<zx::EventPair>,
    );
}
229
/// An implementation of `CollaborativeRebootScheduler` that connects to the
/// API over FIDL.
///
/// Holds no state: a new connection is made for every `schedule` call.
struct Scheduler {}
233
234impl CollaborativeRebootScheduler for Scheduler {
235    async fn schedule(
236        &mut self,
237        reason: fpower::CollaborativeRebootReason,
238        cancel: Option<zx::EventPair>,
239    ) {
240        let proxy = match fuchsia_component::client::connect_to_protocol::<
241            fpower::CollaborativeRebootSchedulerMarker,
242        >() {
243            Ok(proxy) => proxy,
244            Err(e) => {
245                error!("Failed to connect to collaborative reboot scheduler: {e:?}");
246                return;
247            }
248        };
249        match proxy.schedule_reboot(reason, cancel).await {
250            Ok(()) => {}
251            Err(e) => error!("Failed to schedule collaborative reboot: {e:?}"),
252        }
253    }
254}
255
256impl<P: PersistenceProvider, CR: CollaborativeRebootScheduler> Migration<P, CR> {
257    fn new(persistence: P, cr_scheduler: CR) -> Self {
258        let persisted = persistence.open_reader().map(Persisted::load).unwrap_or_else(|e| {
259            warn!("could not open persistence reader: {e:?}. using defaults");
260            Persisted::default()
261        });
262        let current_boot = persisted.desired_netstack_version();
263
264        if current_boot == RollbackNetstackVersion::ForceNetstack2 {
265            warn!(
266                "Previous boot failed to migrate to Netstack3. \
267                Ignoring automated setting and forcibly using Netstack2."
268            );
269        }
270
271        Self {
272            current_boot,
273            persisted,
274            persistence,
275            collaborative_reboot: CollaborativeReboot {
276                scheduler: cr_scheduler,
277                scheduled_req: None,
278            },
279        }
280    }
281
282    fn persist(&mut self) {
283        let Self { current_boot: _, persisted, persistence, collaborative_reboot: _ } = self;
284        let w = match persistence.open_writer() {
285            Ok(w) => w,
286            Err(e) => {
287                error!("failed to open writer to persist settings: {e:?}");
288                return;
289            }
290        };
291        persisted.save(w);
292    }
293
294    fn map_version_setting(
295        version: Option<Box<fnet_migration::VersionSetting>>,
296    ) -> Option<NetstackVersion> {
297        version.map(|v| {
298            let fnet_migration::VersionSetting { version } = &*v;
299            (*version).into()
300        })
301    }
302
303    async fn update_collaborative_reboot(&mut self) {
304        let Self { current_boot, persisted, persistence: _, collaborative_reboot } = self;
305        if persisted.desired_netstack_version().version() != current_boot.version() {
306            // When the current boot differs from our desired version, schedule
307            // a reboot (if there's not already one).
308            collaborative_reboot.schedule().await
309        } else {
310            // When the current_boot matches our desired version, we no longer
311            // need reboot. Cancel the outstanding request (if any)
312            collaborative_reboot.cancel()
313        }
314    }
315
316    async fn update_rollback_state(&mut self, new_state: rollback::Persisted) {
317        if self.persisted.rollback != Some(new_state) {
318            self.persisted.rollback = Some(new_state);
319            self.update_collaborative_reboot().await;
320            self.persist();
321        }
322    }
323
324    async fn handle_control_request(
325        &mut self,
326        req: fnet_migration::ControlRequest,
327    ) -> Result<(), fidl::Error> {
328        match req {
329            fnet_migration::ControlRequest::SetAutomatedNetstackVersion { version, responder } => {
330                let version = Self::map_version_setting(version);
331                let Self {
332                    current_boot: _,
333                    persisted: Persisted { automated, user: _, rollback: _ },
334                    persistence: _,
335                    collaborative_reboot: _,
336                } = self;
337                if version != *automated {
338                    info!("automated netstack version switched to {version:?}");
339                    *automated = version;
340                    self.persist();
341                    self.update_collaborative_reboot().await;
342                }
343                responder.send()
344            }
345            fnet_migration::ControlRequest::SetUserNetstackVersion { version, responder } => {
346                let version = Self::map_version_setting(version);
347                let Self {
348                    current_boot: _,
349                    persisted: Persisted { automated: _, user, rollback: _ },
350                    persistence: _,
351                    collaborative_reboot: _,
352                } = self;
353                if version != *user {
354                    info!("user netstack version switched to {version:?}");
355                    *user = version;
356                    self.persist();
357                    self.update_collaborative_reboot().await;
358                }
359                responder.send()
360            }
361        }
362    }
363
364    fn handle_state_request(&self, req: fnet_migration::StateRequest) -> Result<(), fidl::Error> {
365        let Migration {
366            current_boot,
367            persisted: Persisted { user, automated, rollback: _ },
368            persistence: _,
369            collaborative_reboot: _,
370        } = self;
371        match req {
372            fnet_migration::StateRequest::GetNetstackVersion { responder } => {
373                responder.send(&fnet_migration::InEffectVersion {
374                    current_boot: current_boot.version().into(),
375                    user: (*user).map(Into::into),
376                    automated: (*automated).map(Into::into),
377                })
378            }
379        }
380    }
381
382    async fn handle_request(&mut self, req: ServiceRequest) -> Result<(), fidl::Error> {
383        match req {
384            ServiceRequest::Control(r) => self.handle_control_request(r).await,
385            ServiceRequest::State(r) => self.handle_state_request(r),
386        }
387    }
388}
389
/// Handles to the inspect properties that change over the component's
/// lifetime and therefore need to be updated in place.
struct InspectNodes {
    /// Automated setting, encoded with
    /// `NetstackVersion::optional_inspect_uint_value`.
    automated_setting: fuchsia_inspect::UintProperty,
    /// User setting, encoded the same way as `automated_setting`.
    user_setting: fuchsia_inspect::UintProperty,
    /// `Debug` rendering of the persisted rollback state.
    rollback_state: fuchsia_inspect::StringProperty,
}
395
396impl InspectNodes {
397    fn new<P, CR>(inspector: &fuchsia_inspect::Inspector, m: &Migration<P, CR>) -> Self {
398        let root = inspector.root();
399        let Migration { current_boot, persisted: Persisted { automated, user, rollback }, .. } = m;
400        let automated_setting = root.create_uint(
401            "automated_setting",
402            NetstackVersion::optional_inspect_uint_value(automated),
403        );
404        let user_setting =
405            root.create_uint("user_setting", NetstackVersion::optional_inspect_uint_value(user));
406
407        let rollback_state = root.create_string("rollback_state", format!("{rollback:?}"));
408
409        // The current boot version is immutable, record it once instead of
410        // keeping track of a property node.
411        root.record_uint("current_boot", current_boot.version().inspect_uint_value());
412        root.record_bool(
413            "forced_netstack2",
414            *current_boot == RollbackNetstackVersion::ForceNetstack2,
415        );
416
417        Self { automated_setting, user_setting, rollback_state }
418    }
419
420    fn update<P, CR>(&self, m: &Migration<P, CR>) {
421        let Migration { persisted: Persisted { automated, user, rollback }, .. } = m;
422        let Self { automated_setting, user_setting, rollback_state } = self;
423        automated_setting.set(NetstackVersion::optional_inspect_uint_value(automated));
424        user_setting.set(NetstackVersion::optional_inspect_uint_value(user));
425        rollback_state.set(&format!("{rollback:?}"));
426    }
427}
428
/// Wraps communication with metrics (cobalt) server.
struct MetricsLogger {
    /// `None` when the logger could not be created; in that case
    /// `log_metrics` silently does nothing.
    logger: Option<fmetrics::MetricEventLoggerProxy>,
}
433
434impl MetricsLogger {
435    async fn new() -> Self {
436        let (logger, server_end) =
437            fidl::endpoints::create_proxy::<fmetrics::MetricEventLoggerMarker>();
438
439        let factory = match fuchsia_component::client::connect_to_protocol::<
440            fmetrics::MetricEventLoggerFactoryMarker,
441        >() {
442            Ok(f) => f,
443            Err(e) => {
444                warn!("can't connect to logger factory {e:?}");
445                return Self { logger: None };
446            }
447        };
448
449        match factory
450            .create_metric_event_logger(
451                &fmetrics::ProjectSpec {
452                    customer_id: Some(metrics_registry::CUSTOMER_ID),
453                    project_id: Some(metrics_registry::PROJECT_ID),
454                    ..Default::default()
455                },
456                server_end,
457            )
458            .await
459        {
460            Ok(Ok(())) => Self { logger: Some(logger) },
461            Ok(Err(e)) => {
462                warn!("can't create event logger {e:?}");
463                Self { logger: None }
464            }
465            Err(e) => {
466                warn!("error connecting to metric event logger {e:?}");
467                Self { logger: None }
468            }
469        }
470    }
471
472    /// Logs metrics from `migration` to the metrics server.
473    async fn log_metrics<P, CR>(&self, migration: &Migration<P, CR>) {
474        let logger = if let Some(logger) = self.logger.as_ref() {
475            logger
476        } else {
477            // Silently don't log metrics if we didn't manage to create a
478            // logger, warnings are emitted upon creation.
479            return;
480        };
481
482        let current_boot = match migration.current_boot {
483            RollbackNetstackVersion::Netstack2 | RollbackNetstackVersion::ForceNetstack2 => {
484                metrics_registry::StackMigrationCurrentBootMetricDimensionNetstackVersion::Netstack2
485            }
486            RollbackNetstackVersion::Netstack3 => {
487                metrics_registry::StackMigrationCurrentBootMetricDimensionNetstackVersion::Netstack3
488            }
489        }
490        .as_event_code();
491        let user = match migration.persisted.user {
492            None => metrics_registry::StackMigrationUserSettingMetricDimensionNetstackVersion::NoSelection,
493            Some(NetstackVersion::Netstack2) => metrics_registry::StackMigrationUserSettingMetricDimensionNetstackVersion::Netstack2,
494            Some(NetstackVersion::Netstack3) => metrics_registry::StackMigrationUserSettingMetricDimensionNetstackVersion::Netstack3,
495        }
496        .as_event_code();
497        let automated = match migration.persisted.automated {
498            None => metrics_registry::StackMigrationAutomatedSettingMetricDimensionNetstackVersion::NoSelection,
499            Some(NetstackVersion::Netstack2) => metrics_registry::StackMigrationAutomatedSettingMetricDimensionNetstackVersion::Netstack2,
500            Some(NetstackVersion::Netstack3) => metrics_registry::StackMigrationAutomatedSettingMetricDimensionNetstackVersion::Netstack3,
501        }.as_event_code();
502        let rollback_state = compute_state_metric(migration).as_event_code();
503        for (metric_id, event_code) in [
504            (metrics_registry::STACK_MIGRATION_CURRENT_BOOT_METRIC_ID, current_boot),
505            (metrics_registry::STACK_MIGRATION_USER_SETTING_METRIC_ID, user),
506            (metrics_registry::STACK_MIGRATION_AUTOMATED_SETTING_METRIC_ID, automated),
507            (metrics_registry::STACK_MIGRATION_STATE_METRIC_ID, rollback_state),
508        ] {
509            let occurrence_count = 1;
510            logger
511                .log_occurrence(metric_id, occurrence_count, &[event_code][..])
512                .await
513                .map(|r| {
514                    r.unwrap_or_else(|e| warn!("error reported logging metric {metric_id} {e:?}"))
515                })
516                .unwrap_or_else(|fidl_error| {
517                    warn!("error logging metric {metric_id} {fidl_error:?}")
518                });
519        }
520    }
521}
522
523fn compute_state_metric<P, CR>(
524    migration: &Migration<P, CR>,
525) -> metrics_registry::StackMigrationStateMetricDimensionMigrationState {
526    use metrics_registry::StackMigrationStateMetricDimensionMigrationState as state_metric;
527    let Migration {
528        current_boot,
529        persisted: Persisted { automated, user: _, rollback },
530        persistence: _,
531        collaborative_reboot: _,
532    } = migration;
533
534    match (current_boot, automated, rollback) {
535        (RollbackNetstackVersion::Netstack2, Some(NetstackVersion::Netstack2) | None, _) => {
536            state_metric::NotStarted
537        }
538        (RollbackNetstackVersion::Netstack2, Some(NetstackVersion::Netstack3), _) => {
539            state_metric::Scheduled
540        }
541        (RollbackNetstackVersion::ForceNetstack2, _, _) => state_metric::RolledBack,
542        (RollbackNetstackVersion::Netstack3, Some(NetstackVersion::Netstack2) | None, _) => {
543            state_metric::Canceled
544        }
545        (RollbackNetstackVersion::Netstack3, Some(NetstackVersion::Netstack3), None) => {
546            state_metric::InProgress
547        }
548        (
549            RollbackNetstackVersion::Netstack3,
550            Some(NetstackVersion::Netstack3),
551            Some(rollback::Persisted::HealthcheckFailures(f)),
552        ) => {
553            if *f >= rollback::MAX_FAILED_HEALTHCHECKS {
554                state_metric::Failed
555            } else {
556                state_metric::InProgress
557            }
558        }
559        (
560            RollbackNetstackVersion::Netstack3,
561            Some(NetstackVersion::Netstack3),
562            Some(rollback::Persisted::Success),
563        ) => state_metric::Success,
564    }
565}
566
/// Component entry point: exposes the `Control` and `State` protocols under
/// `svc`, loads the persisted settings from `DataPersistenceProvider`, and
/// runs the service loop in `main_inner`.
#[fuchsia::main]
pub async fn main() {
    info!("running netstack migration service");

    let mut fs = ServiceFs::new();
    // Both protocols are mapped to `ServiceRequest` and wrapped with
    // `left_stream`/`right_stream` so the two closures yield the same type.
    let _: &mut ServiceFsDir<'_, _> = fs
        .dir("svc")
        .add_fidl_service(|rs: fnet_migration::ControlRequestStream| {
            rs.map(|req| req.map(ServiceRequest::Control)).left_stream()
        })
        .add_fidl_service(|rs: fnet_migration::StateRequestStream| {
            rs.map(|req| req.map(ServiceRequest::State)).right_stream()
        });
    let _: &mut ServiceFs<_> =
        fs.take_and_serve_directory_handle().expect("failed to take out directory handle");

    let mut migration = Migration::new(DataPersistenceProvider {}, Scheduler {});
    main_inner(
        &mut migration,
        // Flatten the per-connection request streams into a single stream of
        // requests, with no limit (`None`) on concurrently polled streams.
        fs.fuse().flatten_unordered(None),
        rollback::FidlHttpFetcher::new(),
        rollback::new_healthcheck_stream(),
    )
    .await
}
592
593async fn main_inner<
594    P: PersistenceProvider,
595    CR: CollaborativeRebootScheduler,
596    H: rollback::HttpFetcher + Send + 'static,
597    T: Stream<Item = ()> + Send + 'static,
598    SR: Stream<Item = Result<ServiceRequest, fidl::Error>>,
599>(
600    migration: &mut Migration<P, CR>,
601    service_request_stream: SR,
602    http_fetcher: H,
603    healthcheck_tick: T,
604) {
605    let inspector = fuchsia_inspect::component::inspector();
606    let _inspect_server =
607        inspect_runtime::publish(inspector, inspect_runtime::PublishOptions::default())
608            .expect("failed to serve inspector");
609    let inspect_nodes = InspectNodes::new(inspector, &migration);
610
611    let metrics_logger = MetricsLogger::new().await;
612
613    let (desired_version_sender, desired_version_receiver) = mpsc::unbounded();
614    let (rollback_state_sender, rollback_state_receiver) = mpsc::unbounded();
615    let rollback_state =
616        rollback::State::new(migration.persisted.rollback, migration.current_boot.version());
617    // Update rollback persistence immediately in case the device reboots before
618    // the rollback module has time to send an asynchronous update. This is
619    // required for correctness if Netstack3 is crashing on startup, or in the
620    // following case:
621    //
622    // 1. Device fails to migrate to Netstack3 and persists
623    //    HealthcheckFailures(MAX_FAILED_HEALTHCHECKS), which will force
624    //    Netstack2 on subsequent boots.
625    // 2. Device reboots into Netstack2, sees that it should be running
626    //    Netstack3, and schedules a reboot without clearing the failures.
627    // 3. Device reboots back into Netstack3 and sees that it should schedule
628    //    a reboot because the persisted failures are above the limit.
629    migration.update_rollback_state(rollback_state.persisted()).await;
630
631    Task::spawn(async move {
632        rollback::run(
633            rollback_state,
634            http_fetcher,
635            desired_version_receiver,
636            rollback_state_sender,
637            pin!(healthcheck_tick),
638        )
639        .await
640    })
641    .detach();
642
643    enum Action {
644        ServiceRequest(Result<ServiceRequest, fidl::Error>),
645        LogMetrics,
646        UpdateRollbackState(rollback::Persisted),
647    }
648
649    let metrics_logging_interval = fuchsia_async::MonotonicDuration::from_hours(1);
650    let mut stream: futures::stream::SelectAll<Pin<Box<dyn Stream<Item = Action>>>> =
651        futures::stream::SelectAll::new();
652
653    // Always log metrics once on startup then periodically log new values so
654    // the aggregation window always contains one sample of the current
655    // settings.
656    stream.push(Box::pin(Box::new(
657        futures::stream::once(futures::future::ready(()))
658            .chain(fuchsia_async::Interval::new(metrics_logging_interval))
659            .map(|()| Action::LogMetrics),
660    )));
661    stream.push(Box::pin(Box::new(Box::pin(service_request_stream.map(Action::ServiceRequest)))));
662    stream.push(Box::pin(rollback_state_receiver.map(|state| Action::UpdateRollbackState(state))));
663
664    while let Some(action) = stream.next().await {
665        match action {
666            Action::ServiceRequest(req) => {
667                let result = match req {
668                    Ok(req) => migration.handle_request(req).await,
669                    Err(e) => Err(e),
670                };
671                // Always update inspector state after handling a request.
672                inspect_nodes.update(&migration);
673
674                // Send the desired netstack version to the rollback mechanism,
675                // but ignore the "forced" setting. The "forced" setting comes
676                // from the rollback mechanism, and sending that signal back
677                // into it would cause a the mechanism to incorrectly detect
678                // a cancelation.
679                match desired_version_sender.unbounded_send(
680                    migration.persisted.desired_netstack_version().version_ignoring_force(),
681                ) {
682                    Ok(()) => (),
683                    Err(e) => {
684                        error!("error sending update to rollback module: {:?}", e);
685                    }
686                }
687
688                match result {
689                    Ok(()) => (),
690                    Err(e) => {
691                        if !e.is_closed() {
692                            error!("error processing FIDL request {:?}", e)
693                        }
694                    }
695                }
696            }
697            Action::LogMetrics => {
698                metrics_logger.log_metrics(&migration).await;
699            }
700            Action::UpdateRollbackState(new_state) => {
701                migration.update_rollback_state(new_state).await;
702                // Always update inspector state when the rollback state
703                // changes.
704                inspect_nodes.update(&migration);
705            }
706        }
707    }
708}
709
710#[cfg(test)]
711mod tests {
712    use super::*;
713    use assert_matches::assert_matches;
714    use async_utils::event::{Event, EventWait};
715    use diagnostics_assertions::assert_data_tree;
716    use fidl::Peered as _;
717    use fidl_fuchsia_net_http as fnet_http;
718    use fuchsia_async::TimeoutExt;
719    use futures::FutureExt;
720    use std::cell::RefCell;
721    use std::rc::Rc;
722    use std::time::Duration;
723    use test_case::test_case;
724
    /// In-memory `PersistenceProvider` used by tests.
    ///
    /// Clones share the same backing buffer through the `Rc`, so data written
    /// through a writer (itself a clone) is visible to later readers.
    #[derive(Default, Clone)]
    struct InMemory {
        // `None` until a writer has been opened; `open_reader` reports
        // `NotFound` in that state.
        file: Rc<RefCell<Option<Vec<u8>>>>,
    }
729
730    impl InMemory {
731        fn with_persisted(p: Persisted) -> Self {
732            let mut s = Self::default();
733            p.save(s.open_writer().unwrap());
734            s
735        }
736    }
737
738    impl PersistenceProvider for InMemory {
739        type Writer = Self;
740        type Reader = std::io::Cursor<Vec<u8>>;
741
742        fn open_writer(&mut self) -> std::io::Result<Self::Writer> {
743            *self.file.borrow_mut() = Some(Vec::new());
744            Ok(self.clone())
745        }
746
747        fn open_reader(&self) -> std::io::Result<Self::Reader> {
748            self.file
749                .borrow()
750                .clone()
751                .map(std::io::Cursor::new)
752                .ok_or(std::io::ErrorKind::NotFound.into())
753        }
754    }
755
756    impl std::io::Write for InMemory {
757        fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
758            let r = self.file.borrow_mut().as_mut().expect("no file open").write(buf);
759            r
760        }
761
762        fn flush(&mut self) -> std::io::Result<()> {
763            Ok(())
764        }
765    }
766
    /// Scheduler for tests in which no collaborative reboot is expected; its
    /// `schedule` implementation panics.
    struct NoCollaborativeReboot;
768
    impl CollaborativeRebootScheduler for NoCollaborativeReboot {
        /// Always panics, failing any test that tries to schedule a reboot.
        async fn schedule(
            &mut self,
            _reason: fpower::CollaborativeRebootReason,
            _cancel: Option<zx::EventPair>,
        ) {
            panic!("unexpectedly attempted to schedule a collaborative reboot");
        }
    }
778
    /// Fake scheduler that records the scheduled request so tests can
    /// inspect it.
    #[derive(Default)]
    struct FakeCollaborativeReboot {
        // The cancellation signal passed to the single `schedule` call, if
        // one has happened.
        req: Option<zx::EventPair>,
    }
783
784    impl CollaborativeRebootScheduler for FakeCollaborativeReboot {
785        async fn schedule(
786            &mut self,
787            reason: fpower::CollaborativeRebootReason,
788            cancel: Option<zx::EventPair>,
789        ) {
790            assert_eq!(reason, fpower::CollaborativeRebootReason::NetstackMigration);
791            let cancel = cancel.expect("cancellation signal must be provided");
792            assert_eq!(self.req.replace(cancel), None, "attempted to schedule multiple reboots");
793        }
794    }
795
    /// Test helper that serves `migration` over freshly created `Control` and
    /// `State` channels.
    ///
    /// Returns the serving future together with the two client proxies. The
    /// future folds every incoming request through
    /// `Migration::handle_request` and yields the final `Migration` once both
    /// request streams end. It panics on a stream error or a failed request.
    fn serve_migration<P: PersistenceProvider, CR: CollaborativeRebootScheduler>(
        migration: Migration<P, CR>,
    ) -> (
        impl futures::Future<Output = Migration<P, CR>>,
        fnet_migration::ControlProxy,
        fnet_migration::StateProxy,
    ) {
        let (control, control_server) =
            fidl::endpoints::create_proxy_and_stream::<fnet_migration::ControlMarker>();
        let (state, state_server) =
            fidl::endpoints::create_proxy_and_stream::<fnet_migration::StateMarker>();

        let fut = {
            // Shadow the proxies inside this block with the mapped server
            // streams; the proxies themselves are returned below.
            let control =
                control_server.map(|req| ServiceRequest::Control(req.expect("control error")));
            let state = state_server.map(|req| ServiceRequest::State(req.expect("state error")));
            // The migration is threaded through the fold as the accumulator
            // so that ownership returns to the caller when the future ends.
            futures::stream::select(control, state).fold(migration, |mut migration, req| async {
                migration.handle_request(req).await.expect("handling request");
                migration
            })
        };
        (fut, control, state)
    }
819
820    #[test_case(Persisted{
821        user: Some(NetstackVersion::Netstack2),
822        automated: None,
823        rollback: None,
824    }; "user_netstack2")]
825    #[test_case(Persisted{
826        user: Some(NetstackVersion::Netstack3),
827        automated: None,
828        rollback: None,
829    }; "user_netstack3")]
830    #[test_case(Persisted{
831        user: None,
832        automated: None,
833        rollback: None,
834    }; "none")]
835    #[test_case(Persisted{
836        user: None,
837        automated: Some(NetstackVersion::Netstack2),
838        rollback: None,
839    }; "automated_netstack2")]
840    #[test_case(Persisted{
841        user: None,
842        automated: Some(NetstackVersion::Netstack3),
843        rollback: None,
844    }; "automated_netstack3")]
845    #[test_case(Persisted{
846        user: None,
847        automated: None,
848        rollback: Some(rollback::Persisted::Success),
849    }; "rollback_success")]
850    #[test_case(Persisted{
851        user: Some(NetstackVersion::Netstack2),
852        automated: Some(NetstackVersion::Netstack3),
853        rollback: Some(rollback::Persisted::HealthcheckFailures(5)),
854    }; "all")]
855    #[fuchsia::test(add_test_attr = false)]
856    fn persist_save_load(v: Persisted) {
857        let mut m = InMemory::default();
858        v.save(m.open_writer().unwrap());
859        assert_eq!(Persisted::load(m.open_reader().unwrap()), v);
860    }
861
862    #[fuchsia::test]
863    fn uses_defaults_if_no_persistence() {
864        let m = Migration::new(InMemory::default(), NoCollaborativeReboot);
865        let Migration {
866            current_boot,
867            persisted: Persisted { user, automated, rollback: _ },
868            persistence: _,
869            collaborative_reboot: _,
870        } = m;
871        assert_eq!(current_boot.version(), DEFAULT_NETSTACK);
872        assert_eq!(user, None);
873        assert_eq!(automated, None);
874    }
875
876    #[test_case(
877        None, Some(NetstackVersion::Netstack3), None, NetstackVersion::Netstack3;
878        "automated_ns3")]
879    #[test_case(
880        None, Some(NetstackVersion::Netstack2), None, NetstackVersion::Netstack2;
881        "automated_ns2")]
882    #[test_case(
883        Some(NetstackVersion::Netstack3),
884        Some(NetstackVersion::Netstack2),
885        Some(rollback::Persisted::HealthcheckFailures(rollback::MAX_FAILED_HEALTHCHECKS)),
886        NetstackVersion::Netstack3;
887        "user_ns3_override")]
888    #[test_case(
889        Some(NetstackVersion::Netstack2),
890        Some(NetstackVersion::Netstack3),
891        Some(rollback::Persisted::Success),
892        NetstackVersion::Netstack2;
893        "user_ns2_override")]
894    #[test_case(
895        Some(NetstackVersion::Netstack2),
896        None,
897        None,
898        NetstackVersion::Netstack2; "user_ns2")]
899    #[test_case(
900        Some(NetstackVersion::Netstack3),
901        None,
902        None,
903        NetstackVersion::Netstack3; "user_ns3")]
904    #[test_case(
905        None,
906        Some(NetstackVersion::Netstack3),
907        Some(rollback::Persisted::HealthcheckFailures(rollback::MAX_FAILED_HEALTHCHECKS)),
908        NetstackVersion::Netstack2; "rollback_to_ns2")]
909    #[test_case(None, None, None, DEFAULT_NETSTACK; "default")]
910    #[fuchsia::test]
911    async fn get_netstack_version(
912        p_user: Option<NetstackVersion>,
913        p_automated: Option<NetstackVersion>,
914        p_rollback: Option<rollback::Persisted>,
915        expect: NetstackVersion,
916    ) {
917        let m = Migration::new(
918            InMemory::with_persisted(Persisted {
919                user: p_user,
920                automated: p_automated,
921                rollback: p_rollback,
922            }),
923            NoCollaborativeReboot,
924        );
925        let Migration {
926            current_boot,
927            persisted: Persisted { user, automated, rollback: _ },
928            persistence: _,
929            collaborative_reboot: _,
930        } = &m;
931        assert_eq!(current_boot.version(), expect);
932        assert_eq!(*user, p_user);
933        assert_eq!(*automated, p_automated);
934
935        let (serve, _, state) = serve_migration(m);
936        let fut = async move {
937            let fnet_migration::InEffectVersion { current_boot, user, automated } =
938                state.get_netstack_version().await.expect("get netstack version");
939            let expect = expect.into();
940            let p_user = p_user.map(Into::into);
941            let p_automated = p_automated.map(Into::into);
942            assert_eq!(current_boot, expect);
943            assert_eq!(user, p_user);
944            assert_eq!(automated, p_automated);
945        };
946        let (_, ()): (Migration<_, _>, _) = futures::future::join(serve, fut).await;
947    }
948
949    #[derive(Debug, Copy, Clone)]
950    enum SetMechanism {
951        User,
952        Automated,
953    }
954
955    #[test_case(SetMechanism::User, NetstackVersion::Netstack2; "set_user_ns2")]
956    #[test_case(SetMechanism::User, NetstackVersion::Netstack3; "set_user_ns3")]
957    #[test_case(SetMechanism::Automated, NetstackVersion::Netstack2; "set_automated_ns2")]
958    #[test_case(SetMechanism::Automated, NetstackVersion::Netstack3; "set_automated_ns3")]
959    #[fuchsia::test]
960    async fn set_netstack_version(mechanism: SetMechanism, set_version: NetstackVersion) {
961        let m = Migration::new(
962            InMemory::with_persisted(Default::default()),
963            FakeCollaborativeReboot::default(),
964        );
965        let (serve, control, _) = serve_migration(m);
966        let fut = async move {
967            let setting = fnet_migration::VersionSetting { version: set_version.into() };
968            match mechanism {
969                SetMechanism::User => control
970                    .set_user_netstack_version(Some(&setting))
971                    .await
972                    .expect("set user netstack version"),
973                SetMechanism::Automated => control
974                    .set_automated_netstack_version(Some(&setting))
975                    .await
976                    .expect("set automated netstack version"),
977            }
978        };
979        let (migration, ()) = futures::future::join(serve, fut).await;
980
981        let validate_versions = |m: &Migration<_, _>, current| {
982            let Migration {
983                current_boot,
984                persisted: Persisted { user, automated, rollback: _ },
985                persistence: _,
986                collaborative_reboot: _,
987            } = m;
988            assert_eq!(current_boot.version(), current);
989            match mechanism {
990                SetMechanism::User => {
991                    assert_eq!(*user, Some(set_version));
992                    assert_eq!(*automated, None);
993                }
994                SetMechanism::Automated => {
995                    assert_eq!(*user, None);
996                    assert_eq!(*automated, Some(set_version));
997                }
998            }
999        };
1000
1001        validate_versions(&migration, DEFAULT_NETSTACK);
1002        // There should only be a collaborative reboot request if the new
1003        // version differs from the default version.
1004        let cr_req = &migration.collaborative_reboot.scheduler.req;
1005        if set_version != DEFAULT_NETSTACK {
1006            assert_eq!(Ok(false), cr_req.as_ref().expect("there should be a request").is_closed());
1007        } else {
1008            assert_eq!(cr_req, &None);
1009        }
1010
1011        // Check that the setting was properly persisted.
1012        let migration =
1013            Migration::new(migration.persistence, migration.collaborative_reboot.scheduler);
1014        validate_versions(&migration, set_version);
1015    }
1016
1017    #[fuchsia::test]
1018    async fn update_rollback_state() {
1019        let mut migration = Migration::new(
1020            InMemory::with_persisted(Persisted {
1021                automated: Some(NetstackVersion::Netstack3),
1022                user: None,
1023                rollback: None,
1024            }),
1025            FakeCollaborativeReboot::default(),
1026        );
1027
1028        assert_eq!(migration.current_boot.version(), NetstackVersion::Netstack3);
1029        assert!(migration.collaborative_reboot.scheduler.req.is_none());
1030
1031        // The first update shouldn't schedule a reboot because we haven't
1032        // passed the healthcheck threshold yet.
1033        migration.update_rollback_state(rollback::Persisted::HealthcheckFailures(1)).await;
1034        assert_matches!(
1035            migration.persisted.rollback,
1036            Some(rollback::Persisted::HealthcheckFailures(1))
1037        );
1038        assert!(migration.collaborative_reboot.scheduler.req.is_none());
1039
1040        // This second update should schedule a reboot because we've passed
1041        // the healthcheck limit and want to roll back to Netstack2.
1042        migration
1043            .update_rollback_state(rollback::Persisted::HealthcheckFailures(
1044                rollback::MAX_FAILED_HEALTHCHECKS,
1045            ))
1046            .await;
1047        assert_matches!(
1048            migration.persisted.rollback,
1049            Some(rollback::Persisted::HealthcheckFailures(rollback::MAX_FAILED_HEALTHCHECKS))
1050        );
1051        assert_eq!(
1052            migration
1053                .collaborative_reboot
1054                .scheduler
1055                .req
1056                .as_ref()
1057                .expect("reboot was not scheduled")
1058                .is_closed()
1059                .unwrap(),
1060            false
1061        );
1062
1063        // This emulates seeing a healthcheck success before rebooting, in which
1064        // case we should see the reboot get canceled.
1065        migration.update_rollback_state(rollback::Persisted::Success).await;
1066        assert_matches!(migration.persisted.rollback, Some(rollback::Persisted::Success));
1067        assert!(migration
1068            .collaborative_reboot
1069            .scheduler
1070            .req
1071            .as_ref()
1072            .unwrap()
1073            .is_closed()
1074            .unwrap());
1075
1076        // Ensure that the changes were persisted successfully.
1077        let migration =
1078            Migration::new(migration.persistence, migration.collaborative_reboot.scheduler);
1079        assert_matches!(migration.persisted.rollback, Some(rollback::Persisted::Success));
1080    }
1081
1082    #[test_case(SetMechanism::User, Some(NetstackVersion::Netstack2), false)]
1083    #[test_case(SetMechanism::User, Some(NetstackVersion::Netstack3), true)]
1084    #[test_case(SetMechanism::User, None, false)]
1085    #[test_case(SetMechanism::Automated, Some(NetstackVersion::Netstack2), false)]
1086    #[test_case(SetMechanism::Automated, Some(NetstackVersion::Netstack3), true)]
1087    #[test_case(SetMechanism::Automated, None, true)]
1088    #[fuchsia::test]
1089    async fn cancel_collaborative_reboot(
1090        mechanism: SetMechanism,
1091        version: Option<NetstackVersion>,
1092        expect_canceled: bool,
1093    ) {
1094        let migration = Migration::new(
1095            InMemory::with_persisted(Persisted { user: None, automated: None, rollback: None }),
1096            FakeCollaborativeReboot::default(),
1097        );
1098
1099        // Start of by updating the automated setting to Netstack2; this ensures
1100        // there is a pending request to cancel.
1101        let (serve, control, _) = serve_migration(migration);
1102        let fut = async move {
1103            control
1104                .set_automated_netstack_version(Some(&fnet_migration::VersionSetting {
1105                    version: fnet_migration::NetstackVersion::Netstack2,
1106                }))
1107                .await
1108                .expect("set automated netstack version");
1109        };
1110        let (migration, ()) = futures::future::join(serve, fut).await;
1111        let cancel = migration
1112            .collaborative_reboot
1113            .scheduler
1114            .req
1115            .as_ref()
1116            .expect("there should be a request");
1117        assert_eq!(Ok(false), cancel.is_closed());
1118
1119        // Update the setting based on the test parameters
1120        let (serve, control, _) = serve_migration(migration);
1121        let fut = async move {
1122            let setting = version.map(|v| fnet_migration::VersionSetting { version: v.into() });
1123            match mechanism {
1124                SetMechanism::User => control
1125                    .set_user_netstack_version(setting.as_ref())
1126                    .await
1127                    .expect("set user netstack version"),
1128                SetMechanism::Automated => control
1129                    .set_automated_netstack_version(setting.as_ref())
1130                    .await
1131                    .expect("set automated netstack version"),
1132            }
1133        };
1134        let (migration, ()) = futures::future::join(serve, fut).await;
1135
1136        let cancel = migration
1137            .collaborative_reboot
1138            .scheduler
1139            .req
1140            .as_ref()
1141            .expect("there should be a request");
1142        assert_eq!(Ok(expect_canceled), cancel.is_closed());
1143    }
1144
1145    #[test_case(SetMechanism::User)]
1146    #[test_case(SetMechanism::Automated)]
1147    #[fuchsia::test]
1148    async fn clear_netstack_version(mechanism: SetMechanism) {
1149        const PREVIOUS_VERSION: NetstackVersion = NetstackVersion::Netstack2;
1150        let m = Migration::new(
1151            InMemory::with_persisted(Persisted {
1152                user: Some(PREVIOUS_VERSION),
1153                automated: Some(PREVIOUS_VERSION),
1154                rollback: None,
1155            }),
1156            NoCollaborativeReboot,
1157        );
1158        let (serve, control, _) = serve_migration(m);
1159        let fut = async move {
1160            match mechanism {
1161                SetMechanism::User => control
1162                    .set_user_netstack_version(None)
1163                    .await
1164                    .expect("set user netstack version"),
1165                SetMechanism::Automated => control
1166                    .set_automated_netstack_version(None)
1167                    .await
1168                    .expect("set automated netstack version"),
1169            }
1170        };
1171        let (migration, ()) = futures::future::join(serve, fut).await;
1172
1173        let validate_versions = |m: &Migration<_, _>| {
1174            let Migration {
1175                current_boot,
1176                persisted: Persisted { user, automated, rollback: _ },
1177                persistence: _,
1178                collaborative_reboot: _,
1179            } = m;
1180            assert_eq!(current_boot.version(), PREVIOUS_VERSION);
1181            match mechanism {
1182                SetMechanism::User => {
1183                    assert_eq!(*user, None);
1184                    assert_eq!(*automated, Some(PREVIOUS_VERSION));
1185                }
1186                SetMechanism::Automated => {
1187                    assert_eq!(*user, Some(PREVIOUS_VERSION));
1188                    assert_eq!(*automated, None);
1189                }
1190            }
1191        };
1192
1193        validate_versions(&migration);
1194        // Check that the setting was properly persisted.
1195        let migration =
1196            Migration::new(migration.persistence, migration.collaborative_reboot.scheduler);
1197        validate_versions(&migration);
1198    }
1199
1200    #[fuchsia::test]
1201    async fn inspect() {
1202        let mut m = Migration::new(
1203            InMemory::with_persisted(Persisted {
1204                user: Some(NetstackVersion::Netstack2),
1205                automated: Some(NetstackVersion::Netstack3),
1206                rollback: None,
1207            }),
1208            NoCollaborativeReboot,
1209        );
1210        let inspector = fuchsia_inspect::component::inspector();
1211        let nodes = InspectNodes::new(inspector, &m);
1212        assert_data_tree!(inspector,
1213            root: {
1214                current_boot: 2u64,
1215                user_setting: 2u64,
1216                automated_setting: 3u64,
1217                rollback_state: "None",
1218                forced_netstack2: false,
1219            }
1220        );
1221
1222        m.persisted =
1223            Persisted { user: None, automated: Some(NetstackVersion::Netstack2), rollback: None };
1224        nodes.update(&m);
1225        assert_data_tree!(inspector,
1226            root: {
1227                current_boot: 2u64,
1228                user_setting: 0u64,
1229                automated_setting: 2u64,
1230                rollback_state: "None",
1231                forced_netstack2: false,
1232            }
1233        );
1234    }
1235
1236    #[fuchsia::test]
1237    async fn inspect_rollback() {
1238        let mut m = Migration::new(
1239            InMemory::with_persisted(Persisted {
1240                user: None,
1241                automated: Some(NetstackVersion::Netstack3),
1242                rollback: Some(rollback::Persisted::HealthcheckFailures(
1243                    rollback::MAX_FAILED_HEALTHCHECKS,
1244                )),
1245            }),
1246            NoCollaborativeReboot,
1247        );
1248        let inspector = fuchsia_inspect::component::inspector();
1249        let nodes = InspectNodes::new(inspector, &m);
1250        assert_data_tree!(inspector,
1251            root: {
1252                current_boot: 2u64,
1253                user_setting: 0u64,
1254                automated_setting: 3u64,
1255                rollback_state: "Some(HealthcheckFailures(5))",
1256                forced_netstack2: true,
1257            }
1258        );
1259
1260        m.persisted.rollback =
1261            Some(rollback::Persisted::HealthcheckFailures(rollback::MAX_FAILED_HEALTHCHECKS + 1));
1262        nodes.update(&m);
1263        assert_data_tree!(inspector,
1264            root: {
1265                current_boot: 2u64,
1266                user_setting: 0u64,
1267                automated_setting: 3u64,
1268                rollback_state: "Some(HealthcheckFailures(6))",
1269                forced_netstack2: true,
1270            }
1271        );
1272        m.persisted.rollback = Some(rollback::Persisted::Success);
1273        nodes.update(&m);
1274        assert_data_tree!(inspector,
1275            root: {
1276                current_boot: 2u64,
1277                user_setting: 0u64,
1278                automated_setting: 3u64,
1279                rollback_state: "Some(Success)",
1280                forced_netstack2: true,
1281            }
1282        );
1283    }
1284
1285    #[test_case::test_matrix(
1286    [
1287        (RollbackNetstackVersion::Netstack2, metrics_registry::StackMigrationCurrentBootMetricDimensionNetstackVersion::Netstack2),
1288        (RollbackNetstackVersion::Netstack3, metrics_registry::StackMigrationCurrentBootMetricDimensionNetstackVersion::Netstack3),
1289    ],
1290    [
1291        (None, metrics_registry::StackMigrationUserSettingMetricDimensionNetstackVersion::NoSelection),
1292        (Some(NetstackVersion::Netstack2), metrics_registry::StackMigrationUserSettingMetricDimensionNetstackVersion::Netstack2),
1293        (Some(NetstackVersion::Netstack3), metrics_registry::StackMigrationUserSettingMetricDimensionNetstackVersion::Netstack3),
1294    ],
1295    [
1296        (None, metrics_registry::StackMigrationAutomatedSettingMetricDimensionNetstackVersion::NoSelection),
1297        (Some(NetstackVersion::Netstack2), metrics_registry::StackMigrationAutomatedSettingMetricDimensionNetstackVersion::Netstack2),
1298        (Some(NetstackVersion::Netstack3), metrics_registry::StackMigrationAutomatedSettingMetricDimensionNetstackVersion::Netstack3),
1299    ]
1300    )]
1301    #[fuchsia::test]
1302    async fn metrics_logger(
1303        current_boot: (
1304            RollbackNetstackVersion,
1305            metrics_registry::StackMigrationCurrentBootMetricDimensionNetstackVersion,
1306        ),
1307        user: (
1308            Option<NetstackVersion>,
1309            metrics_registry::StackMigrationUserSettingMetricDimensionNetstackVersion,
1310        ),
1311        automated: (
1312            Option<NetstackVersion>,
1313            metrics_registry::StackMigrationAutomatedSettingMetricDimensionNetstackVersion,
1314        ),
1315    ) {
1316        let (current_boot, current_boot_expect) = current_boot;
1317        let (user, user_expect) = user;
1318        let (automated, automated_expect) = automated;
1319        let mut m = Migration::new(
1320            InMemory::with_persisted(Persisted { user, automated, rollback: None }),
1321            NoCollaborativeReboot,
1322        );
1323        m.current_boot = current_boot;
1324        let (logger, mut logger_stream) =
1325            fidl::endpoints::create_proxy_and_stream::<fmetrics::MetricEventLoggerMarker>();
1326
1327        let metrics_logger = MetricsLogger { logger: Some(logger) };
1328
1329        let ((), ()) = futures::future::join(metrics_logger.log_metrics(&m), async {
1330            let expect = [
1331                (
1332                    metrics_registry::STACK_MIGRATION_CURRENT_BOOT_METRIC_ID,
1333                    Some(current_boot_expect.as_event_code()),
1334                ),
1335                (
1336                    metrics_registry::STACK_MIGRATION_USER_SETTING_METRIC_ID,
1337                    Some(user_expect.as_event_code()),
1338                ),
1339                (
1340                    metrics_registry::STACK_MIGRATION_AUTOMATED_SETTING_METRIC_ID,
1341                    Some(automated_expect.as_event_code()),
1342                ),
1343                (
1344                    metrics_registry::STACK_MIGRATION_STATE_METRIC_ID,
1345                    // Note: The rollback state doesn't have a flat expectation.
1346                    // Don't assert on its value here, and instead we directly
1347                    // test it in a separate test case.
1348                    None,
1349                ),
1350            ];
1351            for (id, ev) in expect {
1352                let (metric, occurences, codes, responder) = logger_stream
1353                    .next()
1354                    .await
1355                    .unwrap()
1356                    .unwrap()
1357                    .into_log_occurrence()
1358                    .expect("bad request");
1359                assert_eq!(metric, id);
1360                assert_eq!(occurences, 1);
1361                if let Some(ev) = ev {
1362                    assert_eq!(codes, vec![ev]);
1363                }
1364                responder.send(Ok(())).unwrap();
1365            }
1366        })
1367        .await;
1368    }
1369
1370    #[test_case(
1371        RollbackNetstackVersion::Netstack2, None, None =>
1372        metrics_registry::StackMigrationStateMetricDimensionMigrationState::NotStarted;
1373        "not_started_none"
1374    )]
1375    #[test_case(
1376        RollbackNetstackVersion::Netstack2, Some(NetstackVersion::Netstack2), None =>
1377        metrics_registry::StackMigrationStateMetricDimensionMigrationState::NotStarted;
1378        "not_started_ns2"
1379    )]
1380    #[test_case(
1381        RollbackNetstackVersion::Netstack2, Some(NetstackVersion::Netstack3), None =>
1382        metrics_registry::StackMigrationStateMetricDimensionMigrationState::Scheduled;
1383        "scheduled"
1384    )]
1385    #[test_case(
1386        RollbackNetstackVersion::Netstack3, Some(NetstackVersion::Netstack3), None =>
1387        metrics_registry::StackMigrationStateMetricDimensionMigrationState::InProgress;
1388        "in_progress_none"
1389    )]
1390    #[test_case(
1391        RollbackNetstackVersion::Netstack3, Some(NetstackVersion::Netstack3),
1392        Some(rollback::Persisted::HealthcheckFailures(rollback::MAX_FAILED_HEALTHCHECKS - 1)) =>
1393        metrics_registry::StackMigrationStateMetricDimensionMigrationState::InProgress;
1394        "in_progress_some"
1395    )]
1396    #[test_case(
1397        RollbackNetstackVersion::Netstack3, Some(NetstackVersion::Netstack3),
1398        Some(rollback::Persisted::HealthcheckFailures(rollback::MAX_FAILED_HEALTHCHECKS)) =>
1399        metrics_registry::StackMigrationStateMetricDimensionMigrationState::Failed;
1400        "failed_exact"
1401    )]
1402    #[test_case(
1403        RollbackNetstackVersion::Netstack3, Some(NetstackVersion::Netstack3),
1404        Some(rollback::Persisted::HealthcheckFailures(rollback::MAX_FAILED_HEALTHCHECKS + 1)) =>
1405        metrics_registry::StackMigrationStateMetricDimensionMigrationState::Failed;
1406        "failed_more"
1407    )]
1408    #[test_case(
1409        RollbackNetstackVersion::Netstack3, Some(NetstackVersion::Netstack3),
1410        Some(rollback::Persisted::Success) =>
1411        metrics_registry::StackMigrationStateMetricDimensionMigrationState::Success;
1412        "success"
1413    )]
1414    #[test_case(
1415        RollbackNetstackVersion::Netstack3, None, None =>
1416        metrics_registry::StackMigrationStateMetricDimensionMigrationState::Canceled;
1417        "canceled_none"
1418    )]
1419    #[test_case(
1420        RollbackNetstackVersion::Netstack3, Some(NetstackVersion::Netstack2), None =>
1421        metrics_registry::StackMigrationStateMetricDimensionMigrationState::Canceled;
1422        "canceled_ns2"
1423    )]
1424    #[test_case(
1425        RollbackNetstackVersion::ForceNetstack2, Some(NetstackVersion::Netstack3), None =>
1426        metrics_registry::StackMigrationStateMetricDimensionMigrationState::RolledBack;
1427        "rolled_back"
1428    )]
1429    #[fuchsia::test]
1430    fn test_state_metric(
1431        current_boot: RollbackNetstackVersion,
1432        automated: Option<NetstackVersion>,
1433        rollback: Option<rollback::Persisted>,
1434    ) -> metrics_registry::StackMigrationStateMetricDimensionMigrationState {
1435        let mut migration = Migration::new(
1436            InMemory::with_persisted(Persisted { user: None, automated, rollback }),
1437            NoCollaborativeReboot,
1438        );
1439        migration.current_boot = current_boot;
1440        compute_state_metric(&migration)
1441    }
1442
1443    /// An in-memory mock-persistence that triggers an event once the target
1444    /// state has been persisted.
1445    #[derive(Clone)]
1446    struct AwaitPersisted {
1447        file: Rc<RefCell<Option<Vec<u8>>>>,
1448        target: Vec<u8>,
1449        event: Event,
1450    }
1451
1452    impl AwaitPersisted {
1453        fn with_persisted(start: Persisted, target: &Persisted) -> (Self, EventWait) {
1454            let event = Event::new();
1455            let wait = event.wait();
1456            let target_bytes = serde_json::to_vec(target).expect("failed to serialize target");
1457            let mut s = Self { file: Default::default(), target: target_bytes, event };
1458            start.save(s.open_writer().unwrap());
1459            (s, wait)
1460        }
1461    }
1462
1463    impl PersistenceProvider for AwaitPersisted {
1464        type Writer = Self;
1465        type Reader = std::io::Cursor<Vec<u8>>;
1466
1467        fn open_writer(&mut self) -> std::io::Result<Self::Writer> {
1468            *self.file.borrow_mut() = Some(Vec::new());
1469            Ok(self.clone())
1470        }
1471
1472        fn open_reader(&self) -> std::io::Result<Self::Reader> {
1473            self.file
1474                .borrow()
1475                .clone()
1476                .map(std::io::Cursor::new)
1477                .ok_or(std::io::ErrorKind::NotFound.into())
1478        }
1479    }
1480
1481    impl std::io::Write for AwaitPersisted {
1482        fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
1483            let r = self.file.borrow_mut().as_mut().expect("no file open").write(buf);
1484            if self.file.borrow().as_ref().expect("no_file_open") == &self.target {
1485                let _: bool = self.event.signal();
1486            }
1487            r
1488        }
1489
1490        fn flush(&mut self) -> std::io::Result<()> {
1491            Ok(())
1492        }
1493    }
1494
1495    #[fuchsia::test]
1496    async fn migrate_to_ns3_success() {
1497        let start =
1498            Persisted { user: None, automated: Some(NetstackVersion::Netstack3), rollback: None };
1499        let target = Persisted {
1500            user: None,
1501            automated: Some(NetstackVersion::Netstack3),
1502            rollback: Some(rollback::Persisted::Success),
1503        };
1504
1505        let (persistence, mut wait) = AwaitPersisted::with_persisted(start, &target);
1506        let mut migration = Migration::new(persistence, NoCollaborativeReboot);
1507        // No service requests.
1508        let service_request_stream = futures::stream::pending();
1509        // A health check that always succeeds.
1510        let mock_healthcheck = rollback::testutil::MockHttpRequester(|| {
1511            Ok(fnet_http::Response { error: None, status_code: Some(204), ..Default::default() })
1512        });
1513        let healthcheck_tick = futures::stream::once(futures::future::ready(()));
1514
1515        {
1516            let main_fut = main_inner(
1517                &mut migration,
1518                service_request_stream,
1519                mock_healthcheck,
1520                healthcheck_tick,
1521            )
1522            .fuse();
1523            futures::pin_mut!(main_fut);
1524            futures::select!(
1525                () = main_fut => unreachable!("main fut should never exit"),
1526                () = wait => {}
1527            );
1528        }
1529
1530        assert_eq!(migration.persisted.rollback, Some(rollback::Persisted::Success));
1531    }
1532
1533    #[fuchsia::test]
1534    async fn migrate_to_ns3_fails() {
1535        let start =
1536            Persisted { user: None, automated: Some(NetstackVersion::Netstack3), rollback: None };
1537        let target = Persisted {
1538            user: None,
1539            automated: Some(NetstackVersion::Netstack3),
1540            rollback: Some(rollback::Persisted::HealthcheckFailures(
1541                rollback::MAX_FAILED_HEALTHCHECKS,
1542            )),
1543        };
1544
1545        let (persistence, mut wait) = AwaitPersisted::with_persisted(start, &target);
1546        let mut migration = Migration::new(persistence, FakeCollaborativeReboot::default());
1547        // No service requests.
1548        let service_request_stream = futures::stream::pending();
1549        // A health check that always fails.
1550        let mock_healthcheck = rollback::testutil::MockHttpRequester(|| {
1551            Ok(fnet_http::Response { error: None, status_code: Some(500), ..Default::default() })
1552        });
1553        // Use a non-zero interval so that the Healthcheck code doesn't hog the scheduler.
1554        let healthcheck_tick = fuchsia_async::Interval::new(Duration::from_millis(1).into());
1555
1556        {
1557            let main_fut = main_inner(
1558                &mut migration,
1559                service_request_stream,
1560                mock_healthcheck,
1561                healthcheck_tick,
1562            )
1563            .fuse();
1564            futures::pin_mut!(main_fut);
1565            futures::select!(
1566                () = main_fut => unreachable!("main fut should never exit"),
1567                () = wait => {}
1568            );
1569        }
1570
1571        assert_matches!(
1572            migration.persisted.rollback,
1573            Some(rollback::Persisted::HealthcheckFailures(f)) if
1574            f >= rollback::MAX_FAILED_HEALTHCHECKS
1575        );
1576        // Verify a failed migration schedules a collaborative reboot.
1577        let cr_req = &migration.collaborative_reboot.scheduler.req;
1578        assert_eq!(Ok(false), cr_req.as_ref().expect("there should be a request").is_closed())
1579    }
1580
1581    // Regression test for https://fxbug.dev/395913604.
1582    //
1583    // The original bug would reset the number of failed healthchecks in
1584    // persistence from `rollback::MAX_FAILED_HEALTHCHECKS` to 0, if an inbound
1585    // service request was received.
1586    //
1587    // Verify this is no longer the case by triggering a failed healthcheck,
1588    // pushing the total to `rollback::MAX_FAILED_HEALTHCHECKS`, then sending
1589    // a `fuchsia.net.migration/State.GetNetstackVersion` request.
1590    #[fuchsia::test]
1591    async fn migrate_to_ns3_rollback_regression_test() {
1592        let start = Persisted {
1593            user: None,
1594            automated: Some(NetstackVersion::Netstack3),
1595            rollback: Some(rollback::Persisted::HealthcheckFailures(
1596                rollback::MAX_FAILED_HEALTHCHECKS - 1,
1597            )),
1598        };
1599        let target = Persisted {
1600            user: None,
1601            automated: Some(NetstackVersion::Netstack3),
1602            rollback: Some(rollback::Persisted::HealthcheckFailures(0)),
1603        };
1604
1605        let (persistence, wait) = AwaitPersisted::with_persisted(start, &target);
1606        let mut migration = Migration::new(persistence, FakeCollaborativeReboot::default());
1607        // A health check that always fails.
1608        let mock_healthcheck = rollback::testutil::MockHttpRequester(|| {
1609            Ok(fnet_http::Response { error: None, status_code: Some(500), ..Default::default() })
1610        });
1611        let healthcheck_tick = futures::stream::once(futures::future::ready(()));
1612        // Send "get" requests, to trigger the bug.
1613        let (client, server) =
1614            fidl::endpoints::create_proxy_and_stream::<fnet_migration::StateMarker>();
1615        let service_request_stream = server.map(|r| r.map(ServiceRequest::State));
1616        let client_fut = async move {
1617            // Send multiple get requests, to ensure that at least one would occur after the failed
1618            // healthcheck.
1619            let mut stream = fuchsia_async::Interval::new(Duration::from_millis(1).into());
1620            while let Some(()) = stream.next().await {
1621                let _ =
1622                    client.get_netstack_version().await.expect("failed to get netstack version");
1623            }
1624        }
1625        .fuse();
1626
1627        // If wait were to fire, the bug has occurred. Instead expect a timeout.
1628        // Use 1 second to keep the test runtime short; If CQ has a hiccup and
1629        // pauses execution, we'd see a false negative, which isn't a big deal.
1630        let wait_fut = wait
1631            .map(|()| panic!("unexpectedly observed the persisted healthcheck failures reset to 0"))
1632            .on_timeout(Duration::from_secs(1), || ())
1633            .fuse();
1634
1635        {
1636            let main_fut = main_inner(
1637                &mut migration,
1638                service_request_stream,
1639                mock_healthcheck,
1640                healthcheck_tick,
1641            )
1642            .fuse();
1643            futures::pin_mut!(main_fut);
1644            futures::pin_mut!(client_fut);
1645            futures::pin_mut!(wait_fut);
1646            futures::select!(
1647                () = main_fut => unreachable!("main fut should never exit"),
1648                () = client_fut => unreachable!("client fut should never exit"),
1649                () = wait_fut => {}
1650            );
1651        }
1652    }
1653}