sampler/
executor.rs

1// Copyright 2020 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5//! # Data Structure and Algorithm Overview
6//!
7//! Cobalt is organized into Projects, each of which contains several Metrics.
8//!
9//! [`MetricConfig`] - defined in src/diagnostics/lib/sampler-config/src/lib.rs
10//! This is deserialized from Sampler config files or created by FIRE by interpolating
11//! component information into FIRE config files. It contains
12//!
13//!  - selectors: Vec<Selector>
14//!  - metric_id
15//!  - metric_type: MetricType
16//!  - event_codes: Vec<u32>
17//!  - upload_once boolean
18//!
19//! **NOTE:** Multiple selectors can be provided in a single metric. Only one selector is
20//! expected to fetch/match data. When one does, the other selectors will be disabled for
21//! efficiency.
22//!
23//! [`ProjectConfig`] - defined in src/diagnostics/lib/sampler-config/src/lib.rs
24//! This encodes the contents of a single config file:
25//!
26//!  - project_id
27//!  - customer_id (defaults to 1)
28//!  - poll_rate_sec
29//!  - metrics: Vec<MetricConfig>
30//!
31//! [`SamplerConfig`] - defined in src/diagnostics/sampler/src/config.rs
32//! The entire config for Sampler. Contains
33//!
34//!  - list of ProjectConfig
35//!  - minimum sample rate
36//!
37//! [`ProjectSampler`] - defined in src/diagnostics/sampler/src/executor.rs
38//! This contains
39//!
40//!  - several MetricConfig's
41//!  - an ArchiveReader configured with all active selectors
42//!  - a cache of previous Diagnostic values, indexed by selector strings
43//!  - FIDL proxies for Cobalt and MetricEvent loggers
44//!     - these loggers are configured with project_id and customer_id
45//!  - Poll rate
46//!  - Inspect stats (struct ProjectSamplerStats)
47//!
48//! [`ProjectSampler`] is stored in:
49//!
50//!  - [`TaskCancellation`]:     execution_context: fasync::Task<Vec<ProjectSampler>>,
51//!  - [`RebootSnapshotProcessor`]:    project_samplers: Vec<ProjectSampler>,
52//!  - [`SamplerExecutor`]:     project_samplers: Vec<ProjectSampler>,
53//!  - [`ProjectSamplerTaskExit::RebootTriggered(ProjectSampler)`],
54//!
55//! [`SamplerExecutor`] (defined in executor.rs) is built from a single [`SamplerConfig`].
56//! [`SamplerExecutor`] contains
57//!
58//!  - a list of ProjectSamplers
59//!  - an Inspect stats structure
60//!
61//! [`SamplerExecutor`] only has one member function execute() which calls spawn() on each
62//! project sampler, passing it a receiver-oneshot to cancel it. The collection of
63//! oneshot-senders and spawned-tasks builds the returned TaskCancellation.
64//!
65//! [`TaskCancellation`] is then passed to a reboot_watcher (in src/diagnostics/sampler/src/lib.rs)
66//! which does nothing until the reboot service either closes (calling
67//! run_without_cancellation()) or sends a message (calling perform_reboot_cleanup()).
68//!
69//! [`ProjectSampler`] calls fasync::Task::spawn to create a task that starts a timer, then loops
70//! listening to that timer and to the reboot_oneshot. When the timer triggers, it calls
71//! self.process_next_snapshot(). If the reboot oneshot arrives, the task returns
72//! [`ProjectSamplerTaskExit::RebootTriggered(self)`].
73//!
74//!
75//! **NOTE:** Selectors are expected to match one value each. Wildcards are only allowed in the
76//! moniker segment of a selector when it is a driver in the driver collections.
77//!
78//! Selectors will become unused, either because of upload_once, or because data was found by a
79//! different selector. Rather than implement deletion in the vecs,
80//! which would add lots of bug surface and maintenance debt, each selector is an Option<> so that
81//! selectors can be deleted/disabled without changing the rest of the data structure.
82//! Once all Diagnostic data is processed, the structure is rebuilt if any selectors
83//! have been disabled; rebuilding less often would be
84//! premature optimization at this point.
85//!
86//! perform_reboot_cleanup() builds an ArchiveReader for [`RebootSnapshotProcessor`].
87//! When not rebooting, each project fetches its own data from Archivist.
88
89use crate::config::SamplerConfig;
90use crate::diagnostics::*;
91use anyhow::{format_err, Context, Error};
92use diagnostics_data::{Data, InspectHandleName};
93use diagnostics_hierarchy::{
94    ArrayContent, DiagnosticsHierarchy, ExponentialHistogram, LinearHistogram, Property,
95    SelectResult,
96};
97use diagnostics_reader::{ArchiveReader, Inspect, InspectArchiveReader, RetryConfig};
98use fidl_fuchsia_metrics::{
99    HistogramBucket, MetricEvent, MetricEventLoggerFactoryMarker, MetricEventLoggerFactoryProxy,
100    MetricEventLoggerProxy, MetricEventPayload, ProjectSpec,
101};
102use fuchsia_async as fasync;
103use fuchsia_component::client::connect_to_protocol;
104use fuchsia_inspect::{self as inspect, NumericProperty};
105use fuchsia_inspect_derive::WithInspect;
106use futures::channel::oneshot;
107use futures::future::join_all;
108use futures::stream::FuturesUnordered;
109use futures::{select, StreamExt};
110use log::{info, warn};
111use moniker::ExtendedMoniker;
112use sampler_config::runtime::{MetricConfig, ProjectConfig};
113use sampler_config::{MetricType, ProjectId};
114use selectors::SelectorExt;
115use std::cell::{Ref, RefCell, RefMut};
116use std::collections::hash_map::Entry;
117use std::collections::HashMap;
118use std::sync::Arc;
119
120/// An event to be logged to the cobalt logger. Events are generated first,
121/// then logged. (This permits unit-testing the code that generates events from
122/// Diagnostic data.)
123type EventToLog = (ProjectId, MetricEvent);
124
125pub struct TaskCancellation {
126    senders: Vec<oneshot::Sender<()>>,
127    _sampler_executor_stats: Arc<SamplerExecutorStats>,
128    execution_context: fasync::Task<Vec<ProjectSampler>>,
129}
130
131impl TaskCancellation {
132    /// It's possible the reboot register goes down. If that
133    /// happens, we want to continue driving execution with
134    /// no consideration for cancellation. This does that.
135    pub async fn run_without_cancellation(self) {
136        self.execution_context.await;
137    }
138
139    pub async fn perform_reboot_cleanup(self) {
140        // Let every sampler know that a reboot is pending and they should exit.
141        self.senders.into_iter().for_each(|sender| {
142            sender
143                .send(())
144                .unwrap_or_else(|err| warn!("Failed to send reboot over oneshot: {:?}", err))
145        });
146
147        // Get the most recently updated project samplers from all the futures. They hold the
148        // cache with the most recent values for all the metrics we need to sample and diff!
149        let project_samplers: Vec<ProjectSampler> = self.execution_context.await;
150
151        let mut reader = ArchiveReader::inspect();
152
153        for project_sampler in &project_samplers {
154            for metric in &project_sampler.metrics {
155                for selector in metric.borrow().selectors.iter() {
156                    reader.add_selector(selector.clone());
157                }
158            }
159        }
160
161        reader.retry(RetryConfig::never());
162
163        let mut reboot_processor = RebootSnapshotProcessor { reader, project_samplers };
164        // Log errors encountered in final snapshot, but always swallow errors so we can gracefully
165        // notify RebootMethodsWatcherRegister that we yield our remaining time.
166        reboot_processor
167            .process_reboot_sample()
168            .await
169            .unwrap_or_else(|e| warn!("Reboot snapshot failed! {:?}", e));
170    }
171}
172
173struct RebootSnapshotProcessor {
174    /// Reader constructed from the union of selectors
175    /// for every [`ProjectSampler`] config.
176    reader: InspectArchiveReader,
177    /// Vector of mutable [`ProjectSampler`] objects that will
178    /// process their final samples.
179    project_samplers: Vec<ProjectSampler>,
180}
181
182impl RebootSnapshotProcessor {
183    pub async fn process_reboot_sample(&mut self) -> Result<(), Error> {
184        let snapshot_data = self.reader.snapshot().await?;
185        for data_packet in snapshot_data {
186            let moniker = data_packet.moniker;
187            match data_packet.payload {
188                None => {
189                    process_schema_errors(&data_packet.metadata.errors, &moniker);
190                }
191                Some(payload) => {
192                    self.process_single_payload(payload, &data_packet.metadata.name, &moniker).await
193                }
194            }
195        }
196        Ok(())
197    }
198
199    async fn process_single_payload(
200        &mut self,
201        hierarchy: DiagnosticsHierarchy<String>,
202        inspect_handle_name: &InspectHandleName,
203        moniker: &ExtendedMoniker,
204    ) {
205        let mut projects = self
206            .project_samplers
207            .iter_mut()
208            .filter(|p| {
209                p.filter_metrics_by_moniker_and_tree_name(moniker, inspect_handle_name.as_ref())
210                    .next()
211                    .is_some()
212            })
213            .peekable();
214        if projects.peek().is_none() {
215            warn!(
216                moniker:%,
217                tree_name = inspect_handle_name.as_ref();
218                "no metrics found for moniker and tree_name combination"
219            );
220            return;
221        }
222
223        for project_sampler in projects {
224            // If processing the final sample failed, just log the
225            // error and proceed, everything's getting shut down
226            // soon anyway.
227            let maybe_err = match project_sampler.process_component_data(
228                &hierarchy,
229                inspect_handle_name,
230                moniker,
231            ) {
232                Err(err) => Some(err),
233                Ok((_selector_changes, events_to_log)) => {
234                    project_sampler.log_events(events_to_log).await.err()
235                }
236            };
237            if let Some(err) = maybe_err {
238                warn!(err:?; "A project sampler failed to process a reboot sample");
239            }
240        }
241    }
242}
243
244/// Owner of the sampler execution context.
245pub struct SamplerExecutor {
246    project_samplers: Vec<ProjectSampler>,
247    sampler_executor_stats: Arc<SamplerExecutorStats>,
248}
249
250impl SamplerExecutor {
251    /// Instantiate connection to the cobalt logger and map ProjectConfigurations
252    /// to [`ProjectSampler`] plans.
253    pub async fn new(sampler_config: SamplerConfig) -> Result<Self, Error> {
254        let metric_logger_factory: Arc<MetricEventLoggerFactoryProxy> = Arc::new(
255            connect_to_protocol::<MetricEventLoggerFactoryMarker>()
256                .context("Failed to connect to the Metric LoggerFactory")?,
257        );
258
259        let minimum_sample_rate_sec = sampler_config.minimum_sample_rate_sec;
260
261        let sampler_executor_stats = Arc::new(
262            SamplerExecutorStats::new()
263                .with_inspect(inspect::component::inspector().root(), "sampler_executor_stats")
264                .unwrap_or_else(|err| {
265                    warn!(err:?; "Failed to attach inspector to SamplerExecutorStats struct");
266                    SamplerExecutorStats::default()
267                }),
268        );
269
270        sampler_executor_stats
271            .total_project_samplers_configured
272            .add(sampler_config.project_configs.len() as u64);
273
274        let mut project_to_stats_map: HashMap<ProjectId, Arc<ProjectSamplerStats>> = HashMap::new();
275        // TODO(https://fxbug.dev/42118220): Create only one ArchiveReader for each unique poll rate so we
276        // can avoid redundant snapshots.
277        let project_sampler_futures =
278            sampler_config.project_configs.iter().cloned().map(|project_config| {
279                let project_sampler_stats =
280                    project_to_stats_map.entry(project_config.project_id).or_insert_with(|| {
281                        Arc::new(
282                            ProjectSamplerStats::new()
283                                .with_inspect(
284                                    &sampler_executor_stats.inspect_node,
285                                    format!("project_{}", project_config.project_id,),
286                                )
287                                .unwrap_or_else(|err| {
288                                    warn!(
289                                        err:?;
290                                        "Failed to attach inspector to ProjectSamplerStats struct"
291                                    );
292                                    ProjectSamplerStats::default()
293                                }),
294                        )
295                    });
296                ProjectSampler::new(
297                    project_config,
298                    metric_logger_factory.clone(),
299                    minimum_sample_rate_sec,
300                    project_sampler_stats.clone(),
301                )
302            });
303
304        let mut project_samplers: Vec<ProjectSampler> = Vec::new();
305        for project_sampler in join_all(project_sampler_futures).await.into_iter() {
306            match project_sampler {
307                Ok(project_sampler) => project_samplers.push(project_sampler),
308                Err(e) => {
309                    warn!("ProjectSampler construction failed: {:?}", e);
310                }
311            }
312        }
313        Ok(SamplerExecutor { project_samplers, sampler_executor_stats })
314    }
315
316    /// Turn each [`ProjectSampler`] plan into an [`fasync::Task`] which executes its associated plan,
317    /// and process errors if any tasks exit unexpectedly.
318    pub fn execute(self) -> TaskCancellation {
319        // Take ownership of the inspect struct so we can give it to the execution context. We do this
320        // so that the execution context can return the struct when it's halted by reboot, which allows inspect
321        // properties to survive through the reboot flow.
322        let task_cancellation_owned_stats = self.sampler_executor_stats.clone();
323        let execution_context_owned_stats = self.sampler_executor_stats.clone();
324
325        let (senders, mut spawned_tasks): (Vec<oneshot::Sender<()>>, FuturesUnordered<_>) = self
326            .project_samplers
327            .into_iter()
328            .map(|project_sampler| {
329                let (sender, receiver) = oneshot::channel::<()>();
330                (sender, project_sampler.spawn(receiver))
331            })
332            .unzip();
333
334        let execution_context = fasync::Task::local(async move {
335            let mut healthily_exited_samplers = Vec::new();
336            while let Some(sampler_result) = spawned_tasks.next().await {
337                match sampler_result {
338                    Err(e) => {
339                        // TODO(https://fxbug.dev/42118220): Consider restarting the failed sampler depending on
340                        // failure mode.
341                        warn!("A spawned sampler has failed: {:?}", e);
342                        execution_context_owned_stats.errorfully_exited_samplers.add(1);
343                    }
344                    Ok(ProjectSamplerTaskExit::RebootTriggered(sampler)) => {
345                        healthily_exited_samplers.push(sampler);
346                        execution_context_owned_stats.reboot_exited_samplers.add(1);
347                    }
348                    Ok(ProjectSamplerTaskExit::WorkCompleted) => {
349                        info!("A sampler completed its workload, and exited.");
350                        execution_context_owned_stats.healthily_exited_samplers.add(1);
351                    }
352                }
353            }
354
355            healthily_exited_samplers
356        });
357
358        TaskCancellation {
359            execution_context,
360            senders,
361            _sampler_executor_stats: task_cancellation_owned_stats,
362        }
363    }
364}
365
366pub struct ProjectSampler {
367    archive_reader: InspectArchiveReader,
368    /// The metrics used by this Project.
369    metrics: Vec<RefCell<MetricConfig>>,
370    /// Cache from Inspect selector to last sampled property. This is the selector from
371    /// [`MetricConfig`]; it may contain wildcards.
372    metric_cache: RefCell<HashMap<MetricCacheKey, Property>>,
373    /// Cobalt logger proxy using this ProjectSampler's project id. It's an Option so it doesn't
374    /// have to be created for unit tests; it will always be Some() outside unit tests.
375    metric_loggers: HashMap<ProjectId, MetricEventLoggerProxy>,
376    /// The frequency with which we snapshot Inspect properties
377    /// for this project.
378    poll_rate_sec: i64,
379    /// Inspect stats on a node namespaced by this project's associated id.
380    /// It's an arc since a single project can have multiple samplers at
381    /// different frequencies, but we want a single project to have one node.
382    project_sampler_stats: Arc<ProjectSamplerStats>,
383    /// The id of the project.
384    /// Project ID that metrics are being sampled and forwarded on behalf of.
385    project_id: ProjectId,
386    /// Records whether there are any known selectors left.
387    /// Reset in rebuild_selector_data_structures().
388    all_done: bool,
389}
390
391#[derive(Debug, Eq, Hash, PartialEq)]
392struct MetricCacheKey {
393    handle_name: InspectHandleName,
394    selector: String,
395}
396
397#[allow(clippy::large_enum_variant)] // TODO(https://fxbug.dev/401087900)
398pub enum ProjectSamplerTaskExit {
399    /// The [`ProjectSampler`] processed a reboot signal on its oneshot, and yielded
400    /// to the final-snapshot.
401    RebootTriggered(ProjectSampler),
402    /// The [`ProjectSampler`] has no more work to complete; perhaps all metrics were "upload_once"?
403    WorkCompleted,
404}
405
406pub enum ProjectSamplerEvent {
407    TimerTriggered,
408    TimerDied,
409    RebootTriggered,
410    RebootChannelClosed(Error),
411}
412
413/// Indicates whether a sampler in the project has been removed (set to None), in which case the
414/// ArchiveAccessor should be reconfigured.
415/// The selector lists may be consolidated (and thus the maps would be rebuilt), but
416/// the program will run correctly whether they are or not.
417#[derive(PartialEq)]
418enum SnapshotOutcome {
419    SelectorsChanged,
420    SelectorsUnchanged,
421}
422
423impl ProjectSampler {
424    pub async fn new(
425        config: Arc<ProjectConfig>,
426        metric_logger_factory: Arc<MetricEventLoggerFactoryProxy>,
427        minimum_sample_rate_sec: i64,
428        project_sampler_stats: Arc<ProjectSamplerStats>,
429    ) -> Result<ProjectSampler, Error> {
430        let customer_id = config.customer_id;
431        let project_id = config.project_id;
432        let poll_rate_sec = config.poll_rate_sec;
433        if poll_rate_sec < minimum_sample_rate_sec {
434            return Err(format_err!(
435                concat!(
436                    "Project with id: {:?} uses a polling rate:",
437                    " {:?} below minimum configured poll rate: {:?}"
438                ),
439                project_id,
440                poll_rate_sec,
441                minimum_sample_rate_sec,
442            ));
443        }
444
445        project_sampler_stats.project_sampler_count.add(1);
446        project_sampler_stats.metrics_configured.add(config.metrics.len() as u64);
447
448        let mut metric_loggers = HashMap::new();
449        // TODO(https://fxbug.dev/42071858): we should remove this once we support batching. There should be
450        // only one metric logger per ProjectSampler.
451        if *project_id != 0 {
452            let (metric_logger_proxy, metrics_server_end) = fidl::endpoints::create_proxy();
453            let project_spec = ProjectSpec {
454                customer_id: Some(*customer_id),
455                project_id: Some(*project_id),
456                ..ProjectSpec::default()
457            };
458            metric_logger_factory
459                .create_metric_event_logger(&project_spec, metrics_server_end)
460                .await?
461                .map_err(|e| format_err!("error response for project {}: {:?}", project_id, e))?;
462            metric_loggers.insert(project_id, metric_logger_proxy);
463        }
464        for metric in &config.metrics {
465            if let Some(metric_project_id) = metric.project_id {
466                if let Entry::Vacant(entry) = metric_loggers.entry(metric_project_id) {
467                    let (metric_logger_proxy, metrics_server_end) = fidl::endpoints::create_proxy();
468                    let project_spec = ProjectSpec {
469                        customer_id: Some(*customer_id),
470                        project_id: Some(*metric_project_id),
471                        ..ProjectSpec::default()
472                    };
473                    metric_logger_factory
474                        .create_metric_event_logger(&project_spec, metrics_server_end)
475                        .await?
476                        .map_err(|e|
477                            format_err!(
478                                "error response for project {} while creating metric logger {}: {:?}",
479                                project_id,
480                                metric_project_id,
481                                e
482                            ))?;
483                    entry.insert(metric_logger_proxy);
484                }
485            }
486        }
487
488        let mut project_sampler = ProjectSampler {
489            project_id,
490            archive_reader: ArchiveReader::inspect(),
491            metrics: config.metrics.iter().map(|m| RefCell::new(m.clone())).collect(),
492            metric_cache: RefCell::new(HashMap::new()),
493            metric_loggers,
494            poll_rate_sec,
495            project_sampler_stats,
496            all_done: true,
497        };
498        // Fill in archive_reader
499        project_sampler.rebuild_selector_data_structures();
500        Ok(project_sampler)
501    }
502
503    pub fn spawn(
504        mut self,
505        mut reboot_oneshot: oneshot::Receiver<()>,
506    ) -> fasync::Task<Result<ProjectSamplerTaskExit, Error>> {
507        fasync::Task::local(async move {
508            let mut periodic_timer =
509                fasync::Interval::new(zx::MonotonicDuration::from_seconds(self.poll_rate_sec));
510            loop {
511                let done = select! {
512                    opt = periodic_timer.next() => {
513                        if opt.is_some() {
514                            ProjectSamplerEvent::TimerTriggered
515                        } else {
516                            ProjectSamplerEvent::TimerDied
517                        }
518                    },
519                    oneshot_res = reboot_oneshot => {
520                        match oneshot_res {
521                            Ok(()) => {
522                                ProjectSamplerEvent::RebootTriggered
523                            },
524                            Err(e) => {
525                                ProjectSamplerEvent::RebootChannelClosed(
526                                    format_err!("Oneshot closure error: {:?}", e))
527                            }
528                        }
529                    }
530                };
531
532                match done {
533                    ProjectSamplerEvent::TimerDied => {
534                        return Err(format_err!(concat!(
535                            "The ProjectSampler timer died, something went wrong.",
536                        )));
537                    }
538                    ProjectSamplerEvent::RebootChannelClosed(e) => {
539                        // TODO(https://fxbug.dev/42118220): Consider differentiating errors if
540                        // we ever want to recover a sampler after a oneshot channel death.
541                        return Err(format_err!(
542                            concat!(
543                                "The Reboot signaling oneshot died, something went wrong: {:?}",
544                            ),
545                            e
546                        ));
547                    }
548                    ProjectSamplerEvent::RebootTriggered => {
549                        // The reboot oneshot triggered, meaning it's time to perform
550                        // our final snapshot. Return self to reuse most recent cache.
551                        return Ok(ProjectSamplerTaskExit::RebootTriggered(self));
552                    }
553                    ProjectSamplerEvent::TimerTriggered => {
554                        self.process_next_snapshot().await?;
555                        // Check whether this sampler
556                        // still needs to run (perhaps all metrics were
557                        // "upload_once"?). If it doesn't, we want to be
558                        // sure that it is not included in the reboot-workload.
559                        if self.is_all_done() {
560                            return Ok(ProjectSamplerTaskExit::WorkCompleted);
561                        }
562                    }
563                }
564            }
565        })
566    }
567
568    async fn process_next_snapshot(&mut self) -> Result<(), Error> {
569        let snapshot_data = self.archive_reader.snapshot().await?;
570        let events_to_log = self.process_snapshot(snapshot_data).await?;
571        self.log_events(events_to_log).await?;
572        Ok(())
573    }
574
575    async fn process_snapshot(
576        &mut self,
577        snapshot: Vec<Data<Inspect>>,
578    ) -> Result<Vec<EventToLog>, Error> {
579        let mut selectors_changed = false;
580        let mut events_to_log = vec![];
581        for data_packet in snapshot.iter() {
582            match &data_packet.payload {
583                None => {
584                    process_schema_errors(&data_packet.metadata.errors, &data_packet.moniker);
585                }
586                Some(payload) => {
587                    let (selector_outcome, mut events) = self.process_component_data(
588                        payload,
589                        &data_packet.metadata.name,
590                        &data_packet.moniker,
591                    )?;
592                    if selector_outcome == SnapshotOutcome::SelectorsChanged {
593                        selectors_changed = true;
594                    }
595                    events_to_log.append(&mut events);
596                }
597            }
598        }
599        if selectors_changed {
600            self.rebuild_selector_data_structures();
601        }
602        Ok(events_to_log)
603    }
604
605    fn is_all_done(&self) -> bool {
606        self.all_done
607    }
608
609    fn rebuild_selector_data_structures(&mut self) {
610        self.archive_reader = ArchiveReader::inspect();
611        for metric in &self.metrics {
612            for selector in metric.borrow().selectors.iter().cloned() {
613                self.archive_reader.add_selector(selector);
614                self.all_done = false;
615            }
616        }
617        self.archive_reader.retry(RetryConfig::never());
618    }
619
620    fn filter_metrics_by_moniker_and_tree_name<'a>(
621        &'a self,
622        moniker: &'a ExtendedMoniker,
623        tree_name: &'a str,
624    ) -> impl Iterator<Item = &'a RefCell<MetricConfig>> {
625        self.metrics.iter().filter(|metric| {
626            moniker
627                .match_against_selectors_and_tree_name(tree_name, metric.borrow().selectors.iter())
628                .next()
629                .is_some()
630        })
631    }
632
633    fn process_component_data(
634        &mut self,
635        payload: &DiagnosticsHierarchy,
636        inspect_handle_name: &InspectHandleName,
637        moniker: &ExtendedMoniker,
638    ) -> Result<(SnapshotOutcome, Vec<EventToLog>), Error> {
639        let filtered_metrics =
640            self.filter_metrics_by_moniker_and_tree_name(moniker, inspect_handle_name.as_ref());
641        let mut snapshot_outcome = SnapshotOutcome::SelectorsUnchanged;
642        let mut events_to_log = vec![];
643        for metric in filtered_metrics {
644            let mut selector_to_keep = None;
645            let project_id = metric.borrow().project_id.unwrap_or(self.project_id);
646            for (selector_idx, selector) in metric.borrow().selectors.iter().enumerate() {
647                let found_properties =
648                    diagnostics_hierarchy::select_from_hierarchy(payload, selector)?;
649                match found_properties {
650                    // Maybe the data hasn't been published yet. Maybe another selector in this
651                    // metric is the correct one to find the data. Either way, not-found is fine.
652                    SelectResult::Properties(p) if p.is_empty() => {}
653                    SelectResult::Properties(p) if p.len() == 1 => {
654                        let metric_cache_key = MetricCacheKey {
655                            handle_name: inspect_handle_name.clone(),
656                            selector: selectors::selector_to_string(selector, Default::default())
657                                .unwrap(),
658                        };
659                        if let Some(event) = Self::prepare_sample(
660                            metric.borrow(),
661                            self.metric_cache.borrow_mut(),
662                            metric_cache_key,
663                            p[0],
664                        )? {
665                            events_to_log.push((project_id, event));
666                        }
667                        selector_to_keep = Some(selector_idx);
668                        break;
669                    }
670                    too_many => {
671                        warn!(
672                            too_many:?,
673                            selector:?;
674                            "Too many matches for selector"
675                        );
676                    }
677                }
678            }
679
680            if let Some(selector_idx) = selector_to_keep {
681                if Self::update_selectors_for_metric(metric.borrow_mut(), selector_idx) {
682                    snapshot_outcome = SnapshotOutcome::SelectorsChanged;
683                }
684            }
685        }
686        Ok((snapshot_outcome, events_to_log))
687    }
688
689    fn update_selectors_for_metric(
690        mut metric: RefMut<'_, MetricConfig>,
691        keep_selector_idx: usize,
692    ) -> bool {
693        if metric.upload_once {
694            metric.selectors = Vec::new();
695            return true;
696        }
697        let deleted = metric.selectors.len() > 1;
698        metric.selectors = vec![metric.selectors.remove(keep_selector_idx)];
699        deleted
700    }
701
702    fn prepare_sample<'a>(
703        metric: Ref<'a, MetricConfig>,
704        metric_cache: RefMut<'a, HashMap<MetricCacheKey, Property>>,
705        metric_cache_key: MetricCacheKey,
706        new_sample: &Property,
707    ) -> Result<Option<MetricEvent>, Error> {
708        let previous_sample_opt: Option<&Property> = metric_cache.get(&metric_cache_key);
709
710        if let Some(payload) = process_sample_for_data_type(
711            new_sample,
712            previous_sample_opt,
713            &metric_cache_key,
714            &metric.metric_type,
715        ) {
716            Self::maybe_update_cache(
717                metric_cache,
718                new_sample,
719                &metric.metric_type,
720                metric_cache_key,
721            );
722            Ok(Some(MetricEvent {
723                metric_id: *metric.metric_id,
724                event_codes: metric.event_codes.iter().map(|code| **code).collect(),
725                payload,
726            }))
727        } else {
728            Ok(None)
729        }
730    }
731
732    async fn log_events(&mut self, events: Vec<EventToLog>) -> Result<(), Error> {
733        for (project_id, event) in events.into_iter() {
734            self.metric_loggers
735                .get(&project_id)
736                .as_ref()
737                .unwrap()
738                .log_metric_events(&[event])
739                .await?
740                .map_err(|e| format_err!("error from cobalt: {:?}", e))?;
741            self.project_sampler_stats.cobalt_logs_sent.add(1);
742        }
743        Ok(())
744    }
745
746    fn maybe_update_cache(
747        mut cache: RefMut<'_, HashMap<MetricCacheKey, Property>>,
748        new_sample: &Property,
749        data_type: &MetricType,
750        metric_cache_key: MetricCacheKey,
751    ) {
752        match data_type {
753            MetricType::Occurrence | MetricType::IntHistogram => {
754                cache.insert(metric_cache_key, new_sample.clone());
755            }
756            MetricType::Integer | MetricType::String => (),
757        }
758    }
759}
760
761#[cfg(test)]
762impl ProjectSampler {
763    fn push_metric(&mut self, metric: MetricConfig) {
764        self.metrics.push(RefCell::new(metric));
765    }
766}
767
768fn process_sample_for_data_type(
769    new_sample: &Property,
770    previous_sample_opt: Option<&Property>,
771    data_source: &MetricCacheKey,
772    data_type: &MetricType,
773) -> Option<MetricEventPayload> {
774    let event_payload_res = match data_type {
775        MetricType::Occurrence => process_occurence(new_sample, previous_sample_opt, data_source),
776        MetricType::IntHistogram => {
777            process_int_histogram(new_sample, previous_sample_opt, data_source)
778        }
779        MetricType::Integer => {
780            // If we previously cached a metric with an int-type, log a warning and ignore it.
781            // This may be a case of using a single selector for two metrics, one event count
782            // and one int.
783            if previous_sample_opt.is_some() {
784                warn!("Sampler has erroneously cached an Int type metric: {:?}", data_source);
785            }
786            process_int(new_sample, data_source)
787        }
788        MetricType::String => {
789            if previous_sample_opt.is_some() {
790                warn!("Sampler has erroneously cached a String type metric: {:?}", data_source);
791            }
792            process_string(new_sample, data_source)
793        }
794    };
795
796    match event_payload_res {
797        Ok(payload_opt) => payload_opt,
798        Err(err) => {
799            warn!(data_source:?, err:?; "Failed to process Inspect property for cobalt",);
800            None
801        }
802    }
803}
804
805/// It's possible for Inspect numerical properties to experience overflows/conversion
806/// errors when being mapped to Cobalt types. Sanitize these numericals, and provide
807/// meaningful errors.
808fn sanitize_unsigned_numerical(diff: u64, data_source: &MetricCacheKey) -> Result<i64, Error> {
809    match diff.try_into() {
810        Ok(diff) => Ok(diff),
811        Err(e) => Err(format_err!(
812            concat!(
813                "Selector used for EventCount type",
814                " refered to an unsigned int property,",
815                " but cobalt requires i64, and casting introduced overflow",
816                " which produces a negative int: {:?}. This could be due to",
817                " a single sample being larger than i64, or a diff between",
818                " samples being larger than i64. Error: {:?}"
819            ),
820            data_source,
821            e
822        )),
823    }
824}
825
826fn process_int_histogram(
827    new_sample: &Property,
828    prev_sample_opt: Option<&Property>,
829    data_source: &MetricCacheKey,
830) -> Result<Option<MetricEventPayload>, Error> {
831    let diff = match prev_sample_opt {
832        None => convert_inspect_histogram_to_cobalt_histogram(new_sample, data_source)?,
833        Some(prev_sample) => {
834            // If the data type changed then we just reset the cache.
835            if std::mem::discriminant(new_sample) == std::mem::discriminant(prev_sample) {
836                compute_histogram_diff(new_sample, prev_sample, data_source)?
837            } else {
838                convert_inspect_histogram_to_cobalt_histogram(new_sample, data_source)?
839            }
840        }
841    };
842
843    let non_empty_diff: Vec<HistogramBucket> = diff.into_iter().filter(|v| v.count != 0).collect();
844    if !non_empty_diff.is_empty() {
845        Ok(Some(MetricEventPayload::Histogram(non_empty_diff)))
846    } else {
847        Ok(None)
848    }
849}
850
851fn compute_histogram_diff(
852    new_sample: &Property,
853    old_sample: &Property,
854    data_source: &MetricCacheKey,
855) -> Result<Vec<HistogramBucket>, Error> {
856    let new_histogram_buckets =
857        convert_inspect_histogram_to_cobalt_histogram(new_sample, data_source)?;
858    let old_histogram_buckets =
859        convert_inspect_histogram_to_cobalt_histogram(old_sample, data_source)?;
860
861    if old_histogram_buckets.len() != new_histogram_buckets.len() {
862        return Err(format_err!(
863            concat!(
864                "Selector referenced an Inspect IntArray",
865                " that was specified as an IntHistogram type ",
866                " but the histogram bucket count changed between",
867                " samples, which is incompatible with Cobalt.",
868                " Selector: {:?}, Inspect type: {}"
869            ),
870            data_source,
871            new_sample.discriminant_name()
872        ));
873    }
874
875    new_histogram_buckets
876        .iter()
877        .zip(old_histogram_buckets)
878        .map(|(new_bucket, old_bucket)| {
879            if new_bucket.count < old_bucket.count {
880                return Err(format_err!(
881                    concat!(
882                        "Selector referenced an Inspect IntArray",
883                        " that was specified as an IntHistogram type ",
884                        " but at least one bucket saw the count decrease",
885                        " between samples, which is incompatible with Cobalt's",
886                        " need for monotonically increasing counts.",
887                        " Selector: {:?}, Inspect type: {}"
888                    ),
889                    data_source,
890                    new_sample.discriminant_name()
891                ));
892            }
893            Ok(HistogramBucket {
894                count: new_bucket.count - old_bucket.count,
895                index: new_bucket.index,
896            })
897        })
898        .collect::<Result<Vec<HistogramBucket>, Error>>()
899}
900
901fn build_cobalt_histogram(counts: impl Iterator<Item = u64>) -> Vec<HistogramBucket> {
902    counts
903        .enumerate()
904        .map(|(index, count)| HistogramBucket { index: index as u32, count })
905        .collect()
906}
907
908fn build_sparse_cobalt_histogram(
909    counts: impl Iterator<Item = u64>,
910    indexes: &[usize],
911    size: usize,
912) -> Vec<HistogramBucket> {
913    let mut histogram =
914        Vec::from_iter((0..size).map(|index| HistogramBucket { index: index as u32, count: 0 }));
915    for (index, count) in indexes.iter().zip(counts) {
916        histogram[*index].count = count;
917    }
918    histogram
919}
920
921fn convert_inspect_histogram_to_cobalt_histogram(
922    inspect_histogram: &Property,
923    data_source: &MetricCacheKey,
924) -> Result<Vec<HistogramBucket>, Error> {
925    macro_rules! err {($($message:expr),+) => {
926        Err(format_err!(
927            concat!($($message),+ , " Selector: {:?}, Inspect type: {}"),
928            data_source,
929            inspect_histogram.discriminant_name()
930        ))
931    }}
932
933    let sanitize_size = |size: usize| -> Result<(), Error> {
934        if size > u32::MAX as usize {
935            return err!(
936                "Selector referenced an Inspect array",
937                " that was specified as a histogram type ",
938                " but contained an index too large for a u32."
939            );
940        }
941        Ok(())
942    };
943
944    let sanitize_indexes = |indexes: &[usize], size: usize| -> Result<(), Error> {
945        for index in indexes.iter() {
946            if *index >= size {
947                return err!(
948                    "Selector referenced an Inspect array",
949                    " that was specified as a histogram type ",
950                    " but contained an invalid index."
951                );
952            }
953        }
954        Ok(())
955    };
956
957    let sanitize_counts = |counts: &[i64]| -> Result<(), Error> {
958        for count in counts.iter() {
959            if *count < 0 {
960                return err!(
961                    "Selector referenced an Inspect IntArray",
962                    " that was specified as an IntHistogram type ",
963                    " but a bucket contained a negative count. This",
964                    " is incompatible with Cobalt histograms which only",
965                    " support positive histogram counts."
966                );
967            }
968        }
969        Ok(())
970    };
971
972    let histogram = match inspect_histogram {
973        Property::IntArray(
974            _,
975            ArrayContent::LinearHistogram(LinearHistogram { counts, indexes, size, .. }),
976        )
977        | Property::IntArray(
978            _,
979            ArrayContent::ExponentialHistogram(ExponentialHistogram {
980                counts, indexes, size, ..
981            }),
982        ) => {
983            sanitize_size(*size)?;
984            sanitize_counts(counts)?;
985            match (indexes, counts) {
986                (None, counts) => build_cobalt_histogram(counts.iter().map(|c| *c as u64)),
987                (Some(indexes), counts) => {
988                    sanitize_indexes(indexes, *size)?;
989                    build_sparse_cobalt_histogram(counts.iter().map(|c| *c as u64), indexes, *size)
990                }
991            }
992        }
993        Property::UintArray(
994            _,
995            ArrayContent::LinearHistogram(LinearHistogram { counts, indexes, size, .. }),
996        )
997        | Property::UintArray(
998            _,
999            ArrayContent::ExponentialHistogram(ExponentialHistogram {
1000                counts, indexes, size, ..
1001            }),
1002        ) => {
1003            sanitize_size(*size)?;
1004            match (indexes, counts) {
1005                (None, counts) => build_cobalt_histogram(counts.iter().copied()),
1006                (Some(indexes), counts) => {
1007                    sanitize_indexes(indexes, *size)?;
1008                    build_sparse_cobalt_histogram(counts.iter().copied(), indexes, *size)
1009                }
1010            }
1011        }
1012        _ => {
1013            // TODO(https://fxbug.dev/42118220): Does cobalt support floors or step counts that are
1014            // not ints? if so, we can support that as well with double arrays if the
1015            // actual counts are whole numbers.
1016            return Err(format_err!(
1017                concat!(
1018                    "Selector referenced an Inspect property",
1019                    " that was specified as an IntHistogram type ",
1020                    " but is unable to be encoded in a cobalt HistogramBucket",
1021                    " vector. Selector: {:?}, Inspect type: {}"
1022                ),
1023                data_source,
1024                inspect_histogram.discriminant_name()
1025            ));
1026        }
1027    };
1028    Ok(histogram)
1029}
1030
1031fn process_int(
1032    new_sample: &Property,
1033    data_source: &MetricCacheKey,
1034) -> Result<Option<MetricEventPayload>, Error> {
1035    let sampled_int = match new_sample {
1036        Property::Uint(_, val) => sanitize_unsigned_numerical(*val, data_source)?,
1037        Property::Int(_, val) => *val,
1038        _ => {
1039            return Err(format_err!(
1040                concat!(
1041                    "Selector referenced an Inspect property",
1042                    " that was specified as an Int type ",
1043                    " but is unable to be encoded in an i64",
1044                    " Selector: {:?}, Inspect type: {}"
1045                ),
1046                data_source,
1047                new_sample.discriminant_name()
1048            ));
1049        }
1050    };
1051
1052    Ok(Some(MetricEventPayload::IntegerValue(sampled_int)))
1053}
1054
1055fn process_string(
1056    new_sample: &Property,
1057    data_source: &MetricCacheKey,
1058) -> Result<Option<MetricEventPayload>, Error> {
1059    let sampled_string = match new_sample {
1060        Property::String(_, val) => val.clone(),
1061        _ => {
1062            return Err(format_err!(
1063                concat!(
1064                    "Selector referenced an Inspect property specified as String",
1065                    " but property is not type String.",
1066                    " Selector: {:?}, Inspect type: {}"
1067                ),
1068                data_source,
1069                new_sample.discriminant_name()
1070            ));
1071        }
1072    };
1073
1074    Ok(Some(MetricEventPayload::StringValue(sampled_string)))
1075}
1076
1077fn process_occurence(
1078    new_sample: &Property,
1079    prev_sample_opt: Option<&Property>,
1080    data_source: &MetricCacheKey,
1081) -> Result<Option<MetricEventPayload>, Error> {
1082    let diff = match prev_sample_opt {
1083        None => compute_initial_event_count(new_sample, data_source)?,
1084        Some(prev_sample) => compute_event_count_diff(new_sample, prev_sample, data_source)?,
1085    };
1086
1087    if diff < 0 {
1088        return Err(format_err!(
1089            concat!(
1090                "Event count must be monotonically increasing,",
1091                " but we observed a negative event count diff for: {:?}"
1092            ),
1093            data_source
1094        ));
1095    }
1096
1097    if diff == 0 {
1098        return Ok(None);
1099    }
1100
1101    // TODO(https://fxbug.dev/42118220): Once fuchsia.cobalt is gone, we don't need to preserve
1102    // occurrence counts "fitting" into i64s.
1103    Ok(Some(MetricEventPayload::Count(diff as u64)))
1104}
1105
1106fn compute_initial_event_count(
1107    new_sample: &Property,
1108    data_source: &MetricCacheKey,
1109) -> Result<i64, Error> {
1110    match new_sample {
1111        Property::Uint(_, val) => sanitize_unsigned_numerical(*val, data_source),
1112        Property::Int(_, val) => Ok(*val),
1113        _ => Err(format_err!(
1114            concat!(
1115                "Selector referenced an Inspect property",
1116                " that is not compatible with cached",
1117                " transformation to an event count.",
1118                " Selector: {:?}, {}"
1119            ),
1120            data_source,
1121            new_sample.discriminant_name()
1122        )),
1123    }
1124}
1125
1126fn compute_event_count_diff(
1127    new_sample: &Property,
1128    old_sample: &Property,
1129    data_source: &MetricCacheKey,
1130) -> Result<i64, Error> {
1131    match (new_sample, old_sample) {
1132        // We don't need to validate that old_count and new_count are positive here.
1133        // If new_count was negative, and old_count was positive, then the diff would be
1134        // negative, which is an errorful state. It's impossible for old_count to be negative
1135        // as either it was the first sample which would make a negative diff which is an error,
1136        // or it was a negative new_count with a positive old_count, which we've already shown will
1137        // produce an errorful state.
1138        (Property::Int(_, new_count), Property::Int(_, old_count)) => Ok(new_count - old_count),
1139        (Property::Uint(_, new_count), Property::Uint(_, old_count)) => {
1140            // u64::MAX will cause sanitized_unsigned_numerical to build an
1141            // appropriate error message for a subtraction underflow.
1142            sanitize_unsigned_numerical(
1143                new_count.checked_sub(*old_count).unwrap_or(u64::MAX),
1144                data_source,
1145            )
1146        }
1147        // If we have a correctly typed new sample, but it didn't match either of the above cases,
1148        // this means the new sample changed types compared to the old sample. We should just
1149        // restart the cache, and treat the new sample as a first observation.
1150        (_, Property::Uint(_, _)) | (_, Property::Int(_, _)) => {
1151            warn!(
1152                "Inspect type of sampled data changed between samples. Restarting cache. {:?}",
1153                data_source
1154            );
1155            compute_initial_event_count(new_sample, data_source)
1156        }
1157        _ => Err(format_err!(
1158            concat!(
1159                "Inspect type of sampled data changed between samples",
1160                " to a type incompatible with event counters.",
1161                " Selector: {:?}, New type: {:?}"
1162            ),
1163            data_source,
1164            new_sample.discriminant_name()
1165        )),
1166    }
1167}
1168
1169fn process_schema_errors(
1170    errors: &Option<Vec<diagnostics_data::InspectError>>,
1171    moniker: &ExtendedMoniker,
1172) {
1173    match errors {
1174        Some(errors) => {
1175            for error in errors {
1176                warn!(moniker:%, error:?; "");
1177            }
1178        }
1179        None => {
1180            warn!(moniker:%; "Encountered null payload and no errors.");
1181        }
1182    }
1183}
1184
1185#[cfg(test)]
1186mod tests {
1187    use super::*;
1188    use diagnostics_data::{InspectDataBuilder, Timestamp};
1189    use diagnostics_hierarchy::hierarchy;
1190    use fidl_fuchsia_inspect::DEFAULT_TREE_NAME;
1191    use sampler_config::{MetricId, ProjectId};
1192
1193    #[fuchsia::test]
1194    fn test_filter_metrics() {
1195        let mut sampler = ProjectSampler {
1196            archive_reader: ArchiveReader::inspect(),
1197            metrics: vec![],
1198            metric_cache: RefCell::new(HashMap::new()),
1199            metric_loggers: HashMap::new(),
1200            project_id: ProjectId(1),
1201            poll_rate_sec: 3600,
1202            project_sampler_stats: Arc::new(ProjectSamplerStats::new()),
1203            all_done: true,
1204        };
1205        let selector_foo: String = "core/foo:[name=foo]root/path:value".to_string();
1206        let selector_bar: String = "core/foo:[name=bar]root/path:value".to_string();
1207
1208        sampler.push_metric(MetricConfig {
1209            project_id: None,
1210            selectors: vec![selectors::parse_verbose(&selector_foo).unwrap()],
1211            metric_id: MetricId(1),
1212            // Occurrence type with a value of zero will not attempt to use any loggers.
1213            metric_type: MetricType::Occurrence,
1214            event_codes: Vec::new(),
1215            // upload_once means that process_component_data will return SelectorsChanged if
1216            // it is found in the map.
1217            upload_once: true,
1218        });
1219        sampler.push_metric(MetricConfig {
1220            project_id: None,
1221            selectors: vec![selectors::parse_verbose(&selector_bar).unwrap()],
1222            metric_id: MetricId(2),
1223            // Occurrence type with a value of zero will not attempt to use any loggers.
1224            metric_type: MetricType::Occurrence,
1225            event_codes: Vec::new(),
1226            // upload_once means that process_component_data will return SelectorsChanged if
1227            // it is found in the map.
1228            upload_once: true,
1229        });
1230
1231        sampler.rebuild_selector_data_structures();
1232
1233        let moniker = ExtendedMoniker::try_from("core/foo").unwrap();
1234
1235        let filtered_metrics =
1236            sampler.filter_metrics_by_moniker_and_tree_name(&moniker, "foo").collect::<Vec<_>>();
1237        assert_eq!(1, filtered_metrics.len());
1238        assert_eq!(MetricId(1), filtered_metrics[0].borrow().metric_id);
1239    }
1240
1241    #[fuchsia::test]
1242    fn test_filter_metrics_with_wildcards() {
1243        let mut sampler = ProjectSampler {
1244            archive_reader: ArchiveReader::inspect(),
1245            metrics: vec![],
1246            metric_cache: RefCell::new(HashMap::new()),
1247            metric_loggers: HashMap::new(),
1248            project_id: ProjectId(1),
1249            poll_rate_sec: 3600,
1250            project_sampler_stats: Arc::new(ProjectSamplerStats::new()),
1251            all_done: true,
1252        };
1253        let selector_foo: String = r"bootstrap/*-drivers\:*:[name=foo]root/path:value".to_string();
1254        let selector_bar1: String =
1255            r"bootstrap/*-drivers\:*:[name=bar]root/path:value1".to_string();
1256        let selector_bar2: String =
1257            r"bootstrap/*-drivers\:*:[name=bar]root/path:value2".to_string();
1258
1259        sampler.push_metric(MetricConfig {
1260            project_id: None,
1261            selectors: vec![selectors::parse_verbose(&selector_foo).unwrap()],
1262            metric_id: MetricId(1),
1263            // Occurrence type with a value of zero will not attempt to use any loggers.
1264            metric_type: MetricType::Occurrence,
1265            event_codes: Vec::new(),
1266            // upload_once means that process_component_data will return SelectorsChanged if
1267            // it is found in the map.
1268            upload_once: true,
1269        });
1270        sampler.push_metric(MetricConfig {
1271            project_id: None,
1272            selectors: vec![selectors::parse_verbose(&selector_bar1).unwrap()],
1273            metric_id: MetricId(2),
1274            // Occurrence type with a value of zero will not attempt to use any loggers.
1275            metric_type: MetricType::Occurrence,
1276            event_codes: Vec::new(),
1277            // upload_once means that process_component_data will return SelectorsChanged if
1278            // it is found in the map.
1279            upload_once: true,
1280        });
1281        sampler.push_metric(MetricConfig {
1282            project_id: None,
1283            selectors: vec![selectors::parse_verbose(&selector_bar2).unwrap()],
1284            metric_id: MetricId(3),
1285            // Occurrence type with a value of zero will not attempt to use any loggers.
1286            metric_type: MetricType::Occurrence,
1287            event_codes: Vec::new(),
1288            // upload_once means that process_component_data will return SelectorsChanged if
1289            // it is found in the map.
1290            upload_once: true,
1291        });
1292
1293        sampler.rebuild_selector_data_structures();
1294
1295        let moniker = ExtendedMoniker::try_from("bootstrap/boot-drivers:098098").unwrap();
1296
1297        let filtered_metrics =
1298            sampler.filter_metrics_by_moniker_and_tree_name(&moniker, "foo").collect::<Vec<_>>();
1299        assert_eq!(1, filtered_metrics.len());
1300        assert_eq!(MetricId(1), filtered_metrics[0].borrow().metric_id);
1301
1302        let filtered_metrics =
1303            sampler.filter_metrics_by_moniker_and_tree_name(&moniker, "bar").collect::<Vec<_>>();
1304        assert_eq!(2, filtered_metrics.len());
1305        assert_eq!(MetricId(2), filtered_metrics[0].borrow().metric_id);
1306        assert_eq!(MetricId(3), filtered_metrics[1].borrow().metric_id);
1307    }
1308
1309    /// Test inserting a string into the hierarchy that requires escaping.
1310    #[fuchsia::test]
1311    fn test_process_payload_with_escapes() {
1312        let unescaped: String = "path/to".to_string();
1313        let hierarchy = hierarchy! {
1314            root: {
1315                var unescaped: {
1316                    value: 0,
1317                    "value/with:escapes": 0,
1318                }
1319            }
1320        };
1321
1322        let mut sampler = ProjectSampler {
1323            archive_reader: ArchiveReader::inspect(),
1324            metrics: vec![],
1325            metric_cache: RefCell::new(HashMap::new()),
1326            metric_loggers: HashMap::new(),
1327            project_id: ProjectId(1),
1328            poll_rate_sec: 3600,
1329            project_sampler_stats: Arc::new(ProjectSamplerStats::new()),
1330            all_done: true,
1331        };
1332        let selector: String = "my/component:root/path\\/to:value".to_string();
1333        sampler.push_metric(MetricConfig {
1334            project_id: None,
1335            selectors: vec![selectors::parse_verbose(&selector).unwrap()],
1336            metric_id: MetricId(1),
1337            // Occurrence type with a value of zero will not attempt to use any loggers.
1338            metric_type: MetricType::Occurrence,
1339            event_codes: Vec::new(),
1340            // upload_once means that process_component_data will return SelectorsChanged if
1341            // it is found in the map.
1342            upload_once: true,
1343        });
1344        sampler.rebuild_selector_data_structures();
1345        match sampler.process_component_data(
1346            &hierarchy,
1347            &InspectHandleName::name(DEFAULT_TREE_NAME),
1348            &"my/component".try_into().unwrap(),
1349        ) {
1350            // This selector will be found and removed from the map, resulting in a
1351            // SelectorsChanged response.
1352            Ok((SnapshotOutcome::SelectorsChanged, _events)) => (),
1353            _ => panic!("Expecting SelectorsChanged from process_component_data."),
1354        }
1355
1356        let selector_with_escaped_property: String =
1357            "my/component:root/path\\/to:value\\/with\\:escapes".to_string();
1358        sampler.push_metric(MetricConfig {
1359            project_id: None,
1360            selectors: vec![selectors::parse_verbose(&selector_with_escaped_property).unwrap()],
1361            metric_id: MetricId(1),
1362            // Occurrence type with a value of zero will not attempt to use any loggers.
1363            metric_type: MetricType::Occurrence,
1364            event_codes: Vec::new(),
1365            // upload_once means that the method will return SelectorsChanged if it is found
1366            // in the map.
1367            upload_once: true,
1368        });
1369        sampler.rebuild_selector_data_structures();
1370        match sampler.process_component_data(
1371            &hierarchy,
1372            &InspectHandleName::name(DEFAULT_TREE_NAME),
1373            &"my/component".try_into().unwrap(),
1374        ) {
1375            // This selector will be found and removed from the map, resulting in a
1376            // SelectorsChanged response.
1377            Ok((SnapshotOutcome::SelectorsChanged, _events)) => (),
1378            _ => panic!("Expecting SelectorsChanged from process_component_data."),
1379        }
1380
1381        let selector_unfound: String = "my/component:root/path/to:value".to_string();
1382        sampler.push_metric(MetricConfig {
1383            project_id: None,
1384            selectors: vec![selectors::parse_verbose(&selector_unfound).unwrap()],
1385            metric_id: MetricId(1),
1386            // Occurrence type with a value of zero will not attempt to use any loggers.
1387            metric_type: MetricType::Occurrence,
1388            event_codes: Vec::new(),
1389            // upload_once means that the method will return SelectorsChanged if it is found
1390            // in the map.
1391            upload_once: true,
1392        });
1393        sampler.rebuild_selector_data_structures();
1394        match sampler.process_component_data(
1395            &hierarchy,
1396            &InspectHandleName::name(DEFAULT_TREE_NAME),
1397            &"my/component".try_into().unwrap(),
1398        ) {
1399            // This selector will not be found and removed from the map, resulting in SelectorsUnchanged.
1400            Ok((SnapshotOutcome::SelectorsUnchanged, _events)) => (),
1401            _ => panic!("Expecting SelectorsUnchanged from process_component_data."),
1402        }
1403    }
1404
1405    /// Test that a decreasing occurrence type (which is not allowed) doesn't crash due to e.g.
1406    /// unchecked unsigned subtraction overflow.
1407    #[fuchsia::test]
1408    fn decreasing_occurrence_is_correct() {
1409        let big_number = Property::Uint("foo".to_string(), 5);
1410        let small_number = Property::Uint("foo".to_string(), 2);
1411        let key = MetricCacheKey {
1412            handle_name: InspectHandleName::name("some_file"),
1413            selector: "sel".to_string(),
1414        };
1415
1416        assert_eq!(
1417            process_sample_for_data_type(
1418                &big_number,
1419                Some(&small_number),
1420                &key,
1421                &MetricType::Occurrence
1422            ),
1423            Some(MetricEventPayload::Count(3))
1424        );
1425        assert_eq!(
1426            process_sample_for_data_type(
1427                &small_number,
1428                Some(&big_number),
1429                &key,
1430                &MetricType::Occurrence
1431            ),
1432            None
1433        );
1434    }
1435
1436    /// Test removal of selectors marked with upload_once.
1437    #[fuchsia::test]
1438    fn test_upload_once() {
1439        let hierarchy = hierarchy! {
1440            root: {
1441                value_one: 0,
1442                value_two: 1,
1443            }
1444        };
1445
1446        let mut sampler = ProjectSampler {
1447            archive_reader: ArchiveReader::inspect(),
1448            metrics: vec![],
1449            metric_cache: RefCell::new(HashMap::new()),
1450            metric_loggers: HashMap::new(),
1451            project_id: ProjectId(1),
1452            poll_rate_sec: 3600,
1453            project_sampler_stats: Arc::new(ProjectSamplerStats::new()),
1454            all_done: true,
1455        };
1456        sampler.push_metric(MetricConfig {
1457            project_id: None,
1458            selectors: vec![selectors::parse_verbose("my/component:root:value_one").unwrap()],
1459            metric_id: MetricId(1),
1460            metric_type: MetricType::Integer,
1461            event_codes: Vec::new(),
1462            upload_once: true,
1463        });
1464        sampler.push_metric(MetricConfig {
1465            project_id: None,
1466            selectors: vec![selectors::parse_verbose("my/component:root:value_two").unwrap()],
1467            metric_id: MetricId(2),
1468            metric_type: MetricType::Integer,
1469            event_codes: Vec::new(),
1470            upload_once: true,
1471        });
1472        sampler.rebuild_selector_data_structures();
1473
1474        // Both selectors should be found and removed from the map.
1475        match sampler.process_component_data(
1476            &hierarchy,
1477            &InspectHandleName::name(DEFAULT_TREE_NAME),
1478            &"my/component".try_into().unwrap(),
1479        ) {
1480            Ok((SnapshotOutcome::SelectorsChanged, _events)) => (),
1481            _ => panic!("Expecting SelectorsChanged from process_component_data."),
1482        }
1483
1484        let moniker: ExtendedMoniker = "my_component".try_into().unwrap();
1485        assert!(sampler
1486            .filter_metrics_by_moniker_and_tree_name(&moniker, DEFAULT_TREE_NAME)
1487            .collect::<Vec<_>>()
1488            .is_empty());
1489    }
1490
1491    struct EventCountTesterParams {
1492        new_val: Property,
1493        old_val: Option<Property>,
1494        process_ok: bool,
1495        event_made: bool,
1496        diff: i64,
1497    }
1498
1499    fn process_occurence_tester(params: EventCountTesterParams) {
1500        let data_source = MetricCacheKey {
1501            handle_name: InspectHandleName::name("foo.file"),
1502            selector: "test:root:count".to_string(),
1503        };
1504        let event_res = process_occurence(&params.new_val, params.old_val.as_ref(), &data_source);
1505
1506        if !params.process_ok {
1507            assert!(event_res.is_err());
1508            return;
1509        }
1510
1511        assert!(event_res.is_ok());
1512
1513        let event_opt = event_res.unwrap();
1514
1515        if !params.event_made {
1516            assert!(event_opt.is_none());
1517            return;
1518        }
1519
1520        assert!(event_opt.is_some());
1521        let event = event_opt.unwrap();
1522        match event {
1523            MetricEventPayload::Count(count) => {
1524                assert_eq!(count, params.diff as u64);
1525            }
1526            _ => panic!("Expecting event counts."),
1527        }
1528    }
1529
1530    #[fuchsia::test]
1531    fn test_normal_process_occurence() {
1532        process_occurence_tester(EventCountTesterParams {
1533            new_val: Property::Int("count".to_string(), 1),
1534            old_val: None,
1535            process_ok: true,
1536            event_made: true,
1537            diff: 1,
1538        });
1539
1540        process_occurence_tester(EventCountTesterParams {
1541            new_val: Property::Int("count".to_string(), 1),
1542            old_val: Some(Property::Int("count".to_string(), 1)),
1543            process_ok: true,
1544            event_made: false,
1545            diff: -1,
1546        });
1547
1548        process_occurence_tester(EventCountTesterParams {
1549            new_val: Property::Int("count".to_string(), 3),
1550            old_val: Some(Property::Int("count".to_string(), 1)),
1551            process_ok: true,
1552            event_made: true,
1553            diff: 2,
1554        });
1555    }
1556
1557    #[fuchsia::test]
1558    fn test_data_type_changing_process_occurence() {
1559        process_occurence_tester(EventCountTesterParams {
1560            new_val: Property::Int("count".to_string(), 1),
1561            old_val: None,
1562            process_ok: true,
1563            event_made: true,
1564            diff: 1,
1565        });
1566
1567        process_occurence_tester(EventCountTesterParams {
1568            new_val: Property::Uint("count".to_string(), 1),
1569            old_val: None,
1570            process_ok: true,
1571            event_made: true,
1572            diff: 1,
1573        });
1574
1575        process_occurence_tester(EventCountTesterParams {
1576            new_val: Property::Uint("count".to_string(), 3),
1577            old_val: Some(Property::Int("count".to_string(), 1)),
1578            process_ok: true,
1579            event_made: true,
1580            diff: 3,
1581        });
1582
1583        process_occurence_tester(EventCountTesterParams {
1584            new_val: Property::String("count".to_string(), "big_oof".to_string()),
1585            old_val: Some(Property::Int("count".to_string(), 1)),
1586            process_ok: false,
1587            event_made: false,
1588            diff: -1,
1589        });
1590    }
1591
1592    #[fuchsia::test]
1593    fn test_event_count_negatives_and_overflows() {
1594        process_occurence_tester(EventCountTesterParams {
1595            new_val: Property::Int("count".to_string(), -11),
1596            old_val: None,
1597            process_ok: false,
1598            event_made: false,
1599            diff: -1,
1600        });
1601
1602        process_occurence_tester(EventCountTesterParams {
1603            new_val: Property::Int("count".to_string(), 9),
1604            old_val: Some(Property::Int("count".to_string(), 10)),
1605            process_ok: false,
1606            event_made: false,
1607            diff: -1,
1608        });
1609
1610        process_occurence_tester(EventCountTesterParams {
1611            new_val: Property::Uint("count".to_string(), u64::MAX),
1612            old_val: None,
1613            process_ok: false,
1614            event_made: false,
1615            diff: -1,
1616        });
1617
1618        let i64_max_in_u64: u64 = i64::MAX.try_into().unwrap();
1619
1620        process_occurence_tester(EventCountTesterParams {
1621            new_val: Property::Uint("count".to_string(), i64_max_in_u64 + 1),
1622            old_val: Some(Property::Uint("count".to_string(), 1)),
1623            process_ok: true,
1624            event_made: true,
1625            diff: i64::MAX,
1626        });
1627
1628        process_occurence_tester(EventCountTesterParams {
1629            new_val: Property::Uint("count".to_string(), i64_max_in_u64 + 2),
1630            old_val: Some(Property::Uint("count".to_string(), 1)),
1631            process_ok: false,
1632            event_made: false,
1633            diff: -1,
1634        });
1635    }
1636
1637    struct IntTesterParams {
1638        new_val: Property,
1639        process_ok: bool,
1640        sample: i64,
1641    }
1642
1643    fn process_int_tester(params: IntTesterParams) {
1644        let data_source = MetricCacheKey {
1645            handle_name: InspectHandleName::name("foo.file"),
1646            selector: "test:root:count".to_string(),
1647        };
1648        let event_res = process_int(&params.new_val, &data_source);
1649
1650        if !params.process_ok {
1651            assert!(event_res.is_err());
1652            return;
1653        }
1654
1655        assert!(event_res.is_ok());
1656
1657        let event = event_res.expect("event should be Ok").expect("event should be Some");
1658        match event {
1659            MetricEventPayload::IntegerValue(val) => {
1660                assert_eq!(val, params.sample);
1661            }
1662            _ => panic!("Expecting event counts."),
1663        }
1664    }
1665
1666    #[fuchsia::test]
1667    fn test_normal_process_int() {
1668        process_int_tester(IntTesterParams {
1669            new_val: Property::Int("count".to_string(), 13),
1670            process_ok: true,
1671            sample: 13,
1672        });
1673
1674        process_int_tester(IntTesterParams {
1675            new_val: Property::Int("count".to_string(), -13),
1676            process_ok: true,
1677            sample: -13,
1678        });
1679
1680        process_int_tester(IntTesterParams {
1681            new_val: Property::Int("count".to_string(), 0),
1682            process_ok: true,
1683            sample: 0,
1684        });
1685
1686        process_int_tester(IntTesterParams {
1687            new_val: Property::Uint("count".to_string(), 13),
1688            process_ok: true,
1689            sample: 13,
1690        });
1691
1692        process_int_tester(IntTesterParams {
1693            new_val: Property::String("count".to_string(), "big_oof".to_string()),
1694            process_ok: false,
1695            sample: -1,
1696        });
1697    }
1698
1699    #[fuchsia::test]
1700    fn test_int_edge_cases() {
1701        process_int_tester(IntTesterParams {
1702            new_val: Property::Int("count".to_string(), i64::MAX),
1703            process_ok: true,
1704            sample: i64::MAX,
1705        });
1706
1707        process_int_tester(IntTesterParams {
1708            new_val: Property::Int("count".to_string(), i64::MIN),
1709            process_ok: true,
1710            sample: i64::MIN,
1711        });
1712
1713        let i64_max_in_u64: u64 = i64::MAX.try_into().unwrap();
1714
1715        process_int_tester(IntTesterParams {
1716            new_val: Property::Uint("count".to_string(), i64_max_in_u64),
1717            process_ok: true,
1718            sample: i64::MAX,
1719        });
1720
1721        process_int_tester(IntTesterParams {
1722            new_val: Property::Uint("count".to_string(), i64_max_in_u64 + 1),
1723            process_ok: false,
1724            sample: -1,
1725        });
1726    }
1727
1728    struct StringTesterParams {
1729        sample: Property,
1730        process_ok: bool,
1731        previous_sample: Option<Property>,
1732    }
1733
1734    fn process_string_tester(params: StringTesterParams) {
1735        let metric_cache_key = MetricCacheKey {
1736            handle_name: InspectHandleName::name("foo.file"),
1737            selector: "test:root:string_val".to_string(),
1738        };
1739
1740        let event = process_sample_for_data_type(
1741            &params.sample,
1742            params.previous_sample.as_ref(),
1743            &metric_cache_key,
1744            &MetricType::String,
1745        );
1746
1747        if !params.process_ok {
1748            assert!(event.is_none());
1749            return;
1750        }
1751
1752        match event.unwrap() {
1753            MetricEventPayload::StringValue(val) => {
1754                assert_eq!(val.as_str(), params.sample.string().unwrap());
1755            }
1756            _ => panic!("Expecting event with StringValue."),
1757        }
1758    }
1759
1760    #[fuchsia::test]
1761    fn test_process_string() {
1762        process_string_tester(StringTesterParams {
1763            sample: Property::String("string_val".to_string(), "Hello, world!".to_string()),
1764            process_ok: true,
1765            previous_sample: None,
1766        });
1767
1768        // Ensure any erroneously cached values are ignored (a warning is logged in this case).
1769
1770        process_string_tester(StringTesterParams {
1771            sample: Property::String("string_val".to_string(), "Hello, world!".to_string()),
1772            process_ok: true,
1773            previous_sample: Some(Property::String("string_val".to_string(), "Uh oh!".to_string())),
1774        });
1775
1776        // Ensure unsupported property types are not erroneously processed.
1777
1778        process_string_tester(StringTesterParams {
1779            sample: Property::Int("string_val".to_string(), 123),
1780            process_ok: false,
1781            previous_sample: None,
1782        });
1783
1784        process_string_tester(StringTesterParams {
1785            sample: Property::Uint("string_val".to_string(), 123),
1786            process_ok: false,
1787            previous_sample: None,
1788        });
1789    }
1790
1791    fn convert_vector_to_int_histogram(hist: Vec<i64>) -> Property {
1792        let size = hist.len();
1793        Property::IntArray(
1794            "Bloop".to_string(),
1795            ArrayContent::LinearHistogram(LinearHistogram {
1796                floor: 1,
1797                step: 1,
1798                counts: hist,
1799                size,
1800                indexes: None,
1801            }),
1802        )
1803    }
1804
1805    fn convert_vector_to_uint_histogram(hist: Vec<u64>) -> Property<String> {
1806        let size = hist.len();
1807        Property::UintArray(
1808            "Bloop".to_string(),
1809            ArrayContent::LinearHistogram(LinearHistogram {
1810                floor: 1,
1811                step: 1,
1812                counts: hist,
1813                size,
1814                indexes: None,
1815            }),
1816        )
1817    }
1818
1819    // Produce condensed histograms. Size is arbitrary 100 - indexes must be less than that.
1820    fn convert_vectors_to_int_histogram(counts: Vec<i64>, indexes: Vec<usize>) -> Property<String> {
1821        let size = 100;
1822        Property::IntArray(
1823            "Bloop".to_string(),
1824            ArrayContent::LinearHistogram(LinearHistogram {
1825                floor: 1,
1826                step: 1,
1827                counts,
1828                size,
1829                indexes: Some(indexes),
1830            }),
1831        )
1832    }
1833
1834    fn convert_vectors_to_uint_histogram(
1835        counts: Vec<u64>,
1836        indexes: Vec<usize>,
1837    ) -> Property<String> {
1838        let size = 100;
1839        Property::UintArray(
1840            "Bloop".to_string(),
1841            ArrayContent::LinearHistogram(LinearHistogram {
1842                floor: 1,
1843                step: 1,
1844                counts,
1845                size,
1846                indexes: Some(indexes),
1847            }),
1848        )
1849    }
1850
1851    struct IntHistogramTesterParams {
1852        new_val: Property,
1853        old_val: Option<Property>,
1854        process_ok: bool,
1855        event_made: bool,
1856        diff: Vec<(u32, u64)>,
1857    }
1858    fn process_int_histogram_tester(params: IntHistogramTesterParams) {
1859        let data_source = MetricCacheKey {
1860            handle_name: InspectHandleName::name("foo.file"),
1861            selector: "test:root:count".to_string(),
1862        };
1863        let event_res =
1864            process_int_histogram(&params.new_val, params.old_val.as_ref(), &data_source);
1865
1866        if !params.process_ok {
1867            assert!(event_res.is_err());
1868            return;
1869        }
1870
1871        assert!(event_res.is_ok());
1872
1873        let event_opt = event_res.unwrap();
1874        if !params.event_made {
1875            assert!(event_opt.is_none());
1876            return;
1877        }
1878
1879        assert!(event_opt.is_some());
1880
1881        let event = event_opt.unwrap();
1882        match event.clone() {
1883            MetricEventPayload::Histogram(histogram_buckets) => {
1884                assert_eq!(histogram_buckets.len(), params.diff.len());
1885
1886                let expected_histogram_buckets = params
1887                    .diff
1888                    .iter()
1889                    .map(|(index, count)| HistogramBucket { index: *index, count: *count })
1890                    .collect::<Vec<HistogramBucket>>();
1891
1892                assert_eq!(histogram_buckets, expected_histogram_buckets);
1893            }
1894            _ => panic!("Expecting int histogram."),
1895        }
1896    }
1897
1898    /// Test that simple in-bounds first-samples of both types of Inspect histograms
1899    /// produce correct event types.
1900    #[fuchsia::test]
1901    fn test_normal_process_int_histogram() {
1902        let new_i64_sample = convert_vector_to_int_histogram(vec![1, 1, 1, 1]);
1903        let new_u64_sample = convert_vector_to_uint_histogram(vec![1, 1, 1, 1]);
1904
1905        process_int_histogram_tester(IntHistogramTesterParams {
1906            new_val: new_i64_sample,
1907            old_val: None,
1908            process_ok: true,
1909            event_made: true,
1910            diff: vec![(0, 1), (1, 1), (2, 1), (3, 1)],
1911        });
1912
1913        process_int_histogram_tester(IntHistogramTesterParams {
1914            new_val: new_u64_sample,
1915            old_val: None,
1916            process_ok: true,
1917            event_made: true,
1918            diff: vec![(0, 1), (1, 1), (2, 1), (3, 1)],
1919        });
1920
1921        // Test an Inspect uint histogram at the boundaries of the type produce valid
1922        // cobalt events.
1923        let new_u64_sample = convert_vector_to_uint_histogram(vec![u64::MAX, u64::MAX, u64::MAX]);
1924        process_int_histogram_tester(IntHistogramTesterParams {
1925            new_val: new_u64_sample,
1926            old_val: None,
1927            process_ok: true,
1928            event_made: true,
1929            diff: vec![(0, u64::MAX), (1, u64::MAX), (2, u64::MAX)],
1930        });
1931
1932        // Test that an empty Inspect histogram produces no event.
1933        let new_u64_sample = convert_vector_to_uint_histogram(Vec::new());
1934        process_int_histogram_tester(IntHistogramTesterParams {
1935            new_val: new_u64_sample,
1936            old_val: None,
1937            process_ok: true,
1938            event_made: false,
1939            diff: Vec::new(),
1940        });
1941
1942        let new_u64_sample = convert_vector_to_uint_histogram(vec![0, 0, 0, 0]);
1943        process_int_histogram_tester(IntHistogramTesterParams {
1944            new_val: new_u64_sample,
1945            old_val: None,
1946            process_ok: true,
1947            event_made: false,
1948            diff: Vec::new(),
1949        });
1950
1951        // Test that monotonically increasing histograms are good!.
1952        let new_u64_sample = convert_vector_to_uint_histogram(vec![2, 1, 2, 1]);
1953        let old_u64_sample = Some(convert_vector_to_uint_histogram(vec![1, 1, 0, 1]));
1954        process_int_histogram_tester(IntHistogramTesterParams {
1955            new_val: new_u64_sample,
1956            old_val: old_u64_sample,
1957            process_ok: true,
1958            event_made: true,
1959            diff: vec![(0, 1), (2, 2)],
1960        });
1961
1962        let new_i64_sample = convert_vector_to_int_histogram(vec![5, 2, 1, 3]);
1963        let old_i64_sample = Some(convert_vector_to_int_histogram(vec![1, 1, 1, 1]));
1964        process_int_histogram_tester(IntHistogramTesterParams {
1965            new_val: new_i64_sample,
1966            old_val: old_i64_sample,
1967            process_ok: true,
1968            event_made: true,
1969            diff: vec![(0, 4), (1, 1), (3, 2)],
1970        });
1971
1972        // Test that changing the histogram type resets the cache.
1973        let new_u64_sample = convert_vector_to_uint_histogram(vec![2, 1, 1, 1]);
1974        let old_i64_sample = Some(convert_vector_to_int_histogram(vec![1, 1, 1, 1]));
1975        process_int_histogram_tester(IntHistogramTesterParams {
1976            new_val: new_u64_sample,
1977            old_val: old_i64_sample,
1978            process_ok: true,
1979            event_made: true,
1980            diff: vec![(0, 2), (1, 1), (2, 1), (3, 1)],
1981        });
1982    }
1983
1984    // Test that we can handle condensed int and uint histograms, even with indexes out of order
1985    #[fuchsia::test]
1986    fn test_normal_process_condensed_histograms() {
1987        let new_u64_sample = convert_vectors_to_int_histogram(vec![2, 6], vec![3, 5]);
1988        let old_u64_sample = Some(convert_vectors_to_int_histogram(vec![1], vec![5]));
1989        process_int_histogram_tester(IntHistogramTesterParams {
1990            new_val: new_u64_sample,
1991            old_val: old_u64_sample,
1992            process_ok: true,
1993            event_made: true,
1994            diff: vec![(3, 2), (5, 5)],
1995        });
1996        let new_i64_sample = convert_vectors_to_uint_histogram(vec![2, 4], vec![5, 3]);
1997        let old_i64_sample = None;
1998        process_int_histogram_tester(IntHistogramTesterParams {
1999            new_val: new_i64_sample,
2000            old_val: old_i64_sample,
2001            process_ok: true,
2002            event_made: true,
2003            diff: vec![(3, 4), (5, 2)],
2004        });
2005    }
2006
2007    #[fuchsia::test]
2008    fn test_errorful_process_int_histogram() {
2009        // Test that changing the histogram length is an error.
2010        let new_u64_sample = convert_vector_to_uint_histogram(vec![1, 1, 1, 1]);
2011        let old_u64_sample = Some(convert_vector_to_uint_histogram(vec![1, 1, 1, 1, 1]));
2012        process_int_histogram_tester(IntHistogramTesterParams {
2013            new_val: new_u64_sample,
2014            old_val: old_u64_sample,
2015            process_ok: false,
2016            event_made: false,
2017            diff: Vec::new(),
2018        });
2019
2020        // Test that new samples cant have negative values.
2021        let new_i64_sample = convert_vector_to_int_histogram(vec![1, 1, -1, 1]);
2022        process_int_histogram_tester(IntHistogramTesterParams {
2023            new_val: new_i64_sample,
2024            old_val: None,
2025            process_ok: false,
2026            event_made: false,
2027            diff: Vec::new(),
2028        });
2029
2030        // Test that histograms must be monotonically increasing.
2031        let new_i64_sample = convert_vector_to_int_histogram(vec![5, 2, 1, 3]);
2032        let old_i64_sample = Some(convert_vector_to_int_histogram(vec![6, 1, 1, 1]));
2033        process_int_histogram_tester(IntHistogramTesterParams {
2034            new_val: new_i64_sample,
2035            old_val: old_i64_sample,
2036            process_ok: false,
2037            event_made: false,
2038            diff: Vec::new(),
2039        });
2040    }
2041
2042    /// Ensure that data distinguished only by metadata handle name - with the same moniker and
2043    /// selector path - is kept properly separate in the previous-value cache. The same
2044    /// MetricConfig should match each data source, but the occurrence counts
2045    /// should reflect that the distinct values are individually tracked.
2046    #[fuchsia::test]
2047    async fn test_inspect_handle_name_distinguishes_data() {
2048        let mut sampler = ProjectSampler {
2049            archive_reader: ArchiveReader::inspect(),
2050            metrics: vec![],
2051            metric_cache: RefCell::new(HashMap::new()),
2052            metric_loggers: HashMap::new(),
2053            project_id: ProjectId(1),
2054            poll_rate_sec: 3600,
2055            project_sampler_stats: Arc::new(ProjectSamplerStats::new()),
2056            all_done: true,
2057        };
2058        let selector: String = "my/component:[...]root/branch:leaf".to_string();
2059        let metric_id = MetricId(1);
2060        let event_codes = vec![];
2061        sampler.push_metric(MetricConfig {
2062            project_id: None,
2063            selectors: vec![selectors::parse_verbose(&selector).unwrap()],
2064            metric_id,
2065            metric_type: MetricType::Occurrence,
2066            event_codes,
2067            upload_once: false,
2068        });
2069        sampler.rebuild_selector_data_structures();
2070
2071        let data1_value4 = vec![InspectDataBuilder::new(
2072            "my/component".try_into().unwrap(),
2073            "component-url",
2074            Timestamp::from_nanos(0),
2075        )
2076        .with_hierarchy(hierarchy! { root: {branch: {leaf: 4i32}}})
2077        .with_name(InspectHandleName::name("name1"))
2078        .build()];
2079        let data2_value3 = vec![InspectDataBuilder::new(
2080            "my/component".try_into().unwrap(),
2081            "component-url",
2082            Timestamp::from_nanos(0),
2083        )
2084        .with_hierarchy(hierarchy! { root: {branch: {leaf: 3i32}}})
2085        .with_name(InspectHandleName::name("name2"))
2086        .build()];
2087        let data1_value6 = vec![InspectDataBuilder::new(
2088            "my/component".try_into().unwrap(),
2089            "component-url",
2090            Timestamp::from_nanos(0),
2091        )
2092        .with_hierarchy(hierarchy! { root: {branch: {leaf: 6i32}}})
2093        .with_name(InspectHandleName::name("name1"))
2094        .build()];
2095        let data2_value8 = vec![InspectDataBuilder::new(
2096            "my/component".try_into().unwrap(),
2097            "component-url",
2098            Timestamp::from_nanos(0),
2099        )
2100        .with_hierarchy(hierarchy! { root: {branch: {leaf: 8i32}}})
2101        .with_name(InspectHandleName::name("name2"))
2102        .build()];
2103
2104        fn expect_one_metric_event_value(
2105            events: Result<Vec<EventToLog>, Error>,
2106            value: u64,
2107            context: &'static str,
2108        ) {
2109            let events = events.expect(context);
2110            assert_eq!(events.len(), 1, "Events len not 1: {}: {}", context, events.len());
2111            let event = &events[0];
2112            let (project_id, MetricEvent { payload, .. }) = event;
2113            assert_eq!(*project_id, ProjectId(1));
2114            if let fidl_fuchsia_metrics::MetricEventPayload::Count(payload) = payload {
2115                assert_eq!(
2116                    payload, &value,
2117                    "Wrong payload, expected {} got {} at {}",
2118                    value, payload, context
2119                );
2120            } else {
2121                panic!("Expected MetricEventPayload::Count at {}, got {:?}", context, payload);
2122            }
2123        }
2124
2125        expect_one_metric_event_value(sampler.process_snapshot(data1_value4).await, 4, "first");
2126        expect_one_metric_event_value(sampler.process_snapshot(data2_value3).await, 3, "second");
2127        expect_one_metric_event_value(sampler.process_snapshot(data1_value6).await, 2, "third");
2128        expect_one_metric_event_value(sampler.process_snapshot(data2_value8).await, 5, "fourth");
2129    }
2130
2131    // TODO(https://fxbug.dev/42071858): we should remove this once we support batching.
2132    #[fuchsia::test]
2133    async fn project_id_can_be_overwritten_by_the_metric_project_id() {
2134        let mut sampler = ProjectSampler {
2135            archive_reader: ArchiveReader::inspect(),
2136            metrics: vec![],
2137            metric_cache: RefCell::new(HashMap::new()),
2138            metric_loggers: HashMap::new(),
2139            project_id: ProjectId(1),
2140            poll_rate_sec: 3600,
2141            project_sampler_stats: Arc::new(ProjectSamplerStats::new()),
2142            all_done: true,
2143        };
2144        let selector: String = "my/component:[name=name1]root/branch:leaf".to_string();
2145        let metric_id = MetricId(1);
2146        let event_codes = vec![];
2147        sampler.push_metric(MetricConfig {
2148            project_id: Some(ProjectId(2)),
2149            selectors: vec![selectors::parse_verbose(&selector).unwrap()],
2150            metric_id,
2151            metric_type: MetricType::Occurrence,
2152            event_codes,
2153            upload_once: false,
2154        });
2155        sampler.rebuild_selector_data_structures();
2156
2157        let value = vec![InspectDataBuilder::new(
2158            "my/component".try_into().unwrap(),
2159            "component-url",
2160            Timestamp::from_nanos(0),
2161        )
2162        .with_hierarchy(hierarchy! { root: {branch: {leaf: 4i32}}})
2163        .with_name(InspectHandleName::name("name1"))
2164        .build()];
2165
2166        let events = sampler.process_snapshot(value).await.expect("processed snapshot");
2167        assert_eq!(events.len(), 1);
2168        let event = &events[0];
2169        let (project_id, MetricEvent { .. }) = event;
2170        assert_eq!(*project_id, ProjectId(2));
2171    }
2172}