1#![deny(missing_docs)]
6
7pub mod state;
11pub use state::{
12 FailFetchData, FailStageData, FetchFailureReason, PrepareFailureReason, Progress,
13 StageFailureReason, State, StateId, UpdateInfo, UpdateInfoAndProgress,
14};
15
16pub mod options;
17pub use options::{Initiator, Options};
18
19use fidl::endpoints::{ClientEnd, ServerEnd};
20use fidl_fuchsia_update_installer::{
21 InstallerProxy, MonitorMarker, MonitorRequest, MonitorRequestStream, RebootControllerMarker,
22 UpdateNotStartedReason,
23};
24use futures::prelude::*;
25use futures::task::{Context, Poll};
26use log::info;
27use pin_project::pin_project;
28use std::fmt;
29use std::pin::Pin;
30use thiserror::Error;
31
32#[derive(Debug, Error)]
34pub enum UpdateAttemptError {
35 #[error("FIDL error")]
37 FIDL(#[source] fidl::Error),
38
39 #[error("an installation was already in progress")]
41 InstallInProgress,
42}
43
44#[derive(Debug, Error)]
46pub enum MonitorUpdateAttemptError {
47 #[error("FIDL error")]
49 FIDL(#[source] fidl::Error),
50
51 #[error("unable to decode State")]
53 DecodeState(#[source] state::DecodeStateError),
54}
55
56#[pin_project(project = UpdateAttemptProj)]
58#[derive(Debug)]
59pub struct UpdateAttempt {
60 attempt_id: String,
62
63 #[pin]
65 monitor: UpdateAttemptMonitor,
66}
67
68#[pin_project(project = UpdateAttemptMonitorProj)]
70pub struct UpdateAttemptMonitor {
71 #[pin]
73 stream: MonitorRequestStream,
74}
75
76impl UpdateAttempt {
77 pub fn attempt_id(&self) -> &str {
79 &self.attempt_id
80 }
81}
82
83impl UpdateAttemptMonitor {
84 fn new() -> Result<(ClientEnd<MonitorMarker>, Self), fidl::Error> {
85 let (monitor_client_end, stream) =
86 fidl::endpoints::create_request_stream::<MonitorMarker>();
87
88 Ok((monitor_client_end, Self { stream }))
89 }
90
91 pub fn from_stream(stream: MonitorRequestStream) -> Self {
93 Self { stream }
94 }
95}
96
97pub async fn start_update(
100 update_url: &url::Url,
101 options: Options,
102 installer_proxy: &InstallerProxy,
103 reboot_controller_server_end: Option<ServerEnd<RebootControllerMarker>>,
104 signature: Option<&[u8]>,
105) -> Result<UpdateAttempt, UpdateAttemptError> {
106 let url = fidl_fuchsia_pkg::PackageUrl { url: update_url.to_string() };
107 let (monitor_client_end, monitor) =
108 UpdateAttemptMonitor::new().map_err(UpdateAttemptError::FIDL)?;
109
110 let attempt_id = installer_proxy
111 .start_update(
112 &url,
113 &options.into(),
114 monitor_client_end,
115 reboot_controller_server_end,
116 signature,
117 )
118 .await
119 .map_err(UpdateAttemptError::FIDL)?
120 .map_err(|reason| match reason {
121 UpdateNotStartedReason::AlreadyInProgress => UpdateAttemptError::InstallInProgress,
122 })?;
123
124 info!("Update started with attempt id: {}", attempt_id);
125 Ok(UpdateAttempt { attempt_id, monitor })
126}
127
128pub async fn monitor_update(
131 attempt_id: Option<&str>,
132 installer_proxy: &InstallerProxy,
133) -> Result<Option<UpdateAttemptMonitor>, fidl::Error> {
134 let (monitor_client_end, monitor) = UpdateAttemptMonitor::new()?;
135
136 let attached = installer_proxy.monitor_update(attempt_id, monitor_client_end).await?;
137
138 if attached { Ok(Some(monitor)) } else { Ok(None) }
139}
140
141impl fmt::Debug for UpdateAttemptMonitor {
142 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
143 f.debug_struct("UpdateAttemptMonitor").field("stream", &"MonitorRequestStream").finish()
144 }
145}
146
147impl Stream for UpdateAttemptMonitor {
148 type Item = Result<State, MonitorUpdateAttemptError>;
149
150 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
151 let UpdateAttemptMonitorProj { stream } = self.project();
152 let poll_res = match stream.poll_next(cx) {
153 Poll::Ready(None) => return Poll::Ready(None),
154 Poll::Ready(Some(res)) => res.map_err(MonitorUpdateAttemptError::FIDL)?,
155 Poll::Pending => return Poll::Pending,
156 };
157 let MonitorRequest::OnState { state, responder } = poll_res;
158 let _ = responder.send();
159 let state = state.try_into().map_err(MonitorUpdateAttemptError::DecodeState)?;
160 Poll::Ready(Some(Ok(state)))
161 }
162}
163
164impl Stream for UpdateAttempt {
165 type Item = Result<State, MonitorUpdateAttemptError>;
166
167 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
168 let UpdateAttemptProj { attempt_id: _, monitor } = self.project();
169 monitor.poll_next(cx)
170 }
171}
172
173#[cfg(test)]
174mod tests {
175 use super::*;
176 use assert_matches::assert_matches;
177 use fidl_fuchsia_update_installer::{
178 InstallationProgress, InstallerMarker, InstallerRequest, MonitorProxy,
179 };
180 use fuchsia_async as fasync;
181 use futures::stream::StreamExt;
182
/// Update package URL that the fake installer expects `start_update` to forward.
const TEST_URL: &str = "fuchsia-pkg://fuchsia.com/update/0";
184
185 impl UpdateAttemptMonitor {
186 fn new_test() -> (TestAttempt, Self) {
189 let (monitor_client_end, monitor) = Self::new().unwrap();
190
191 (TestAttempt::new(monitor_client_end), monitor)
192 }
193 }
194
195 struct TestAttempt {
196 proxy: MonitorProxy,
197 }
198
199 impl TestAttempt {
200 fn new(monitor_client_end: ClientEnd<MonitorMarker>) -> Self {
203 let proxy = monitor_client_end.into_proxy();
204
205 Self { proxy }
206 }
207
208 async fn send_state_and_recv_ack(&mut self, state: State) {
209 self.send_raw_state_and_recv_ack(state.into()).await;
210 }
211
212 async fn send_raw_state_and_recv_ack(
213 &mut self,
214 state: fidl_fuchsia_update_installer::State,
215 ) {
216 let () = self.proxy.on_state(&state).await.unwrap();
217 }
218 }
219
220 #[fasync::run_singlethreaded(test)]
221 async fn update_attempt_monitor_forwards_and_acks_progress() {
222 let (mut send, monitor) = UpdateAttemptMonitor::new_test();
223
224 let expected_fetch_state = &State::Fetch(
225 UpdateInfoAndProgress::builder()
226 .info(UpdateInfo::builder().download_size(1000).build())
227 .progress(Progress::builder().fraction_completed(0.5).bytes_downloaded(500).build())
228 .build(),
229 );
230
231 let client_fut = async move {
232 assert_eq!(
233 monitor.try_collect::<Vec<State>>().await.unwrap(),
234 vec![State::Prepare, expected_fetch_state.clone()]
235 );
236 };
237
238 let server_fut = async move {
239 send.send_state_and_recv_ack(State::Prepare).await;
240 send.send_state_and_recv_ack(expected_fetch_state.clone()).await;
241 };
242
243 future::join(client_fut, server_fut).await;
244 }
245
246 #[fasync::run_singlethreaded(test)]
247 async fn update_attempt_monitor_rejects_invalid_state() {
248 let (mut send, mut monitor) = UpdateAttemptMonitor::new_test();
249
250 let client_fut = async move {
251 assert_matches!(
252 monitor.next().await.unwrap(),
253 Err(MonitorUpdateAttemptError::DecodeState(_))
254 );
255 assert_matches!(monitor.next().await, Some(Ok(State::Prepare)));
256 };
257
258 let server_fut = async move {
259 send.send_raw_state_and_recv_ack(fidl_fuchsia_update_installer::State::Fetch(
260 fidl_fuchsia_update_installer::FetchData {
261 info: Some(fidl_fuchsia_update_installer::UpdateInfo {
262 download_size: None,
263 ..Default::default()
264 }),
265 progress: Some(InstallationProgress {
266 fraction_completed: Some(2.0),
267 bytes_downloaded: None,
268 ..Default::default()
269 }),
270 ..Default::default()
271 },
272 ))
273 .await;
274
275 send.send_state_and_recv_ack(State::Prepare).await;
278 };
279
280 future::join(client_fut, server_fut).await;
281 }
282
283 #[fasync::run_singlethreaded(test)]
284 async fn start_update_forwards_args_and_returns_attempt_id() {
285 let pkgurl = "fuchsia-pkg://fuchsia.com/update/0".parse().unwrap();
286
287 let opts = Options {
288 initiator: Initiator::User,
289 allow_attach_to_existing_attempt: false,
290 should_write_recovery: true,
291 };
292
293 let (proxy, mut stream) = fidl::endpoints::create_proxy_and_stream::<InstallerMarker>();
294
295 let (_reboot_controller, reboot_controller_server_end) =
296 fidl::endpoints::create_proxy::<RebootControllerMarker>();
297
298 let installer_fut = async move {
299 let returned_update_attempt = start_update(
300 &pkgurl,
301 opts,
302 &proxy,
303 Some(reboot_controller_server_end),
304 Some(&[1; 64]),
305 )
306 .await
307 .unwrap();
308 assert_eq!(
309 returned_update_attempt.attempt_id(),
310 "00000000-0000-0000-0000-000000000001"
311 );
312 };
313
314 let stream_fut = async move {
315 match stream.next().await.unwrap() {
316 Ok(InstallerRequest::StartUpdate {
317 url,
318 options:
319 fidl_fuchsia_update_installer::Options {
320 initiator,
321 should_write_recovery,
322 allow_attach_to_existing_attempt,
323 ..
324 },
325 monitor: _,
326 reboot_controller,
327 signature,
328 responder,
329 }) => {
330 assert_eq!(url.url, TEST_URL);
331 assert_eq!(initiator, Some(fidl_fuchsia_update_installer::Initiator::User));
332 assert_matches!(reboot_controller, Some(_));
333 assert_eq!(should_write_recovery, Some(true));
334 assert_eq!(allow_attach_to_existing_attempt, Some(false));
335 assert_eq!(signature, Some(vec![1; 64]));
336 responder.send(Ok("00000000-0000-0000-0000-000000000001")).unwrap();
337 }
338 request => panic!("Unexpected request: {request:?}"),
339 }
340 };
341 future::join(installer_fut, stream_fut).await;
342 }
343
344 #[fasync::run_singlethreaded(test)]
345 async fn test_install_error() {
346 let pkgurl = "fuchsia-pkg://fuchsia.com/update/0".parse().unwrap();
347
348 let opts = Options {
349 initiator: Initiator::User,
350 allow_attach_to_existing_attempt: false,
351 should_write_recovery: true,
352 };
353
354 let (proxy, mut stream) = fidl::endpoints::create_proxy_and_stream::<InstallerMarker>();
355
356 let (_reboot_controller, reboot_controller_server_end) =
357 fidl::endpoints::create_proxy::<RebootControllerMarker>();
358
359 let installer_fut = async move {
360 let returned_update_attempt =
361 start_update(&pkgurl, opts, &proxy, Some(reboot_controller_server_end), None)
362 .await
363 .unwrap();
364
365 assert_eq!(
366 returned_update_attempt.try_collect::<Vec<State>>().await.unwrap(),
367 vec![State::FailPrepare(PrepareFailureReason::Internal)]
368 );
369 };
370
371 let stream_fut = async move {
372 match stream.next().await.unwrap() {
373 Ok(InstallerRequest::StartUpdate { monitor, responder, .. }) => {
374 responder.send(Ok("00000000-0000-0000-0000-000000000002")).unwrap();
375
376 let mut attempt = TestAttempt::new(monitor);
377 attempt
378 .send_state_and_recv_ack(State::FailPrepare(PrepareFailureReason::Internal))
379 .await;
380 }
381 request => panic!("Unexpected request: {request:?}"),
382 }
383 };
384 future::join(installer_fut, stream_fut).await;
385 }
386
387 #[fasync::run_singlethreaded(test)]
388 async fn start_update_forwards_fidl_error() {
389 let pkgurl = "fuchsia-pkg://fuchsia.com/update/0".parse().unwrap();
390
391 let opts = Options {
392 initiator: Initiator::User,
393 allow_attach_to_existing_attempt: false,
394 should_write_recovery: true,
395 };
396
397 let (proxy, mut stream) = fidl::endpoints::create_proxy_and_stream::<InstallerMarker>();
398
399 let installer_fut = async move {
400 match start_update(&pkgurl, opts, &proxy, None, None).await {
401 Err(UpdateAttemptError::FIDL(_)) => {} _ => panic!("Unexpected result"),
403 }
404 };
405 let stream_fut = async move {
406 match stream.next().await.unwrap() {
407 Ok(InstallerRequest::StartUpdate { .. }) => {
408 }
410 request => panic!("Unexpected request: {request:?}"),
411 }
412 };
413 future::join(installer_fut, stream_fut).await;
414 }
415
416 #[fasync::run_singlethreaded(test)]
417 async fn test_state_decode_error() {
418 let pkgurl = "fuchsia-pkg://fuchsia.com/update/0".parse().unwrap();
419
420 let opts = Options {
421 initiator: Initiator::User,
422 allow_attach_to_existing_attempt: false,
423 should_write_recovery: true,
424 };
425
426 let (proxy, mut stream) = fidl::endpoints::create_proxy_and_stream::<InstallerMarker>();
427
428 let (_reboot_controller, reboot_controller_server_end) =
429 fidl::endpoints::create_proxy::<RebootControllerMarker>();
430
431 let installer_fut = async move {
432 let mut returned_update_attempt =
433 start_update(&pkgurl, opts, &proxy, Some(reboot_controller_server_end), None)
434 .await
435 .unwrap();
436 assert_matches!(
437 returned_update_attempt.next().await,
438 Some(Err(MonitorUpdateAttemptError::DecodeState(
439 state::DecodeStateError::DecodeProgress(
440 state::DecodeProgressError::FractionCompletedOutOfRange
441 )
442 )))
443 );
444 };
445
446 let stream_fut = async move {
447 match stream.next().await.unwrap() {
448 Ok(InstallerRequest::StartUpdate { monitor, responder, .. }) => {
449 responder.send(Ok("00000000-0000-0000-0000-000000000002")).unwrap();
450
451 let mut monitor = TestAttempt::new(monitor);
452 monitor
453 .send_raw_state_and_recv_ack(fidl_fuchsia_update_installer::State::Fetch(
454 fidl_fuchsia_update_installer::FetchData {
455 info: Some(fidl_fuchsia_update_installer::UpdateInfo {
456 download_size: None,
457 ..Default::default()
458 }),
459 progress: Some(InstallationProgress {
460 fraction_completed: Some(2.0),
461 bytes_downloaded: None,
462 ..Default::default()
463 }),
464 ..Default::default()
465 },
466 ))
467 .await;
468 }
469 request => panic!("Unexpected request: {request:?}"),
470 }
471 };
472 future::join(installer_fut, stream_fut).await;
473 }
474
475 #[fasync::run_singlethreaded(test)]
476 async fn test_server_close_unexpectedly() {
477 let pkgurl = "fuchsia-pkg://fuchsia.com/update/0".parse().unwrap();
478
479 let opts = Options {
480 initiator: Initiator::User,
481 allow_attach_to_existing_attempt: false,
482 should_write_recovery: true,
483 };
484
485 let (proxy, mut stream) = fidl::endpoints::create_proxy_and_stream::<InstallerMarker>();
486
487 let (_reboot_controller, reboot_controller_server_end) =
488 fidl::endpoints::create_proxy::<RebootControllerMarker>();
489
490 let expected_states = vec![
491 State::Prepare,
492 State::Fetch(
493 UpdateInfoAndProgress::builder()
494 .info(UpdateInfo::builder().download_size(0).build())
495 .progress(
496 Progress::builder().fraction_completed(0.0).bytes_downloaded(0).build(),
497 )
498 .build(),
499 ),
500 ];
501
502 let installer_fut = async move {
503 let returned_update_attempt =
504 start_update(&pkgurl, opts, &proxy, Some(reboot_controller_server_end), None)
505 .await
506 .unwrap();
507
508 assert_eq!(
509 returned_update_attempt.try_collect::<Vec<State>>().await.unwrap(),
510 expected_states,
511 );
512 };
513 let stream_fut = async move {
514 match stream.next().await.unwrap() {
515 Ok(InstallerRequest::StartUpdate { monitor, responder, .. }) => {
516 responder.send(Ok("00000000-0000-0000-0000-000000000003")).unwrap();
517
518 let mut monitor = TestAttempt::new(monitor);
519 monitor.send_state_and_recv_ack(State::Prepare).await;
520 monitor
521 .send_raw_state_and_recv_ack(fidl_fuchsia_update_installer::State::Fetch(
522 fidl_fuchsia_update_installer::FetchData {
523 info: Some(fidl_fuchsia_update_installer::UpdateInfo {
524 download_size: None,
525 ..Default::default()
526 }),
527 progress: Some(InstallationProgress {
528 fraction_completed: Some(0.0),
529 bytes_downloaded: None,
530 ..Default::default()
531 }),
532 ..Default::default()
533 },
534 ))
535 .await;
536
537 }
541 request => panic!("Unexpected request: {request:?}"),
542 }
543 };
544 future::join(installer_fut, stream_fut).await;
545 }
546
547 #[fasync::run_singlethreaded(test)]
548 async fn monitor_update_uses_provided_attempt_id() {
549 let (proxy, mut stream) = fidl::endpoints::create_proxy_and_stream::<InstallerMarker>();
550
551 let client_fut = async move {
552 let _ = monitor_update(Some("id"), &proxy).await;
553 };
554
555 let server_fut = async move {
556 match stream.next().await.unwrap().unwrap() {
557 InstallerRequest::MonitorUpdate { attempt_id, .. } => {
558 assert_eq!(attempt_id.as_deref(), Some("id"));
559 }
560 request => panic!("Unexpected request: {request:?}"),
561 }
562 };
563
564 future::join(client_fut, server_fut).await;
565 }
566
567 #[fasync::run_singlethreaded(test)]
568 async fn monitor_update_handles_no_update_in_progress() {
569 let (proxy, mut stream) = fidl::endpoints::create_proxy_and_stream::<InstallerMarker>();
570
571 let client_fut = async move {
572 assert_matches!(monitor_update(None, &proxy).await, Ok(None));
573 };
574
575 let server_fut = async move {
576 match stream.next().await.unwrap().unwrap() {
577 InstallerRequest::MonitorUpdate { attempt_id, monitor, responder } => {
578 assert_eq!(attempt_id, None);
579 drop(monitor);
580 responder.send(false).unwrap();
581 }
582 request => panic!("Unexpected request: {request:?}"),
583 }
584 assert_matches!(stream.next().await, None);
585 };
586
587 future::join(client_fut, server_fut).await;
588 }
589
590 #[fasync::run_singlethreaded(test)]
591 async fn monitor_update_forwards_fidl_error() {
592 let (proxy, mut stream) = fidl::endpoints::create_proxy_and_stream::<InstallerMarker>();
593
594 let client_fut = async move {
595 assert_matches!(monitor_update(None, &proxy).await, Err(_));
596 };
597 let server_fut = async move {
598 match stream.next().await.unwrap() {
599 Ok(InstallerRequest::MonitorUpdate { .. }) => {
600 }
602 request => panic!("Unexpected request: {request:?}"),
603 }
604 };
605 future::join(client_fut, server_fut).await;
606 }
607
608 #[fasync::run_singlethreaded(test)]
609 async fn monitor_update_forwards_and_acks_progress() {
610 let (proxy, mut stream) = fidl::endpoints::create_proxy_and_stream::<InstallerMarker>();
611
612 let client_fut = async move {
613 let monitor = monitor_update(None, &proxy).await.unwrap().unwrap();
614
615 assert_eq!(
616 monitor.try_collect::<Vec<State>>().await.unwrap(),
617 vec![State::Prepare, State::FailPrepare(PrepareFailureReason::Internal)]
618 );
619 };
620
621 let server_fut = async move {
622 match stream.next().await.unwrap().unwrap() {
623 InstallerRequest::MonitorUpdate { attempt_id, monitor, responder } => {
624 assert_eq!(attempt_id, None);
625 responder.send(true).unwrap();
626 let mut monitor = TestAttempt::new(monitor);
627
628 monitor.send_state_and_recv_ack(State::Prepare).await;
629 monitor
630 .send_state_and_recv_ack(State::FailPrepare(PrepareFailureReason::Internal))
631 .await;
632 }
633 request => panic!("Unexpected request: {request:?}"),
634 }
635 assert_matches!(stream.next().await, None);
636 };
637
638 future::join(client_fut, server_fut).await;
639 }
640}