timekeeper_integration_lib/
lib.rs

1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use anyhow::Context;
6use chrono::{Datelike, TimeZone, Timelike};
7use fidl::endpoints::ServerEnd;
8use fidl_fuchsia_hardware_rtc::{DeviceRequest, DeviceRequestStream};
9use fidl_fuchsia_metrics::MetricEvent;
10use fidl_fuchsia_metrics_test::{
11    LogMethod, MetricEventLoggerQuerierMarker, MetricEventLoggerQuerierProxy,
12};
13use fidl_fuchsia_testing::{
14    FakeClockControlMarker, FakeClockControlProxy, FakeClockMarker, FakeClockProxy,
15};
16use fidl_fuchsia_time::{MaintenanceRequest, MaintenanceRequestStream};
17use fidl_fuchsia_time_external::{PushSourceMarker, Status, TimeSample};
18use fidl_test_time::{TimeSourceControlRequest, TimeSourceControlRequestStream};
19use fuchsia_component::server::ServiceFs;
20use fuchsia_component_test::{
21    Capability, ChildOptions, ChildRef, LocalComponentHandles, RealmBuilder, RealmInstance, Ref,
22    Route,
23};
24use fuchsia_sync::Mutex;
25use futures::channel::mpsc::Sender;
26use futures::stream::{Stream, StreamExt, TryStreamExt};
27use futures::{Future, FutureExt, SinkExt};
28use lazy_static::lazy_static;
29use push_source::{PushSource, TestUpdateAlgorithm, Update};
30use std::ops::Deref;
31use std::sync::Arc;
32use time_metrics_registry::PROJECT_ID;
33use vfs::pseudo_directory;
34use zx::{self as zx, HandleBased, Rights};
35use {fidl_fuchsia_io as fio, fuchsia_async as fasync};
36
37/// URL for timekeeper.
38const TIMEKEEPER_URL: &str = "#meta/timekeeper_for_integration.cm";
39/// URL for timekeeper with fake time.
40const TIMEKEEPER_FAKE_TIME_URL: &str = "#meta/timekeeper_with_fake_time.cm";
41/// URL for fake cobalt.
42const COBALT_URL: &str = "#meta/fake_cobalt.cm";
43/// URL for the fake clock component.
44const FAKE_CLOCK_URL: &str = "#meta/fake_clock.cm";
45
46/// A reference to a timekeeper running inside a nested environment which runs fake versions of
47/// the services timekeeper requires.
48pub struct NestedTimekeeper {
49    _realm_instance: RealmInstance,
50}
51
52impl Into<RealmInstance> for NestedTimekeeper {
53    // Deconstructs [Self] into an underlying [RealmInstance].
54    fn into(self) -> RealmInstance {
55        self._realm_instance
56    }
57}
58
59impl NestedTimekeeper {
60    /// Creates a new [NestedTimekeeper].
61    ///
62    /// Launches an instance of timekeeper maintaining the provided |clock| in a nested
63    /// environment.
64    ///
65    /// If |initial_rtc_time| is provided, then the environment contains a fake RTC
66    /// device that reports the time as |initial_rtc_time|.
67    ///
68    /// If use_fake_clock is true, also launches a fake monotonic clock service.
69    ///
70    /// Returns a `NestedTimekeeper`, handles to the PushSource and RTC it obtains updates from,
71    /// Cobalt debug querier, and a fake clock control handle if use_fake_clock is true.
72    pub async fn new(
73        clock: Arc<zx::Clock>,
74        rtc_options: RtcOptions,
75        use_fake_clock: bool,
76    ) -> (
77        Self,
78        Arc<PushSourcePuppet>,
79        RtcUpdates,
80        MetricEventLoggerQuerierProxy,
81        Option<FakeClockController>,
82    ) {
83        let push_source_puppet = Arc::new(PushSourcePuppet::new());
84
85        let builder = RealmBuilder::new().await.unwrap();
86        let fake_cobalt =
87            builder.add_child("fake_cobalt", COBALT_URL, ChildOptions::new()).await.unwrap();
88
89        let timekeeper_url = if use_fake_clock { TIMEKEEPER_FAKE_TIME_URL } else { TIMEKEEPER_URL };
90        log::trace!("using timekeeper_url: {}", timekeeper_url);
91        let timekeeper = builder
92            .add_child("timekeeper_test", timekeeper_url, ChildOptions::new().eager())
93            .await
94            .with_context(|| format!("while starting up timekeeper_test from: {timekeeper_url}"))
95            .unwrap();
96
97        let timesource_server = builder
98            .add_local_child(
99                "timesource_mock",
100                {
101                    let push_source_puppet = Arc::clone(&push_source_puppet);
102                    move |handles: LocalComponentHandles| {
103                        Box::pin(timesource_mock_server(handles, Arc::clone(&push_source_puppet)))
104                    }
105                },
106                ChildOptions::new(),
107            )
108            .await
109            .context("while starting up timesource_mock")
110            .unwrap();
111
112        let maintenance_server = builder
113            .add_local_child(
114                "maintenance_mock",
115                move |handles: LocalComponentHandles| {
116                    Box::pin(maintenance_mock_server(handles, Arc::clone(&clock)))
117                },
118                ChildOptions::new(),
119            )
120            .await
121            .context("while starting up maintenance_mock")
122            .unwrap();
123
124        // Launch fake clock if needed.
125        if use_fake_clock {
126            let fake_clock =
127                builder.add_child("fake_clock", FAKE_CLOCK_URL, ChildOptions::new()).await.unwrap();
128
129            builder
130                .add_route(
131                    Route::new()
132                        .capability(Capability::protocol_by_name(
133                            "fuchsia.testing.FakeClockControl",
134                        ))
135                        .from(&fake_clock)
136                        .to(Ref::parent()),
137                )
138                .await
139                .context("while setting up FakeClockControl")
140                .unwrap();
141
142            builder
143                .add_route(
144                    Route::new()
145                        .capability(Capability::protocol_by_name("fuchsia.testing.FakeClock"))
146                        .from(&fake_clock)
147                        .to(Ref::parent())
148                        .to(&timekeeper),
149                )
150                .await
151                .context("while setting up FakeClock")
152                .unwrap();
153
154            builder
155                .add_route(
156                    Route::new()
157                        .capability(Capability::protocol_by_name("fuchsia.logger.LogSink"))
158                        .from(Ref::parent())
159                        .to(&fake_clock),
160                )
161                .await
162                .context("while setting up LogSink")
163                .unwrap();
164        };
165
166        builder
167            .add_route(
168                Route::new()
169                    .capability(Capability::protocol_by_name("fuchsia.time.Maintenance"))
170                    .from(&maintenance_server)
171                    .to(&timekeeper),
172            )
173            .await
174            .context("while setting up Maintenance")
175            .unwrap();
176
177        builder
178            .add_route(
179                Route::new()
180                    .capability(Capability::protocol_by_name("test.time.TimeSourceControl"))
181                    .from(&timesource_server)
182                    .to(&timekeeper),
183            )
184            .await
185            .unwrap();
186
187        builder
188            .add_route(
189                Route::new()
190                    .capability(Capability::protocol_by_name(
191                        "fuchsia.metrics.test.MetricEventLoggerQuerier",
192                    ))
193                    .from(&fake_cobalt)
194                    .to(Ref::parent()),
195            )
196            .await
197            .unwrap();
198
199        builder
200            .add_route(
201                Route::new()
202                    .capability(Capability::protocol_by_name(
203                        "fuchsia.metrics.MetricEventLoggerFactory",
204                    ))
205                    .from(&fake_cobalt)
206                    .to(&timekeeper),
207            )
208            .await
209            .unwrap();
210
211        builder
212            .add_route(
213                Route::new()
214                    .capability(Capability::protocol_by_name("fuchsia.logger.LogSink"))
215                    .from(Ref::parent())
216                    .to(&fake_cobalt)
217                    .to(&timekeeper)
218                    .to(&timesource_server)
219                    .to(&maintenance_server),
220            )
221            .await
222            .unwrap();
223
224        builder
225            .add_route(
226                Route::new()
227                    .capability(Capability::configuration("fuchsia.time.config.WritableUTCTime"))
228                    .from(Ref::parent())
229                    .to(&timekeeper),
230            )
231            .await
232            .unwrap();
233
234        let rtc_updates = setup_rtc(rtc_options, &builder, &timekeeper).await;
235        let realm_instance = builder.build().await.unwrap();
236
237        let fake_clock_control = if use_fake_clock {
238            let control_proxy = realm_instance
239                .root
240                .connect_to_protocol_at_exposed_dir::<FakeClockControlMarker>()
241                .unwrap();
242            let clock_proxy = realm_instance
243                .root
244                .connect_to_protocol_at_exposed_dir::<FakeClockMarker>()
245                .unwrap();
246            Some(FakeClockController { control_proxy, clock_proxy })
247        } else {
248            None
249        };
250
251        let cobalt_querier = realm_instance
252            .root
253            .connect_to_protocol_at_exposed_dir::<MetricEventLoggerQuerierMarker>()
254            .expect("the connection succeeds");
255
256        let nested_timekeeper = Self { _realm_instance: realm_instance };
257
258        (nested_timekeeper, push_source_puppet, rtc_updates, cobalt_querier, fake_clock_control)
259    }
260}
261
262pub struct RemotePushSourcePuppet {
263    proxy: fidl_test_time_realm::PushSourcePuppetProxy,
264}
265
266impl RemotePushSourcePuppet {
267    /// Creates a new [RemotePushSourcePuppet].
268    pub fn new(proxy: fidl_test_time_realm::PushSourcePuppetProxy) -> Arc<Self> {
269        Arc::new(Self { proxy })
270    }
271
272    /// Set the next sample reported by the time source.
273    pub async fn set_sample(&self, sample: TimeSample) {
274        self.proxy.set_sample(&sample).await.expect("original API was infallible");
275    }
276
277    /// Set the next status reported by the time source.
278    pub async fn set_status(&self, status: Status) {
279        self.proxy.set_status(status).await.expect("original API was infallible");
280    }
281
282    /// Simulate a crash by closing client channels and wiping state.
283    pub async fn simulate_crash(&self) {
284        self.proxy.crash().await.expect("original local API was infallible");
285    }
286
287    /// Returns the number of cumulative connections served. This allows asserting
288    /// behavior such as whether Timekeeper has restarted a connection.
289    pub async fn lifetime_served_connections(&self) -> u32 {
290        self.proxy.get_lifetime_served_connections().await.expect("original API was infallible")
291    }
292}
293
294/// A `PushSource` that allows a single client and can be controlled by a test.
295pub struct PushSourcePuppet {
296    /// Internal state for the current PushSource. May be dropped and replaced
297    /// to clear all state.
298    inner: Mutex<PushSourcePuppetInner>,
299    /// The number of client connections received over the lifetime of the puppet.
300    cumulative_clients: Mutex<u32>,
301}
302
303impl PushSourcePuppet {
304    /// Create a new `PushSourcePuppet`.
305    fn new() -> Self {
306        Self { inner: Mutex::new(PushSourcePuppetInner::new()), cumulative_clients: Mutex::new(0) }
307    }
308
309    /// Serve the `PushSource` service to a client.
310    fn serve_client(&self, server_end: ServerEnd<PushSourceMarker>) {
311        log::debug!("serve_client entry");
312        let mut inner = self.inner.lock();
313        // Timekeeper should only need to connect to a push source once, except when it is
314        // restarting a time source. This case appears to the test as a second connection to the
315        // puppet. Since the puppet is restarted, all its state should be cleared as well.
316        if inner.served_client() {
317            *inner = PushSourcePuppetInner::new();
318        }
319        inner.serve_client(server_end);
320        *self.cumulative_clients.lock() += 1;
321    }
322
323    /// Set the next sample reported by the time source.
324    pub async fn set_sample(&self, sample: TimeSample) {
325        let mut sink = self.inner.lock().get_sink();
326        sink.send(sample.into()).await.unwrap();
327    }
328
329    /// Set the next status reported by the time source.
330    pub async fn set_status(&self, status: Status) {
331        let mut sink = self.inner.lock().get_sink();
332        sink.send(status.into()).await.unwrap();
333    }
334
335    /// Simulate a crash by closing client channels and wiping state.
336    pub fn simulate_crash(&self) {
337        *self.inner.lock() = PushSourcePuppetInner::new();
338        // This drops the old inner and cleans up any tasks it owns.
339    }
340
341    /// Returns the number of cumulative connections served. This allows asserting
342    /// behavior such as whether Timekeeper has restarted a connection.
343    pub fn lifetime_served_connections(&self) -> u32 {
344        *self.cumulative_clients.lock()
345    }
346}
347
348/// Internal state for a PushSourcePuppet. This struct contains a PushSource and
349/// all Tasks needed for it to serve requests,
350struct PushSourcePuppetInner {
351    push_source: Arc<PushSource<TestUpdateAlgorithm>>,
352    /// Tasks serving PushSource clients.
353    tasks: Vec<fasync::Task<()>>,
354    /// Sink through which updates are passed to the PushSource.
355    update_sink: Sender<Update>,
356}
357
358impl PushSourcePuppetInner {
359    fn new() -> Self {
360        let (update_algorithm, update_sink) = TestUpdateAlgorithm::new();
361        let push_source = Arc::new(PushSource::new(update_algorithm, Status::Ok).unwrap());
362        let push_source_clone = Arc::clone(&push_source);
363        let tasks = vec![fasync::Task::spawn(async move {
364            push_source_clone.poll_updates().await.unwrap();
365        })];
366        Self { push_source, tasks, update_sink }
367    }
368
369    /// Returns true if this puppet has or is currently serving a client.
370    fn served_client(&self) -> bool {
371        self.tasks.len() > 1
372    }
373
374    /// Serve the `PushSource` service to a client.
375    fn serve_client(&mut self, server_end: ServerEnd<PushSourceMarker>) {
376        let push_source_clone = Arc::clone(&self.push_source);
377        self.tasks.push(fasync::Task::spawn(async move {
378            push_source_clone.handle_requests_for_stream(server_end.into_stream()).await.unwrap();
379        }));
380    }
381
382    /// Obtains the sink used to send commands to the push source puppet.
383    ///
384    /// The sink is detached from the puppet, so can be used whenever needed
385    /// without locking.
386    fn get_sink(&self) -> Sender<Update> {
387        self.update_sink.clone()
388    }
389}
390
391/// The list of RTC update requests received by a `NestedTimekeeper`.
392#[derive(Clone, Debug)]
393pub struct RtcUpdates(Arc<Mutex<Vec<fidl_fuchsia_hardware_rtc::Time>>>);
394
395impl RtcUpdates {
396    /// Get all received RTC times as a vec.
397    pub fn to_vec(&self) -> Vec<fidl_fuchsia_hardware_rtc::Time> {
398        self.0.lock().clone()
399    }
400}
401
402/// Remote RTC updates - peek into the life of the RTC on the other side of a
403/// RTC connection.
404pub struct RemoteRtcUpdates {
405    proxy: fidl_test_time_realm::RtcUpdatesProxy,
406}
407
408impl RemoteRtcUpdates {
409    pub async fn to_vec(&self) -> Vec<fidl_fuchsia_hardware_rtc::Time> {
410        self.proxy
411            .get(fidl_test_time_realm::GetRequest::default())
412            .await
413            .expect("no errors or overflows") // Original API was infallible.
414            .unwrap()
415            .0
416    }
417    pub fn new(proxy: fidl_test_time_realm::RtcUpdatesProxy) -> Self {
418        RemoteRtcUpdates { proxy }
419    }
420}
421
422/// A wrapper around a `FakeClockControlProxy` that also allows a client to read
423/// the current fake time.
424pub struct FakeClockController {
425    control_proxy: FakeClockControlProxy,
426    clock_proxy: FakeClockProxy,
427}
428
429impl Deref for FakeClockController {
430    type Target = FakeClockControlProxy;
431
432    fn deref(&self) -> &Self::Target {
433        &self.control_proxy
434    }
435}
436
437impl FakeClockController {
438    /// Re-constructs FakeClockController from the constituents.
439    pub fn new(control_proxy: FakeClockControlProxy, clock_proxy: FakeClockProxy) -> Self {
440        FakeClockController { control_proxy, clock_proxy }
441    }
442
443    /// Deconstructs [Self] into fake clock proxies.
444    pub fn into_components(self) -> (FakeClockControlProxy, FakeClockProxy) {
445        (self.control_proxy, self.clock_proxy)
446    }
447
448    pub async fn get_monotonic(&self) -> Result<i64, fidl::Error> {
449        self.clock_proxy.get().await
450    }
451
452    /// Returns the current fake instant on the reference timeline.
453    pub async fn get_reference(&self) -> Result<zx::BootInstant, fidl::Error> {
454        self.get_monotonic().await.map(|v| zx::BootInstant::from_nanos(v))
455    }
456}
457
458/// The RTC configuration options.
459pub enum RtcOptions {
460    /// No real-time clock available. This configuration simulates a system that
461    /// does not have a RTC circuit available.
462    None,
463    /// Fake real-time clock. Supplied initial RTC time to report.
464    InitialRtcTime(zx::SyntheticInstant),
465    /// Injected real-time clock.
466    ///
467    /// This is the handle that will appear as the directory
468    /// `/dev/class/rtc` in the Timekeeper's namespace.
469    ///
470    /// The caller must set this directory up so that it serves
471    /// a RTC device (e.g. named `/dev/class/rtc/000`, and serving
472    /// the FIDL `fuchsia.hardware.rtc/Device`) from this directory.
473    ///
474    /// It is also possible to serve more RTCs from the directory, or
475    /// other files and file types at the caller's option.
476    ///
477    /// Use this option if you need to implement corner cases, or
478    /// very specific RTC behavior, such as abnormal configuration
479    /// or anomalous behavior.
480    InjectedRtc(fio::DirectoryProxy),
481}
482
483impl From<fidl_test_time_realm::RtcOptions> for RtcOptions {
484    fn from(value: fidl_test_time_realm::RtcOptions) -> Self {
485        match value {
486            fidl_test_time_realm::RtcOptions::DevClassRtc(h) => {
487                RtcOptions::InjectedRtc(h.into_proxy())
488            }
489            fidl_test_time_realm::RtcOptions::InitialRtcTime(t) => {
490                RtcOptions::InitialRtcTime(zx::SyntheticInstant::from_nanos(t))
491            }
492            _ => unimplemented!(),
493        }
494    }
495}
496
497impl From<zx::SyntheticInstant> for RtcOptions {
498    fn from(value: zx::SyntheticInstant) -> Self {
499        RtcOptions::InitialRtcTime(value)
500    }
501}
502
503impl From<Option<zx::SyntheticInstant>> for RtcOptions {
504    fn from(value: Option<zx::SyntheticInstant>) -> Self {
505        value.map(|t| t.into()).unwrap_or(Self::None)
506    }
507}
508
509/// Sets up the RTC serving.
510///
511/// Args:
512/// - `rtc_options`: options for RTC setup.
513/// - `build`: the `RealmBuilder` that will construct the realm.
514/// - `timekeeper`: the Timekeeper component instance.
515///
516/// Returns:
517/// - `RtcUpdates`: A vector of RTC updates received from a fake RTC. If the
518///   client serves the RTC directory, then the return value is useless.
519async fn setup_rtc(
520    rtc_options: RtcOptions,
521    builder: &RealmBuilder,
522    timekeeper: &ChildRef,
523) -> RtcUpdates {
524    let rtc_updates = RtcUpdates(Arc::new(Mutex::new(vec![])));
525
526    let rtc_dir = match rtc_options {
527        RtcOptions::InitialRtcTime(initial_time) => {
528            log::debug!("using fake /dev/class/rtc/000");
529            pseudo_directory! {
530                "class" => pseudo_directory! {
531                    "rtc" => pseudo_directory! {
532                        "000" => vfs::service::host({
533                            let rtc_updates = rtc_updates.clone();
534                            move |stream| {
535                                serve_fake_rtc(initial_time, rtc_updates.clone(), stream)
536                            }
537                        })
538                    }
539                }
540            }
541        }
542        RtcOptions::None => {
543            log::debug!("using an empty /dev/class/rtc directory");
544            pseudo_directory! {
545                "class" => pseudo_directory! {
546                    "rtc" => pseudo_directory! {
547                    }
548                }
549            }
550        }
551        RtcOptions::InjectedRtc(h) => {
552            log::debug!("using /dev/class/rtc provided by client");
553            pseudo_directory! {
554                "class" => pseudo_directory! {
555                    "rtc" => vfs::remote::remote_dir(h)
556                }
557            }
558        }
559    };
560
561    let fake_rtc_server = builder
562        .add_local_child(
563            "fake_rtc",
564            {
565                move |handles| {
566                    let rtc_dir = rtc_dir.clone();
567                    async move {
568                        let _ = &handles;
569                        let mut fs = ServiceFs::new();
570                        fs.add_remote("dev", vfs::directory::serve_read_only(rtc_dir));
571                        fs.serve_connection(handles.outgoing_dir)
572                            .expect("failed to serve fake RTC ServiceFs");
573                        fs.collect::<()>().await;
574                        Ok(())
575                    }
576                    .boxed()
577                }
578            },
579            ChildOptions::new().eager(),
580        )
581        .await
582        .unwrap();
583
584    builder
585        .add_route(
586            Route::new()
587                .capability(
588                    Capability::directory("dev-rtc")
589                        .path("/dev/class/rtc")
590                        .rights(fio::R_STAR_DIR),
591                )
592                .from(&fake_rtc_server)
593                .to(&*timekeeper),
594        )
595        .await
596        .unwrap();
597
598    rtc_updates
599}
600
601async fn serve_fake_rtc(
602    initial_time: zx::SyntheticInstant,
603    rtc_updates: RtcUpdates,
604    mut stream: DeviceRequestStream,
605) {
606    while let Some(req) = stream.try_next().await.unwrap() {
607        match req {
608            DeviceRequest::Get { responder } => {
609                log::debug!("serve_fake_rtc: DeviceRequest::Get");
610                // Since timekeeper only pulls a time off of the RTC device once on startup, we
611                // don't attempt to update the sent time.
612                responder.send(Ok(&zx_time_to_rtc_time(initial_time))).unwrap();
613            }
614            DeviceRequest::Set { rtc, responder } => {
615                log::debug!("serve_fake_rtc: DeviceRequest::Set");
616                rtc_updates.0.lock().push(rtc);
617                responder.send(zx::Status::OK.into_raw()).unwrap();
618            }
619            DeviceRequest::Set2 { rtc, responder } => {
620                log::debug!("serve_fake_rtc: DeviceRequest::Set2");
621                rtc_updates.0.lock().push(rtc);
622                responder.send(Ok(())).unwrap();
623            }
624            DeviceRequest::_UnknownMethod { .. } => {}
625        }
626    }
627}
628
629async fn serve_test_control(puppet: &PushSourcePuppet, stream: TimeSourceControlRequestStream) {
630    stream
631        .try_for_each_concurrent(None, |req| async {
632            let _ = &req;
633            let TimeSourceControlRequest::ConnectPushSource { push_source, .. } = req;
634            puppet.serve_client(push_source);
635            Ok(())
636        })
637        .await
638        .unwrap();
639}
640
641async fn serve_maintenance(clock_handle: Arc<zx::Clock>, mut stream: MaintenanceRequestStream) {
642    while let Some(req) = stream.try_next().await.unwrap() {
643        let MaintenanceRequest::GetWritableUtcClock { responder } = req;
644        responder.send(clock_handle.duplicate_handle(Rights::SAME_RIGHTS).unwrap()).unwrap();
645    }
646}
647
648async fn timesource_mock_server(
649    handles: LocalComponentHandles,
650    push_source_puppet: Arc<PushSourcePuppet>,
651) -> Result<(), anyhow::Error> {
652    let mut fs = ServiceFs::new();
653    let mut tasks = vec![];
654
655    fs.dir("svc").add_fidl_service(move |stream: TimeSourceControlRequestStream| {
656        let puppet_clone = Arc::clone(&push_source_puppet);
657
658        tasks.push(fasync::Task::local(async move {
659            serve_test_control(&*puppet_clone, stream).await;
660        }));
661    });
662
663    fs.serve_connection(handles.outgoing_dir)?;
664    fs.collect::<()>().await;
665
666    Ok(())
667}
668
669async fn maintenance_mock_server(
670    handles: LocalComponentHandles,
671    clock: Arc<zx::Clock>,
672) -> Result<(), anyhow::Error> {
673    let mut fs = ServiceFs::new();
674    let mut tasks = vec![];
675
676    fs.dir("svc").add_fidl_service(move |stream: MaintenanceRequestStream| {
677        let clock_clone = Arc::clone(&clock);
678
679        tasks.push(fasync::Task::local(async move {
680            serve_maintenance(clock_clone, stream).await;
681        }));
682    });
683
684    fs.serve_connection(handles.outgoing_dir)?;
685    fs.collect::<()>().await;
686
687    Ok(())
688}
689
690fn from_rfc2822(date: &str) -> zx::SyntheticInstant {
691    zx::SyntheticInstant::from_nanos(
692        chrono::DateTime::parse_from_rfc2822(date).unwrap().timestamp_nanos_opt().unwrap(),
693    )
694}
695
696lazy_static! {
697    pub static ref BACKSTOP_TIME: zx::SyntheticInstant =
698        from_rfc2822("Sun, 20 Sep 2020 01:01:01 GMT");
699    pub static ref VALID_RTC_TIME: zx::SyntheticInstant =
700        from_rfc2822("Sun, 20 Sep 2020 02:02:02 GMT");
701    pub static ref BEFORE_BACKSTOP_TIME: zx::SyntheticInstant =
702        from_rfc2822("Fri, 06 Mar 2020 04:04:04 GMT");
703    pub static ref VALID_TIME: zx::SyntheticInstant = from_rfc2822("Tue, 29 Sep 2020 02:19:01 GMT");
704    pub static ref VALID_TIME_2: zx::SyntheticInstant =
705        from_rfc2822("Wed, 30 Sep 2020 14:59:59 GMT");
706}
707
708/// Time between each reported sample.
709pub const BETWEEN_SAMPLES: zx::BootDuration = zx::BootDuration::from_seconds(5);
710
711/// The standard deviation to report on valid time samples.
712pub const STD_DEV: zx::BootDuration = zx::BootDuration::from_millis(50);
713
714/// Create a new clock with backstop time set to `BACKSTOP_TIME`.
715// TODO: b/306024715 - To be removed once all tests are migrated to TTRF.
716pub fn new_clock() -> Arc<zx::SyntheticClock> {
717    Arc::new(new_nonshareable_clock())
718}
719
720/// Create a new clock with backstop time set to `BACKSTOP_TIME`.
721pub fn new_nonshareable_clock() -> zx::SyntheticClock {
722    zx::SyntheticClock::create(zx::ClockOpts::empty(), Some(*BACKSTOP_TIME)).unwrap()
723}
724
725fn zx_time_to_rtc_time(zx_time: zx::SyntheticInstant) -> fidl_fuchsia_hardware_rtc::Time {
726    let date = chrono::Utc.timestamp_nanos(zx_time.into_nanos());
727    fidl_fuchsia_hardware_rtc::Time {
728        seconds: date.second() as u8,
729        minutes: date.minute() as u8,
730        hours: date.hour() as u8,
731        day: date.day() as u8,
732        month: date.month() as u8,
733        year: date.year() as u16,
734    }
735}
736
737pub fn rtc_time_to_zx_time(rtc_time: fidl_fuchsia_hardware_rtc::Time) -> zx::SyntheticInstant {
738    let date = chrono::Utc
739        .with_ymd_and_hms(
740            rtc_time.year as i32,
741            rtc_time.month as u32,
742            rtc_time.day as u32,
743            rtc_time.hours as u32,
744            rtc_time.minutes as u32,
745            rtc_time.seconds as u32,
746        )
747        .unwrap();
748    zx::SyntheticInstant::from_nanos(date.timestamp_nanos_opt().unwrap())
749}
750
751/// Create a stream of MetricEvents from a proxy.
752pub fn create_cobalt_event_stream(
753    proxy: Arc<MetricEventLoggerQuerierProxy>,
754    log_method: LogMethod,
755) -> std::pin::Pin<Box<dyn Stream<Item = MetricEvent>>> {
756    async_utils::hanging_get::client::HangingGetStream::new(proxy, move |p| {
757        p.watch_logs(PROJECT_ID, log_method)
758    })
759    .map(|res| futures::stream::iter(res.expect("there should be a valid result here").0))
760    .flatten()
761    .boxed()
762}
763
764/// Repeatedly evaluates `condition` until it returns `Some(v)`. Returns `v`.
765#[macro_export]
766macro_rules! poll_until_some {
767    ($condition:expr) => {
768        $crate::poll_until_some_impl(
769            $condition,
770            &$crate::SourceLocation::new(file!(), line!(), column!()),
771        )
772    };
773}
774
775/// Repeatedly evaluates an async `condition` until it returns `Some(v)`. Returns `v`.
776/// Use if your condition is an async fn.
777#[macro_export]
778macro_rules! poll_until_some_async {
779    ($condition:expr) => {{
780        let loc = $crate::SourceLocation::new(file!(), line!(), column!());
781        log::info!("=> poll_until_some_async() for {}", &loc);
782        let mut result = None;
783        loop {
784            result = $condition.await;
785            if result.is_some() {
786                break;
787            }
788            fasync::Timer::new(fasync::MonotonicInstant::after($crate::RETRY_WAIT_DURATION)).await;
789        }
790        log::info!("=> poll_until_some_async() done for {}", &loc);
791        result.expect("we loop around while result is None")
792    }};
793}
794
795/// Repeatedly evaluates `condition` to create a `Future`, and then awaits the `Future`.
796/// Returns `()` when the (most recently created) `Future` resolves to `true`.
797#[macro_export]
798macro_rules! poll_until_async {
799    ($condition:expr) => {
800        $crate::poll_until_async_impl(
801            $condition,
802            &$crate::SourceLocation::new(file!(), line!(), column!()),
803        )
804    };
805}
806
807/// A reimplementation of the above, which deals better with borrows.
808#[macro_export]
809macro_rules! poll_until_async_2 {
810    ($condition:expr) => {{
811        let loc = $crate::SourceLocation::new(file!(), line!(), column!());
812        log::info!("=> poll_until_async() for {}", &loc);
813        let mut result = true;
814        loop {
815            result = $condition.await;
816            if result {
817                break;
818            }
819            fasync::Timer::new(fasync::MonotonicInstant::after($crate::RETRY_WAIT_DURATION)).await;
820        }
821        log::info!("=> poll_until_async_2() done for {}", &loc);
822        result
823    }};
824}
825
826/// Repeatedly evaluates `condition` until it returns `true`. Returns `()`.
827#[macro_export]
828macro_rules! poll_until {
829    ($condition:expr) => {
830        $crate::poll_until_impl(
831            $condition,
832            &$crate::SourceLocation::new(file!(), line!(), column!()),
833        )
834    };
835}
836
837/// Wait duration for polling.
838pub const RETRY_WAIT_DURATION: zx::MonotonicDuration = zx::MonotonicDuration::from_millis(10);
839
840pub struct SourceLocation {
841    file: &'static str,
842    line: u32,
843    column: u32,
844}
845
846impl std::fmt::Display for SourceLocation {
847    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
848        write!(f, "(file: {}, line: {}, column: {})", self.file, self.line, self.column)
849    }
850}
851
852impl SourceLocation {
853    pub fn new(file: &'static str, line: u32, column: u32) -> Self {
854        Self { file, line, column }
855    }
856}
857
858/// Use `poll_until_some!()` instead.
859pub async fn poll_until_some_impl<T, F>(poll_fn: F, loc: &SourceLocation) -> T
860where
861    F: Fn() -> Option<T>,
862{
863    log::info!("=> poll_until_some() for {}", loc);
864    loop {
865        match poll_fn() {
866            Some(value) => {
867                log::info!("<= poll_until_some() for {}", loc);
868                return value;
869            }
870            None => fasync::Timer::new(fasync::MonotonicInstant::after(RETRY_WAIT_DURATION)).await,
871        }
872    }
873}
874
875/// Use `poll_until_async!()` instead.
876pub async fn poll_until_async_impl<F, Fut>(poll_fn: F, loc: &SourceLocation)
877where
878    F: Fn() -> Fut,
879    Fut: Future<Output = bool>,
880{
881    log::info!("=> poll_until_async() for {}", loc);
882    while !poll_fn().await {
883        fasync::Timer::new(fasync::MonotonicInstant::after(RETRY_WAIT_DURATION)).await
884    }
885    log::info!("<= poll_until_async() for {}", loc);
886}
887
888/// Use `poll_until!()` instead.
889pub async fn poll_until_impl<F: Fn() -> bool>(poll_fn: F, loc: &SourceLocation) {
890    log::info!("=> poll_until() for {}", loc);
891    while !poll_fn() {
892        fasync::Timer::new(fasync::MonotonicInstant::after(RETRY_WAIT_DURATION)).await
893    }
894    log::info!("<= poll_until() for {}", loc);
895}