1use anyhow::Context;
6use chrono::{Datelike, TimeZone, Timelike};
7use fidl::endpoints::ServerEnd;
8use fidl_fuchsia_hardware_rtc::{DeviceRequest, DeviceRequestStream};
9use fidl_fuchsia_metrics::MetricEvent;
10use fidl_fuchsia_metrics_test::{
11 LogMethod, MetricEventLoggerQuerierMarker, MetricEventLoggerQuerierProxy,
12};
13use fidl_fuchsia_testing::{
14 FakeClockControlMarker, FakeClockControlProxy, FakeClockMarker, FakeClockProxy,
15};
16use fidl_fuchsia_time::{MaintenanceRequest, MaintenanceRequestStream};
17use fidl_fuchsia_time_external::{PushSourceMarker, Status, TimeSample};
18use fidl_test_time::{TimeSourceControlRequest, TimeSourceControlRequestStream};
19use fuchsia_component::server::ServiceFs;
20use fuchsia_component_test::{
21 Capability, ChildOptions, ChildRef, LocalComponentHandles, RealmBuilder, RealmInstance, Ref,
22 Route,
23};
24use fuchsia_sync::Mutex;
25use futures::channel::mpsc::Sender;
26use futures::stream::{Stream, StreamExt, TryStreamExt};
27use futures::{Future, FutureExt, SinkExt};
28use lazy_static::lazy_static;
29use push_source::{PushSource, TestUpdateAlgorithm, Update};
30use std::ops::Deref;
31use std::sync::Arc;
32use time_metrics_registry::PROJECT_ID;
33use vfs::pseudo_directory;
34use zx::{self as zx, HandleBased, Rights};
35use {fidl_fuchsia_io as fio, fuchsia_async as fasync};
36
/// Component URL used for the `timekeeper_test` child when `use_fake_clock` is false.
const TIMEKEEPER_URL: &str = "#meta/timekeeper_for_integration.cm";
/// Component URL used for the `timekeeper_test` child when `use_fake_clock` is true.
const TIMEKEEPER_FAKE_TIME_URL: &str = "#meta/timekeeper_with_fake_time.cm";
/// Component URL of the child added to the realm under the name `fake_cobalt`.
const COBALT_URL: &str = "#meta/fake_cobalt.cm";
/// Component URL of the child added to the realm under the name `fake_clock`.
const FAKE_CLOCK_URL: &str = "#meta/fake_clock.cm";
45
46pub struct NestedTimekeeper {
49 _realm_instance: RealmInstance,
50}
51
52impl Into<RealmInstance> for NestedTimekeeper {
53 fn into(self) -> RealmInstance {
55 self._realm_instance
56 }
57}
58
59impl NestedTimekeeper {
60 pub async fn new(
73 clock: Arc<zx::Clock>,
74 rtc_options: RtcOptions,
75 use_fake_clock: bool,
76 ) -> (
77 Self,
78 Arc<PushSourcePuppet>,
79 RtcUpdates,
80 MetricEventLoggerQuerierProxy,
81 Option<FakeClockController>,
82 ) {
83 let push_source_puppet = Arc::new(PushSourcePuppet::new());
84
85 let builder = RealmBuilder::new().await.unwrap();
86 let fake_cobalt =
87 builder.add_child("fake_cobalt", COBALT_URL, ChildOptions::new()).await.unwrap();
88
89 let timekeeper_url = if use_fake_clock { TIMEKEEPER_FAKE_TIME_URL } else { TIMEKEEPER_URL };
90 log::trace!("using timekeeper_url: {}", timekeeper_url);
91 let timekeeper = builder
92 .add_child("timekeeper_test", timekeeper_url, ChildOptions::new().eager())
93 .await
94 .with_context(|| format!("while starting up timekeeper_test from: {timekeeper_url}"))
95 .unwrap();
96
97 let timesource_server = builder
98 .add_local_child(
99 "timesource_mock",
100 {
101 let push_source_puppet = Arc::clone(&push_source_puppet);
102 move |handles: LocalComponentHandles| {
103 Box::pin(timesource_mock_server(handles, Arc::clone(&push_source_puppet)))
104 }
105 },
106 ChildOptions::new(),
107 )
108 .await
109 .context("while starting up timesource_mock")
110 .unwrap();
111
112 let maintenance_server = builder
113 .add_local_child(
114 "maintenance_mock",
115 move |handles: LocalComponentHandles| {
116 Box::pin(maintenance_mock_server(handles, Arc::clone(&clock)))
117 },
118 ChildOptions::new(),
119 )
120 .await
121 .context("while starting up maintenance_mock")
122 .unwrap();
123
124 if use_fake_clock {
126 let fake_clock =
127 builder.add_child("fake_clock", FAKE_CLOCK_URL, ChildOptions::new()).await.unwrap();
128
129 builder
130 .add_route(
131 Route::new()
132 .capability(Capability::protocol_by_name(
133 "fuchsia.testing.FakeClockControl",
134 ))
135 .from(&fake_clock)
136 .to(Ref::parent()),
137 )
138 .await
139 .context("while setting up FakeClockControl")
140 .unwrap();
141
142 builder
143 .add_route(
144 Route::new()
145 .capability(Capability::protocol_by_name("fuchsia.testing.FakeClock"))
146 .from(&fake_clock)
147 .to(Ref::parent())
148 .to(&timekeeper),
149 )
150 .await
151 .context("while setting up FakeClock")
152 .unwrap();
153
154 builder
155 .add_route(
156 Route::new()
157 .capability(Capability::protocol_by_name("fuchsia.logger.LogSink"))
158 .from(Ref::parent())
159 .to(&fake_clock),
160 )
161 .await
162 .context("while setting up LogSink")
163 .unwrap();
164 };
165
166 builder
167 .add_route(
168 Route::new()
169 .capability(Capability::protocol_by_name("fuchsia.time.Maintenance"))
170 .from(&maintenance_server)
171 .to(&timekeeper),
172 )
173 .await
174 .context("while setting up Maintenance")
175 .unwrap();
176
177 builder
178 .add_route(
179 Route::new()
180 .capability(Capability::protocol_by_name("test.time.TimeSourceControl"))
181 .from(×ource_server)
182 .to(&timekeeper),
183 )
184 .await
185 .unwrap();
186
187 builder
188 .add_route(
189 Route::new()
190 .capability(Capability::protocol_by_name(
191 "fuchsia.metrics.test.MetricEventLoggerQuerier",
192 ))
193 .from(&fake_cobalt)
194 .to(Ref::parent()),
195 )
196 .await
197 .unwrap();
198
199 builder
200 .add_route(
201 Route::new()
202 .capability(Capability::protocol_by_name(
203 "fuchsia.metrics.MetricEventLoggerFactory",
204 ))
205 .from(&fake_cobalt)
206 .to(&timekeeper),
207 )
208 .await
209 .unwrap();
210
211 builder
212 .add_route(
213 Route::new()
214 .capability(Capability::protocol_by_name("fuchsia.logger.LogSink"))
215 .from(Ref::parent())
216 .to(&fake_cobalt)
217 .to(&timekeeper)
218 .to(×ource_server)
219 .to(&maintenance_server),
220 )
221 .await
222 .unwrap();
223
224 builder
225 .add_route(
226 Route::new()
227 .capability(Capability::configuration("fuchsia.time.config.WritableUTCTime"))
228 .from(Ref::parent())
229 .to(&timekeeper),
230 )
231 .await
232 .unwrap();
233
234 let rtc_updates = setup_rtc(rtc_options, &builder, &timekeeper).await;
235 let realm_instance = builder.build().await.unwrap();
236
237 let fake_clock_control = if use_fake_clock {
238 let control_proxy = realm_instance
239 .root
240 .connect_to_protocol_at_exposed_dir::<FakeClockControlMarker>()
241 .unwrap();
242 let clock_proxy = realm_instance
243 .root
244 .connect_to_protocol_at_exposed_dir::<FakeClockMarker>()
245 .unwrap();
246 Some(FakeClockController { control_proxy, clock_proxy })
247 } else {
248 None
249 };
250
251 let cobalt_querier = realm_instance
252 .root
253 .connect_to_protocol_at_exposed_dir::<MetricEventLoggerQuerierMarker>()
254 .expect("the connection succeeds");
255
256 let nested_timekeeper = Self { _realm_instance: realm_instance };
257
258 (nested_timekeeper, push_source_puppet, rtc_updates, cobalt_querier, fake_clock_control)
259 }
260}
261
262pub struct RemotePushSourcePuppet {
263 proxy: fidl_test_time_realm::PushSourcePuppetProxy,
264}
265
266impl RemotePushSourcePuppet {
267 pub fn new(proxy: fidl_test_time_realm::PushSourcePuppetProxy) -> Arc<Self> {
269 Arc::new(Self { proxy })
270 }
271
272 pub async fn set_sample(&self, sample: TimeSample) {
274 self.proxy.set_sample(&sample).await.expect("original API was infallible");
275 }
276
277 pub async fn set_status(&self, status: Status) {
279 self.proxy.set_status(status).await.expect("original API was infallible");
280 }
281
282 pub async fn simulate_crash(&self) {
284 self.proxy.crash().await.expect("original local API was infallible");
285 }
286
287 pub async fn lifetime_served_connections(&self) -> u32 {
290 self.proxy.get_lifetime_served_connections().await.expect("original API was infallible")
291 }
292}
293
294pub struct PushSourcePuppet {
296 inner: Mutex<PushSourcePuppetInner>,
299 cumulative_clients: Mutex<u32>,
301}
302
303impl PushSourcePuppet {
304 fn new() -> Self {
306 Self { inner: Mutex::new(PushSourcePuppetInner::new()), cumulative_clients: Mutex::new(0) }
307 }
308
309 fn serve_client(&self, server_end: ServerEnd<PushSourceMarker>) {
311 log::debug!("serve_client entry");
312 let mut inner = self.inner.lock();
313 if inner.served_client() {
317 *inner = PushSourcePuppetInner::new();
318 }
319 inner.serve_client(server_end);
320 *self.cumulative_clients.lock() += 1;
321 }
322
323 pub async fn set_sample(&self, sample: TimeSample) {
325 let mut sink = self.inner.lock().get_sink();
326 sink.send(sample.into()).await.unwrap();
327 }
328
329 pub async fn set_status(&self, status: Status) {
331 let mut sink = self.inner.lock().get_sink();
332 sink.send(status.into()).await.unwrap();
333 }
334
335 pub fn simulate_crash(&self) {
337 *self.inner.lock() = PushSourcePuppetInner::new();
338 }
340
341 pub fn lifetime_served_connections(&self) -> u32 {
344 *self.cumulative_clients.lock()
345 }
346}
347
348struct PushSourcePuppetInner {
351 push_source: Arc<PushSource<TestUpdateAlgorithm>>,
352 tasks: Vec<fasync::Task<()>>,
354 update_sink: Sender<Update>,
356}
357
358impl PushSourcePuppetInner {
359 fn new() -> Self {
360 let (update_algorithm, update_sink) = TestUpdateAlgorithm::new();
361 let push_source = Arc::new(PushSource::new(update_algorithm, Status::Ok).unwrap());
362 let push_source_clone = Arc::clone(&push_source);
363 let tasks = vec![fasync::Task::spawn(async move {
364 push_source_clone.poll_updates().await.unwrap();
365 })];
366 Self { push_source, tasks, update_sink }
367 }
368
369 fn served_client(&self) -> bool {
371 self.tasks.len() > 1
372 }
373
374 fn serve_client(&mut self, server_end: ServerEnd<PushSourceMarker>) {
376 let push_source_clone = Arc::clone(&self.push_source);
377 self.tasks.push(fasync::Task::spawn(async move {
378 push_source_clone.handle_requests_for_stream(server_end.into_stream()).await.unwrap();
379 }));
380 }
381
382 fn get_sink(&self) -> Sender<Update> {
387 self.update_sink.clone()
388 }
389}
390
391#[derive(Clone, Debug)]
393pub struct RtcUpdates(Arc<Mutex<Vec<fidl_fuchsia_hardware_rtc::Time>>>);
394
395impl RtcUpdates {
396 pub fn to_vec(&self) -> Vec<fidl_fuchsia_hardware_rtc::Time> {
398 self.0.lock().clone()
399 }
400}
401
402pub struct RemoteRtcUpdates {
405 proxy: fidl_test_time_realm::RtcUpdatesProxy,
406}
407
408impl RemoteRtcUpdates {
409 pub async fn to_vec(&self) -> Vec<fidl_fuchsia_hardware_rtc::Time> {
410 self.proxy
411 .get(fidl_test_time_realm::GetRequest::default())
412 .await
413 .expect("no errors or overflows") .unwrap()
415 .0
416 }
417 pub fn new(proxy: fidl_test_time_realm::RtcUpdatesProxy) -> Self {
418 RemoteRtcUpdates { proxy }
419 }
420}
421
422pub struct FakeClockController {
425 control_proxy: FakeClockControlProxy,
426 clock_proxy: FakeClockProxy,
427}
428
429impl Deref for FakeClockController {
430 type Target = FakeClockControlProxy;
431
432 fn deref(&self) -> &Self::Target {
433 &self.control_proxy
434 }
435}
436
437impl FakeClockController {
438 pub fn new(control_proxy: FakeClockControlProxy, clock_proxy: FakeClockProxy) -> Self {
440 FakeClockController { control_proxy, clock_proxy }
441 }
442
443 pub fn into_components(self) -> (FakeClockControlProxy, FakeClockProxy) {
445 (self.control_proxy, self.clock_proxy)
446 }
447
448 pub async fn get_monotonic(&self) -> Result<i64, fidl::Error> {
449 self.clock_proxy.get().await
450 }
451
452 pub async fn get_reference(&self) -> Result<zx::BootInstant, fidl::Error> {
454 self.get_monotonic().await.map(|v| zx::BootInstant::from_nanos(v))
455 }
456}
457
458pub enum RtcOptions {
460 None,
463 InitialRtcTime(zx::SyntheticInstant),
465 InjectedRtc(fio::DirectoryProxy),
481}
482
483impl From<fidl_test_time_realm::RtcOptions> for RtcOptions {
484 fn from(value: fidl_test_time_realm::RtcOptions) -> Self {
485 match value {
486 fidl_test_time_realm::RtcOptions::DevClassRtc(h) => {
487 RtcOptions::InjectedRtc(h.into_proxy())
488 }
489 fidl_test_time_realm::RtcOptions::InitialRtcTime(t) => {
490 RtcOptions::InitialRtcTime(zx::SyntheticInstant::from_nanos(t))
491 }
492 _ => unimplemented!(),
493 }
494 }
495}
496
497impl From<zx::SyntheticInstant> for RtcOptions {
498 fn from(value: zx::SyntheticInstant) -> Self {
499 RtcOptions::InitialRtcTime(value)
500 }
501}
502
503impl From<Option<zx::SyntheticInstant>> for RtcOptions {
504 fn from(value: Option<zx::SyntheticInstant>) -> Self {
505 value.map(|t| t.into()).unwrap_or(Self::None)
506 }
507}
508
509async fn setup_rtc(
520 rtc_options: RtcOptions,
521 builder: &RealmBuilder,
522 timekeeper: &ChildRef,
523) -> RtcUpdates {
524 let rtc_updates = RtcUpdates(Arc::new(Mutex::new(vec![])));
525
526 let rtc_dir = match rtc_options {
527 RtcOptions::InitialRtcTime(initial_time) => {
528 log::debug!("using fake /dev/class/rtc/000");
529 pseudo_directory! {
530 "class" => pseudo_directory! {
531 "rtc" => pseudo_directory! {
532 "000" => vfs::service::host({
533 let rtc_updates = rtc_updates.clone();
534 move |stream| {
535 serve_fake_rtc(initial_time, rtc_updates.clone(), stream)
536 }
537 })
538 }
539 }
540 }
541 }
542 RtcOptions::None => {
543 log::debug!("using an empty /dev/class/rtc directory");
544 pseudo_directory! {
545 "class" => pseudo_directory! {
546 "rtc" => pseudo_directory! {
547 }
548 }
549 }
550 }
551 RtcOptions::InjectedRtc(h) => {
552 log::debug!("using /dev/class/rtc provided by client");
553 pseudo_directory! {
554 "class" => pseudo_directory! {
555 "rtc" => vfs::remote::remote_dir(h)
556 }
557 }
558 }
559 };
560
561 let fake_rtc_server = builder
562 .add_local_child(
563 "fake_rtc",
564 {
565 move |handles| {
566 let rtc_dir = rtc_dir.clone();
567 async move {
568 let _ = &handles;
569 let mut fs = ServiceFs::new();
570 fs.add_remote("dev", vfs::directory::serve_read_only(rtc_dir));
571 fs.serve_connection(handles.outgoing_dir)
572 .expect("failed to serve fake RTC ServiceFs");
573 fs.collect::<()>().await;
574 Ok(())
575 }
576 .boxed()
577 }
578 },
579 ChildOptions::new().eager(),
580 )
581 .await
582 .unwrap();
583
584 builder
585 .add_route(
586 Route::new()
587 .capability(
588 Capability::directory("dev-rtc")
589 .path("/dev/class/rtc")
590 .rights(fio::R_STAR_DIR),
591 )
592 .from(&fake_rtc_server)
593 .to(&*timekeeper),
594 )
595 .await
596 .unwrap();
597
598 rtc_updates
599}
600
601async fn serve_fake_rtc(
602 initial_time: zx::SyntheticInstant,
603 rtc_updates: RtcUpdates,
604 mut stream: DeviceRequestStream,
605) {
606 while let Some(req) = stream.try_next().await.unwrap() {
607 match req {
608 DeviceRequest::Get { responder } => {
609 log::debug!("serve_fake_rtc: DeviceRequest::Get");
610 responder.send(Ok(&zx_time_to_rtc_time(initial_time))).unwrap();
613 }
614 DeviceRequest::Set { rtc, responder } => {
615 log::debug!("serve_fake_rtc: DeviceRequest::Set");
616 rtc_updates.0.lock().push(rtc);
617 responder.send(zx::Status::OK.into_raw()).unwrap();
618 }
619 DeviceRequest::Set2 { rtc, responder } => {
620 log::debug!("serve_fake_rtc: DeviceRequest::Set2");
621 rtc_updates.0.lock().push(rtc);
622 responder.send(Ok(())).unwrap();
623 }
624 DeviceRequest::_UnknownMethod { .. } => {}
625 }
626 }
627}
628
629async fn serve_test_control(puppet: &PushSourcePuppet, stream: TimeSourceControlRequestStream) {
630 stream
631 .try_for_each_concurrent(None, |req| async {
632 let _ = &req;
633 let TimeSourceControlRequest::ConnectPushSource { push_source, .. } = req;
634 puppet.serve_client(push_source);
635 Ok(())
636 })
637 .await
638 .unwrap();
639}
640
641async fn serve_maintenance(clock_handle: Arc<zx::Clock>, mut stream: MaintenanceRequestStream) {
642 while let Some(req) = stream.try_next().await.unwrap() {
643 let MaintenanceRequest::GetWritableUtcClock { responder } = req;
644 responder.send(clock_handle.duplicate_handle(Rights::SAME_RIGHTS).unwrap()).unwrap();
645 }
646}
647
648async fn timesource_mock_server(
649 handles: LocalComponentHandles,
650 push_source_puppet: Arc<PushSourcePuppet>,
651) -> Result<(), anyhow::Error> {
652 let mut fs = ServiceFs::new();
653 let mut tasks = vec![];
654
655 fs.dir("svc").add_fidl_service(move |stream: TimeSourceControlRequestStream| {
656 let puppet_clone = Arc::clone(&push_source_puppet);
657
658 tasks.push(fasync::Task::local(async move {
659 serve_test_control(&*puppet_clone, stream).await;
660 }));
661 });
662
663 fs.serve_connection(handles.outgoing_dir)?;
664 fs.collect::<()>().await;
665
666 Ok(())
667}
668
669async fn maintenance_mock_server(
670 handles: LocalComponentHandles,
671 clock: Arc<zx::Clock>,
672) -> Result<(), anyhow::Error> {
673 let mut fs = ServiceFs::new();
674 let mut tasks = vec![];
675
676 fs.dir("svc").add_fidl_service(move |stream: MaintenanceRequestStream| {
677 let clock_clone = Arc::clone(&clock);
678
679 tasks.push(fasync::Task::local(async move {
680 serve_maintenance(clock_clone, stream).await;
681 }));
682 });
683
684 fs.serve_connection(handles.outgoing_dir)?;
685 fs.collect::<()>().await;
686
687 Ok(())
688}
689
690fn from_rfc2822(date: &str) -> zx::SyntheticInstant {
691 zx::SyntheticInstant::from_nanos(
692 chrono::DateTime::parse_from_rfc2822(date).unwrap().timestamp_nanos_opt().unwrap(),
693 )
694}
695
lazy_static! {
    /// Instant used as the backstop of clocks created by `new_nonshareable_clock`.
    pub static ref BACKSTOP_TIME: zx::SyntheticInstant =
        from_rfc2822("Sun, 20 Sep 2020 01:01:01 GMT");
    /// An instant a little over one hour after `BACKSTOP_TIME`.
    pub static ref VALID_RTC_TIME: zx::SyntheticInstant =
        from_rfc2822("Sun, 20 Sep 2020 02:02:02 GMT");
    /// An instant several months earlier than `BACKSTOP_TIME`.
    pub static ref BEFORE_BACKSTOP_TIME: zx::SyntheticInstant =
        from_rfc2822("Fri, 06 Mar 2020 04:04:04 GMT");
    /// An instant later than `BACKSTOP_TIME`.
    pub static ref VALID_TIME: zx::SyntheticInstant = from_rfc2822("Tue, 29 Sep 2020 02:19:01 GMT");
    /// A second instant later than `BACKSTOP_TIME`, after `VALID_TIME`.
    pub static ref VALID_TIME_2: zx::SyntheticInstant =
        from_rfc2822("Wed, 30 Sep 2020 14:59:59 GMT");
}
707
/// A five-second duration.
// NOTE(review): by name, presumably the spacing tests use between time samples - confirm.
pub const BETWEEN_SAMPLES: zx::BootDuration = zx::BootDuration::from_seconds(5);

/// A fifty-millisecond duration.
// NOTE(review): by name, presumably the standard deviation tests attach to samples - confirm.
pub const STD_DEV: zx::BootDuration = zx::BootDuration::from_millis(50);
713
714pub fn new_clock() -> Arc<zx::SyntheticClock> {
717 Arc::new(new_nonshareable_clock())
718}
719
720pub fn new_nonshareable_clock() -> zx::SyntheticClock {
722 zx::SyntheticClock::create(zx::ClockOpts::empty(), Some(*BACKSTOP_TIME)).unwrap()
723}
724
725fn zx_time_to_rtc_time(zx_time: zx::SyntheticInstant) -> fidl_fuchsia_hardware_rtc::Time {
726 let date = chrono::Utc.timestamp_nanos(zx_time.into_nanos());
727 fidl_fuchsia_hardware_rtc::Time {
728 seconds: date.second() as u8,
729 minutes: date.minute() as u8,
730 hours: date.hour() as u8,
731 day: date.day() as u8,
732 month: date.month() as u8,
733 year: date.year() as u16,
734 }
735}
736
737pub fn rtc_time_to_zx_time(rtc_time: fidl_fuchsia_hardware_rtc::Time) -> zx::SyntheticInstant {
738 let date = chrono::Utc
739 .with_ymd_and_hms(
740 rtc_time.year as i32,
741 rtc_time.month as u32,
742 rtc_time.day as u32,
743 rtc_time.hours as u32,
744 rtc_time.minutes as u32,
745 rtc_time.seconds as u32,
746 )
747 .unwrap();
748 zx::SyntheticInstant::from_nanos(date.timestamp_nanos_opt().unwrap())
749}
750
751pub fn create_cobalt_event_stream(
753 proxy: Arc<MetricEventLoggerQuerierProxy>,
754 log_method: LogMethod,
755) -> std::pin::Pin<Box<dyn Stream<Item = MetricEvent>>> {
756 async_utils::hanging_get::client::HangingGetStream::new(proxy, move |p| {
757 p.watch_logs(PROJECT_ID, log_method)
758 })
759 .map(|res| futures::stream::iter(res.expect("there should be a valid result here").0))
760 .flatten()
761 .boxed()
762}
763
/// Expands to a call to `$crate::poll_until_some_impl`, passing `$condition` and a
/// `SourceLocation` built from the call site's `file!()`, `line!()` and `column!()`.
///
/// `poll_until_some_impl` is an `async fn`, so the expansion must be awaited.
#[macro_export]
macro_rules! poll_until_some {
    ($condition:expr) => {
        $crate::poll_until_some_impl(
            $condition,
            &$crate::SourceLocation::new(file!(), line!(), column!()),
        )
    };
}
774
/// Repeatedly awaits `$condition` until it yields `Some(value)`, then evaluates to
/// `value`.
///
/// `$condition` is re-evaluated and awaited on every iteration, with a timer wait of
/// `$crate::RETRY_WAIT_DURATION` after each `None`. The call site's location is
/// logged on entry and on completion.
///
/// The expansion contains `.await`, so it must be used inside an async context. It
/// also refers to `fasync` and `log` by those bare names, so both must be in scope
/// at the call site.
#[macro_export]
macro_rules! poll_until_some_async {
    ($condition:expr) => {{
        let loc = $crate::SourceLocation::new(file!(), line!(), column!());
        log::info!("=> poll_until_some_async() for {}", &loc);
        // `break value` yields the result directly, so no placeholder `None` or
        // final unwrap is needed.
        let found = loop {
            if let Some(value) = $condition.await {
                break value;
            }
            fasync::Timer::new(fasync::MonotonicInstant::after($crate::RETRY_WAIT_DURATION)).await;
        };
        log::info!("=> poll_until_some_async() done for {}", &loc);
        found
    }};
}
794
/// Expands to a call to `$crate::poll_until_async_impl`, passing `$condition` and a
/// `SourceLocation` built from the call site's `file!()`, `line!()` and `column!()`.
///
/// `poll_until_async_impl` is an `async fn`, so the expansion must be awaited.
#[macro_export]
macro_rules! poll_until_async {
    ($condition:expr) => {
        $crate::poll_until_async_impl(
            $condition,
            &$crate::SourceLocation::new(file!(), line!(), column!()),
        )
    };
}
806
/// Repeatedly awaits `$condition` until it yields `true`, then evaluates to `true`.
///
/// `$condition` is re-evaluated and awaited on every iteration, with a timer wait of
/// `$crate::RETRY_WAIT_DURATION` after each `false`. The call site's location is
/// logged on entry and on completion.
///
/// The expansion contains `.await`, so it must be used inside an async context. It
/// also refers to `fasync` and `log` by those bare names, so both must be in scope
/// at the call site.
#[macro_export]
macro_rules! poll_until_async_2 {
    ($condition:expr) => {{
        let loc = $crate::SourceLocation::new(file!(), line!(), column!());
        // The entry line names this macro so it pairs with the "done" line below.
        log::info!("=> poll_until_async_2() for {}", &loc);
        loop {
            let satisfied: bool = $condition.await;
            if satisfied {
                break;
            }
            fasync::Timer::new(fasync::MonotonicInstant::after($crate::RETRY_WAIT_DURATION)).await;
        }
        log::info!("=> poll_until_async_2() done for {}", &loc);
        // The loop only exits once the condition has held.
        true
    }};
}
825
/// Expands to a call to `$crate::poll_until_impl`, passing `$condition` and a
/// `SourceLocation` built from the call site's `file!()`, `line!()` and `column!()`.
///
/// `poll_until_impl` is an `async fn`, so the expansion must be awaited.
#[macro_export]
macro_rules! poll_until {
    ($condition:expr) => {
        $crate::poll_until_impl(
            $condition,
            &$crate::SourceLocation::new(file!(), line!(), column!()),
        )
    };
}
836
/// Delay (10 ms) that the polling helpers and macros in this file wait between attempts.
pub const RETRY_WAIT_DURATION: zx::MonotonicDuration = zx::MonotonicDuration::from_millis(10);
839
/// A file/line/column triple identifying a place in source code, used to label
/// log messages.
pub struct SourceLocation {
    /// File name, as given to `new`.
    file: &'static str,
    /// Line number, as given to `new`.
    line: u32,
    /// Column number, as given to `new`.
    column: u32,
}

impl std::fmt::Display for SourceLocation {
    /// Writes the location as `(file: <file>, line: <line>, column: <column>)`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self { file, line, column } = self;
        write!(f, "(file: {}, line: {}, column: {})", file, line, column)
    }
}

impl SourceLocation {
    /// Creates a location from its three components.
    pub fn new(file: &'static str, line: u32, column: u32) -> Self {
        SourceLocation { file, line, column }
    }
}
857
858pub async fn poll_until_some_impl<T, F>(poll_fn: F, loc: &SourceLocation) -> T
860where
861 F: Fn() -> Option<T>,
862{
863 log::info!("=> poll_until_some() for {}", loc);
864 loop {
865 match poll_fn() {
866 Some(value) => {
867 log::info!("<= poll_until_some() for {}", loc);
868 return value;
869 }
870 None => fasync::Timer::new(fasync::MonotonicInstant::after(RETRY_WAIT_DURATION)).await,
871 }
872 }
873}
874
875pub async fn poll_until_async_impl<F, Fut>(poll_fn: F, loc: &SourceLocation)
877where
878 F: Fn() -> Fut,
879 Fut: Future<Output = bool>,
880{
881 log::info!("=> poll_until_async() for {}", loc);
882 while !poll_fn().await {
883 fasync::Timer::new(fasync::MonotonicInstant::after(RETRY_WAIT_DURATION)).await
884 }
885 log::info!("<= poll_until_async() for {}", loc);
886}
887
888pub async fn poll_until_impl<F: Fn() -> bool>(poll_fn: F, loc: &SourceLocation) {
890 log::info!("=> poll_until() for {}", loc);
891 while !poll_fn() {
892 fasync::Timer::new(fasync::MonotonicInstant::after(RETRY_WAIT_DURATION)).await
893 }
894 log::info!("<= poll_until() for {}", loc);
895}