1use std::time::Duration;
6
7use fidl_fuchsia_net_http as fnet_http;
8use futures::{Stream, StreamExt};
9use log::{debug, info, warn};
10use replace_with::replace_with;
11
12use crate::NetstackVersion;
13
/// Period handed to the repeating `fuchsia_async::Interval` that
/// `new_healthcheck_stream` chains after the startup tick (5 hours).
const HEALTHCHECK_INTERVAL: Duration = Duration::from_secs(5 * 60 * 60);

/// Duration of the one-shot timer that produces the first tick of
/// `new_healthcheck_stream` (5 minutes).
const HEALTHCHECK_STARTUP_DELAY: Duration = Duration::from_secs(5 * 60);

/// Threshold on the number of failed healthchecks.
///
/// NOTE(review): the non-test code in this file never reads this constant;
/// presumably another module compares the persisted failure count against it
/// to decide on a rollback — confirm at the use site.
pub(crate) const MAX_FAILED_HEALTHCHECKS: usize = 5;

/// URL requested by `HttpHealthchecker::healthcheck`. Only an HTTP 204 reply
/// is treated as a successful healthcheck.
const HEALTHCHECK_URL: &str = "https://connectivitycheck.gstatic.com/generate_204";
30
/// In-memory rollback state machine.
///
/// Transitions are driven by `on_desired_version_change` and `on_healthcheck`;
/// `persisted` maps a state to the value that is sent for persistence.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub(crate) enum State {
    /// The current boot is running Netstack2. This state is terminal: neither
    /// desired-version changes nor healthcheck results move out of it.
    Netstack2,

    /// Healthchecks are being run; the payload is the number of failures
    /// counted so far. This is the only state in which `should_healthcheck`
    /// returns `true`.
    Checking(usize),

    /// The desired version was switched to Netstack2 while in `Checking` or
    /// `Success`. The payload remembers the previous state so it can be
    /// restored if the desired version goes back to Netstack3.
    Canceled(Canceled),

    /// A healthcheck succeeded; no further healthchecks are run.
    Success,
}
78
/// Records which state was active when the state machine entered
/// `State::Canceled`, so that the same state can be resumed later.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub(crate) enum Canceled {
    /// Canceled from `State::Checking` with the given failure count.
    FromChecking(usize),
    /// Canceled from `State::Success`.
    FromSuccess,
}
84
85impl State {
86 pub(crate) fn new(persisted: Option<Persisted>, current_boot: NetstackVersion) -> Self {
89 match (persisted, current_boot) {
90 (_, NetstackVersion::Netstack2) => State::Netstack2,
91 (None, NetstackVersion::Netstack3) => State::Checking(0),
92 (Some(Persisted::HealthcheckFailures(failures)), NetstackVersion::Netstack3) => {
93 State::Checking(failures + 1)
96 }
97 (Some(Persisted::Success), NetstackVersion::Netstack3) => State::Success,
98 }
99 }
100
101 fn on_desired_version_change(self, desired_version: NetstackVersion) -> Self {
104 let old = self.clone();
105 let new = match (self, desired_version) {
106 (State::Netstack2, _) => self,
107
108 (State::Checking(failures), NetstackVersion::Netstack2) => {
109 State::Canceled(Canceled::FromChecking(failures))
110 }
111 (State::Checking(_), NetstackVersion::Netstack3) => self,
112
113 (State::Success, NetstackVersion::Netstack2) => State::Canceled(Canceled::FromSuccess),
114 (State::Success, NetstackVersion::Netstack3) => self,
115
116 (State::Canceled(_), NetstackVersion::Netstack2) => self,
117 (State::Canceled(inner), NetstackVersion::Netstack3) => match inner {
118 Canceled::FromChecking(failures) => State::Checking(failures),
119 Canceled::FromSuccess => State::Success,
120 },
121 };
122
123 if new != old {
124 info!("on_desired_version_change: Rollback state changed from {old:?} to {new:?}");
125 }
126 new
127 }
128
129 fn on_healthcheck(self, result: HealthcheckResult) -> Self {
131 let old = self.clone();
132 let new = match self {
133 State::Netstack2 | State::Success | State::Canceled(_) => self,
135
136 State::Checking(failures) => match result {
137 HealthcheckResult::Success => State::Success,
138 HealthcheckResult::Failure => State::Checking(failures + 1),
139 },
140 };
141
142 if new != old {
143 info!("on_healthcheck: Rollback state changed from {old:?} to {new:?}");
144 }
145 new
146 }
147
148 fn should_healthcheck(&self) -> bool {
149 match self {
150 State::Checking(_) => true,
151 State::Netstack2 | State::Success | State::Canceled(_) => false,
152 }
153 }
154
155 pub(crate) fn persisted(&self) -> Persisted {
157 match self {
158 State::Netstack2 => Persisted::HealthcheckFailures(0),
159 State::Checking(failures) => Persisted::HealthcheckFailures(*failures),
160
161 State::Success => Persisted::Success,
162 State::Canceled(_) => Persisted::HealthcheckFailures(0),
163 }
164 }
165}
166
/// Serializable summary of `State`, produced by `State::persisted` and
/// consumed by `State::new`.
#[derive(Debug, Copy, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub(crate) enum Persisted {
    /// Number of failed healthchecks recorded so far.
    HealthcheckFailures(usize),

    /// A healthcheck has succeeded.
    Success,
}
178
/// Outcome of a single healthcheck attempt. The comparison and debug traits
/// are derived only in test builds.
#[cfg_attr(test, derive(Debug, Eq, PartialEq))]
enum HealthcheckResult {
    /// The request completed with the expected response.
    Success,
    /// The request failed or returned an unexpected response.
    Failure,
}
184
/// Abstraction over issuing a single `fuchsia.net.http` request, so the
/// healthcheck logic can run against either the real loader or a test double.
pub(crate) trait HttpFetcher {
    /// Sends `request` and resolves to the response, or to a FIDL error if the
    /// call itself failed. The returned future must be `Send`.
    fn fetch(
        &mut self,
        request: fnet_http::Request,
    ) -> impl std::future::Future<Output = fidl::Result<fnet_http::Response>> + Send;
}
191
/// `HttpFetcher` implementation that forwards requests to a
/// `fuchsia.net.http.Loader` proxy.
pub(crate) struct FidlHttpFetcher(fnet_http::LoaderProxy);
193
194impl FidlHttpFetcher {
195 pub(crate) fn new() -> Self {
196 let loader = fuchsia_component::client::connect_to_protocol::<fnet_http::LoaderMarker>()
197 .expect("unable to connect to fuchsia.net.http.Loader");
198 FidlHttpFetcher(loader)
199 }
200}
201
202impl HttpFetcher for FidlHttpFetcher {
203 async fn fetch(&mut self, request: fnet_http::Request) -> fidl::Result<fnet_http::Response> {
204 self.0.fetch(request).await
205 }
206}
207
/// Runs HTTP healthchecks through the wrapped requester.
struct HttpHealthchecker<R> {
    /// Transport used to send the healthcheck request.
    requester: R,
}
211
212impl<R> HttpHealthchecker<R>
213where
214 R: HttpFetcher,
215{
216 async fn healthcheck(&mut self) -> HealthcheckResult {
217 let request = fnet_http::Request {
218 url: Some(HEALTHCHECK_URL.to_owned()),
219 method: Some("HEAD".into()),
220 headers: None,
221 body: None,
222 deadline: None,
223 ..Default::default()
224 };
225
226 let resp = match self.requester.fetch(request).await {
227 Ok(r) => r,
228 Err(e) => {
229 warn!("FIDL error while sending HTTP request: {e:?}");
230 return HealthcheckResult::Failure;
231 }
232 };
233
234 if let Some(err) = resp.error {
236 warn!("network error while sending HTTP request: {err:?}");
237 return HealthcheckResult::Failure;
238 }
239
240 match resp.status_code {
241 Some(code) => {
242 if code == 204 {
243 info!("HTTP healthcheck successful");
244 HealthcheckResult::Success
245 } else if code >= 200 && code < 300 {
246 warn!("unexpectedly received non-204 success: {code}");
247 HealthcheckResult::Failure
248 } else {
249 warn!("received non-success status: {code}");
250 HealthcheckResult::Failure
251 }
252 }
253
254 None => {
257 warn!("no status code found");
258 HealthcheckResult::Failure
259 }
260 }
261 }
262}
263
264pub(crate) fn new_healthcheck_stream() -> impl futures::stream::Stream<Item = ()> {
265 futures::stream::once(fuchsia_async::Timer::new(HEALTHCHECK_STARTUP_DELAY))
266 .chain(fuchsia_async::Interval::new(HEALTHCHECK_INTERVAL.into()))
267}
268
269pub(crate) async fn run<H, T>(
274 mut state: State,
275 http_fetcher: H,
276 desired_version_updates: futures::channel::mpsc::UnboundedReceiver<NetstackVersion>,
277 persistance_updates: futures::channel::mpsc::UnboundedSender<Persisted>,
278 healthcheck_tick: T,
279) where
280 H: HttpFetcher,
281 T: Stream<Item = ()> + Unpin,
282{
283 let mut health_checker = HttpHealthchecker { requester: http_fetcher };
284 enum Action {
285 Healthcheck,
286 NewDesiredVersion(NetstackVersion),
287 }
288
289 let mut stream = futures::stream::select(
290 healthcheck_tick.map(|()| Action::Healthcheck),
291 desired_version_updates.map(Action::NewDesiredVersion),
292 );
293
294 while let Some(action) = stream.next().await {
295 match action {
296 Action::Healthcheck => {
297 if state.should_healthcheck() {
298 info!("running healthcheck");
299 let hc_result = health_checker.healthcheck().await;
300 replace_with(&mut state, |state| state.on_healthcheck(hc_result));
301 }
302 }
303 Action::NewDesiredVersion(version) => {
304 debug!("new desired netstack version: {version:?}");
305 replace_with(&mut state, |state| state.on_desired_version_change(version));
306 }
307 }
308
309 persistance_updates.unbounded_send(state.persisted()).unwrap()
310 }
311}
312
/// Test helpers shared with other modules of the crate.
#[cfg(test)]
pub(crate) mod testutil {
    use super::*;

    /// `HttpFetcher` double that ignores the request and answers with whatever
    /// the wrapped closure returns.
    pub(crate) struct MockHttpRequester<F>(pub(crate) F);

    impl<F> HttpFetcher for MockHttpRequester<F>
    where
        F: FnMut() -> fidl::Result<fnet_http::Response>,
    {
        /// Invokes the closure immediately and returns an already-completed
        /// future holding its result.
        fn fetch(
            &mut self,
            _request: fnet_http::Request,
        ) -> impl std::future::Future<Output = fidl::Result<fnet_http::Response>> + Send {
            let Self(respond) = self;
            let canned = respond();
            futures::future::ready(canned)
        }
    }
}
332
333#[cfg(test)]
334mod test {
335 use super::*;
336
337 use assert_matches::assert_matches;
338 use fidl_fuchsia_net_http as fnet_http;
339 use fuchsia_async::Task;
340 use futures::channel::mpsc;
341 use futures::SinkExt;
342 use test_case::test_case;
343
344 use crate::rollback::testutil::MockHttpRequester;
345 use crate::NetstackVersion;
346
// Checks `State::new` for every combination of persisted value and booted
// netstack version; a persisted failure count is resumed with one added.
#[test_case(None, NetstackVersion::Netstack2 => State::Netstack2)]
#[test_case(None, NetstackVersion::Netstack3 => State::Checking(0))]
#[test_case(
    Some(Persisted::HealthcheckFailures(10)),
    NetstackVersion::Netstack2 => State::Netstack2
)]
#[test_case(
    Some(Persisted::HealthcheckFailures(10)),
    NetstackVersion::Netstack3 => State::Checking(11)
)]
#[test_case(Some(Persisted::Success), NetstackVersion::Netstack2 => State::Netstack2)]
#[test_case(Some(Persisted::Success), NetstackVersion::Netstack3 => State::Success)]
fn test_state_construction(
    persisted: Option<Persisted>,
    current_boot: NetstackVersion,
) -> State {
    State::new(persisted, current_boot)
}
365
// Checks `State::on_desired_version_change` for each starting state against
// both desired versions, including cancel and resume round trips.
#[test_case(State::Netstack2, NetstackVersion::Netstack2 => State::Netstack2)]
#[test_case(State::Netstack2, NetstackVersion::Netstack3 => State::Netstack2)]
#[test_case(
    State::Checking(1),
    NetstackVersion::Netstack2 => State::Canceled(Canceled::FromChecking(1)))]
#[test_case(
    State::Checking(1),
    NetstackVersion::Netstack3 => State::Checking(1))]
#[test_case(
    State::Checking(MAX_FAILED_HEALTHCHECKS+1),
    NetstackVersion::Netstack2 =>
        State::Canceled(Canceled::FromChecking(MAX_FAILED_HEALTHCHECKS+1))
)]
#[test_case(
    State::Checking(MAX_FAILED_HEALTHCHECKS+1),
    NetstackVersion::Netstack3 => State::Checking(MAX_FAILED_HEALTHCHECKS+1)
)]
#[test_case(
    State::Canceled(Canceled::FromChecking(1)),
    NetstackVersion::Netstack2 => State::Canceled(Canceled::FromChecking(1)))]
#[test_case(
    State::Canceled(Canceled::FromChecking(1)),
    NetstackVersion::Netstack3 => State::Checking(1))]
#[test_case(
    State::Canceled(Canceled::FromChecking(MAX_FAILED_HEALTHCHECKS+1)),
    NetstackVersion::Netstack2 =>
        State::Canceled(Canceled::FromChecking(MAX_FAILED_HEALTHCHECKS+1))
)]
#[test_case(
    State::Canceled(Canceled::FromChecking(MAX_FAILED_HEALTHCHECKS+1)),
    NetstackVersion::Netstack3 => State::Checking(MAX_FAILED_HEALTHCHECKS+1)
)]
#[test_case(
    State::Canceled(Canceled::FromSuccess),
    NetstackVersion::Netstack2 => State::Canceled(Canceled::FromSuccess))]
#[test_case(
    State::Canceled(Canceled::FromSuccess),
    NetstackVersion::Netstack3 => State::Success)]
#[test_case(
    State::Success,
    NetstackVersion::Netstack2 => State::Canceled(Canceled::FromSuccess))]
#[test_case(State::Success, NetstackVersion::Netstack3 => State::Success)]
fn test_on_desired_version_change(state: State, desired_version: NetstackVersion) -> State {
    state.on_desired_version_change(desired_version)
}
411
412 #[test_case(State::Netstack2, HealthcheckResult::Success => State::Netstack2)]
413 #[test_case(State::Netstack2, HealthcheckResult::Failure => State::Netstack2)]
414 #[test_case(State::Checking(1), HealthcheckResult::Success => State::Success)]
415 #[test_case(State::Checking(1), HealthcheckResult::Failure => State::Checking(2))]
416 #[test_case(
417 State::Checking(MAX_FAILED_HEALTHCHECKS+1),
418 HealthcheckResult::Success => State::Success)]
419 #[test_case(
420 State::Checking(MAX_FAILED_HEALTHCHECKS+1),
421 HealthcheckResult::Failure => State::Checking(MAX_FAILED_HEALTHCHECKS+2))]
422 #[test_case(
423 State::Canceled(Canceled::FromChecking(1)),
424 HealthcheckResult::Success => State::Canceled(Canceled::FromChecking(1)))]
425 #[test_case(
426 State::Canceled(Canceled::FromChecking(1)),
427 HealthcheckResult::Failure => State::Canceled(Canceled::FromChecking(1)))]
428 #[test_case(
429 State::Canceled(Canceled::FromChecking(MAX_FAILED_HEALTHCHECKS+1)),
430 HealthcheckResult::Success =>
431 State::Canceled(Canceled::FromChecking(MAX_FAILED_HEALTHCHECKS+1)))]
432 #[test_case(
433 State::Canceled(Canceled::FromChecking(MAX_FAILED_HEALTHCHECKS+1)),
434 HealthcheckResult::Failure =>
435 State::Canceled(Canceled::FromChecking(MAX_FAILED_HEALTHCHECKS+1)))]
436 #[test_case(
437 State::Canceled(Canceled::FromSuccess),
438 HealthcheckResult::Success => State::Canceled(Canceled::FromSuccess))]
439 #[test_case(
440 State::Canceled(Canceled::FromSuccess),
441 HealthcheckResult::Failure => State::Canceled(Canceled::FromSuccess))]
442 #[test_case(State::Success, HealthcheckResult::Success => State::Success)]
443 #[test_case(State::Success, HealthcheckResult::Failure => State::Success)]
444 fn test_on_healthcheck(state: State, helthcheck_result: HealthcheckResult) -> State {
445 state.on_healthcheck(helthcheck_result)
446 }
447
// Checks that `State::should_healthcheck` is true only for `Checking`.
#[test_case(State::Netstack2 => false)]
#[test_case(State::Checking(1) => true)]
#[test_case(State::Checking(MAX_FAILED_HEALTHCHECKS+1) => true)]
#[test_case(State::Canceled(Canceled::FromChecking(1)) => false)]
#[test_case(State::Canceled(Canceled::FromSuccess) => false)]
#[test_case(State::Success => false)]
fn test_should_healthcheck(state: State) -> bool {
    state.should_healthcheck()
}
457
// Checks `State::persisted`: `Netstack2` and `Canceled` are recorded as zero
// failures, `Checking` keeps its count, and `Success` stays `Success`.
#[test_case(
    State::Netstack2 =>
        Persisted::HealthcheckFailures(0))]
#[test_case(
    State::Checking(1) =>
        Persisted::HealthcheckFailures(1))]
#[test_case(
    State::Checking(MAX_FAILED_HEALTHCHECKS+1) =>
        Persisted::HealthcheckFailures(MAX_FAILED_HEALTHCHECKS+1))]
#[test_case(
    State::Canceled(Canceled::FromChecking(1)) =>
        Persisted::HealthcheckFailures(0))]
#[test_case(
    State::Canceled(Canceled::FromChecking(MAX_FAILED_HEALTHCHECKS+1)) =>
        Persisted::HealthcheckFailures(0))]
#[test_case(
    State::Canceled(Canceled::FromSuccess) =>
        Persisted::HealthcheckFailures(0))]
#[test_case(
    State::Success =>
        Persisted::Success)]
fn test_persisted(state: State) -> Persisted {
    state.persisted()
}
482
483 #[test_case(
484 || {
485 Ok(fnet_http::Response{error: None, status_code: Some(204), ..Default::default()})
486 } => HealthcheckResult::Success;
487 "success"
488 )]
489 #[test_case(
490 || {
491 Err(fidl::Error::Invalid)
492 } => HealthcheckResult::Failure;
493 "failure fidl error")]
494 #[test_case(
495 || {
496 Ok(fnet_http::Response{error: Some(fnet_http::Error::Internal), ..Default::default()})
497 } => HealthcheckResult::Failure;
498 "failure http error")]
499 #[test_case(
500 || {
501 Ok(fnet_http::Response{error: None, status_code: Some(200), ..Default::default()})
502 } => HealthcheckResult::Failure;
503 "failure 200")]
504 #[test_case(
505 || {
506 Ok(fnet_http::Response{error: None, status_code: Some(404), ..Default::default()})
507 } => HealthcheckResult::Failure;
508 "failure 404")]
509 #[test_case(
510 || {
511 Ok(fnet_http::Response{error: None, status_code: None, ..Default::default()})
512 } => HealthcheckResult::Failure;
513 "failure no status")]
514 #[fuchsia::test]
515 async fn test_healthchecker(
516 response: impl FnMut() -> fidl::Result<fnet_http::Response>,
517 ) -> HealthcheckResult {
518 let r = MockHttpRequester(response);
519 HttpHealthchecker { requester: r }.healthcheck().await
520 }
521
522 #[fuchsia::test]
523 async fn test_healthcheck_fails_then_succeeds() {
524 let mut n = 0;
525 let r = MockHttpRequester(move || {
526 n += 1;
527 if n <= MAX_FAILED_HEALTHCHECKS {
528 Ok(fnet_http::Response {
529 error: None,
530 status_code: Some(500),
531 ..Default::default()
532 })
533 } else {
534 Ok(fnet_http::Response {
535 error: None,
536 status_code: Some(204),
537 ..Default::default()
538 })
539 }
540 });
541
542 let state = State::Checking(0);
543 let (mut healthcheck_timer_sender, healthcheck_timer_receiver) = mpsc::unbounded();
544 let (mut desired_version_sender, desired_version_receiver) = mpsc::unbounded();
545 let (persistence_sender, mut persistence_receiver) = mpsc::unbounded();
546
547 let task = Task::spawn(super::run(
548 state,
549 r,
550 desired_version_receiver,
551 persistence_sender,
552 healthcheck_timer_receiver,
553 ));
554
555 for i in 1..=MAX_FAILED_HEALTHCHECKS - 1 {
556 healthcheck_timer_sender.send(()).await.unwrap();
557 let n = assert_matches!(
558 persistence_receiver.next().await.unwrap(),
559 Persisted::HealthcheckFailures(n) => n
560 );
561 assert_eq!(n, i);
562 }
563 healthcheck_timer_sender.send(()).await.unwrap();
564 let n = assert_matches!(
565 persistence_receiver.next().await.unwrap(),
566 Persisted::HealthcheckFailures(n) => n
567 );
568 assert_eq!(n, MAX_FAILED_HEALTHCHECKS);
569
570 desired_version_sender.send(NetstackVersion::Netstack2).await.unwrap();
571 assert_matches!(
572 persistence_receiver.next().await.unwrap(),
573 Persisted::HealthcheckFailures(0)
574 );
575
576 healthcheck_timer_sender.send(()).await.unwrap();
579 assert_matches!(
580 persistence_receiver.next().await.unwrap(),
581 Persisted::HealthcheckFailures(0)
582 );
583
584 desired_version_sender.send(NetstackVersion::Netstack3).await.unwrap();
585 let n = assert_matches!(
586 persistence_receiver.next().await.unwrap(),
587 Persisted::HealthcheckFailures(n) => n
588 );
589 assert_eq!(n, MAX_FAILED_HEALTHCHECKS);
590
591 healthcheck_timer_sender.send(()).await.unwrap();
592 assert_matches!(persistence_receiver.next().await.unwrap(), Persisted::Success);
593
594 drop(healthcheck_timer_sender);
596 drop(desired_version_sender);
597 task.await;
598 }
599}