sampler/
executor.rs

1// Copyright 2020 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5//! # Data Structure and Algorithm Overview
6//!
7//! Cobalt is organized into Projects, each of which contains several Metrics.
8//!
9//! [`MetricConfig`] - defined in src/diagnostics/lib/sampler-config/src/lib.rs
10//! This is deserialized from Sampler config files or created by FIRE by interpolating
11//! component information into FIRE config files. It contains
12//!
13//!  - selectors: Vec<Selector>
14//!  - metric_id
15//!  - metric_type: MetricType
16//!  - event_codes: Vec<u32>
17//!  - upload_once boolean
18//!
19//! **NOTE:** Multiple selectors can be provided in a single metric. Only one selector is
20//! expected to fetch/match data. When one does, the other selectors will be disabled for
21//! efficiency.
22//!
23//! [`ProjectConfig`] - defined in src/diagnostics/lib/sampler-config/src/lib.rs
24//! This encodes the contents of a single config file:
25//!
26//!  - project_id
27//!  - customer_id (defaults to 1)
28//!  - poll_rate_sec
29//!  - metrics: Vec<MetricConfig>
30//!
31//! [`SamplerConfig`] - defined in src/diagnostics/sampler/src/config.rs
32//! The entire config for Sampler. Contains
33//!
34//!  - list of ProjectConfig
35//!  - minimum sample rate
36//!
37//! [`ProjectSampler`] - defined in src/diagnostics/sampler/src/executor.rs
38//! This contains
39//!
40//!  - several MetricConfig's
41//!  - an ArchiveReader configured with all active selectors
42//!  - a cache of previous Diagnostic values, indexed by selector strings
43//!  - FIDL proxies for Cobalt and MetricEvent loggers
44//!     - these loggers are configured with project_id and customer_id
45//!  - Poll rate
46//!  - Inspect stats (struct ProjectSamplerStats)
47//!
48//! [`ProjectSampler`] is stored in:
49//!
50//!  - [`TaskCancellation`]:     execution_context: fasync::Task<Vec<ProjectSampler>>,
51//!  - [`RebootSnapshotProcessor`]:    project_samplers: Vec<ProjectSampler>,
52//!  - [`SamplerExecutor`]:     project_samplers: Vec<ProjectSampler>,
53//!  - [`ProjectSamplerTaskExit::RebootTriggered(ProjectSampler)`],
54//!
55//! [`SamplerExecutor`] (defined in executor.rs) is built from a single [`SamplerConfig`].
56//! [`SamplerExecutor`] contains
57//!
58//!  - a list of ProjectSamplers
59//!  - an Inspect stats structure
60//!
61//! [`SamplerExecutor`] only has one member function execute() which calls spawn() on each
62//! project sampler, passing it a receiver-oneshot to cancel it. The collection of
63//! oneshot-senders and spawned-tasks builds the returned TaskCancellation.
64//!
65//! [`TaskCancellation`] is then passed to a reboot_watcher (in src/diagnostics/sampler/src/lib.rs)
66//! which does nothing until the reboot service either closes (calling
67//! run_without_cancellation()) or sends a message (calling perform_reboot_cleanup()).
68//!
69//! [`ProjectSampler`] calls fasync::Task::spawn to create a task that starts a timer, then loops
70//! listening to that timer and to the reboot_oneshot. When the timer triggers, it calls
71//! self.process_next_snapshot(). If the reboot oneshot arrives, the task returns
72//! [`ProjectSamplerTaskExit::RebootTriggered(self)`].
73//!
74//!
75//! **NOTE:** Selectors are expected to match one value each. Wildcards are only allowed in the
76//! moniker segment of a selector when it is a driver in the driver collections.
77//!
78//! Selectors will become unused, either because of upload_once, or because data was found by a
79//! different selector. Rather than implement deletion in the vecs,
80//! which would add lots of bug surface and maintenance debt, each selector is an Option<> so that
81//! selectors can be deleted/disabled without changing the rest of the data structure.
82//! Once all Diagnostic data is processed, the structure is rebuilt if any selectors
83//! have been disabled; rebuilding less often would be
84//! premature optimization at this point.
85//!
86//! perform_reboot_cleanup() builds an ArchiveReader for [`RebootSnapshotProcessor`].
87//! When not rebooting, each project fetches its own data from Archivist.
88
89use crate::config::SamplerConfig;
90use crate::diagnostics::*;
91use anyhow::{format_err, Context, Error};
92use diagnostics_data::{Data, InspectHandleName};
93use diagnostics_hierarchy::{
94    ArrayContent, DiagnosticsHierarchy, ExponentialHistogram, LinearHistogram, Property,
95    SelectResult,
96};
97use diagnostics_reader::{ArchiveReader, Inspect, InspectArchiveReader, RetryConfig};
98use fidl_fuchsia_metrics::{
99    HistogramBucket, MetricEvent, MetricEventLoggerFactoryMarker, MetricEventLoggerFactoryProxy,
100    MetricEventLoggerProxy, MetricEventPayload, ProjectSpec,
101};
102use fuchsia_async as fasync;
103use fuchsia_component::client::connect_to_protocol;
104use fuchsia_inspect::{self as inspect, NumericProperty};
105use fuchsia_inspect_derive::WithInspect;
106use futures::channel::oneshot;
107use futures::future::join_all;
108use futures::stream::FuturesUnordered;
109use futures::{select, StreamExt};
110use log::{info, warn};
111use moniker::ExtendedMoniker;
112use sampler_config::runtime::{MetricConfig, ProjectConfig};
113use sampler_config::{MetricType, ProjectId};
114use selectors::SelectorExt;
115use std::cell::{Ref, RefCell, RefMut};
116use std::collections::hash_map::Entry;
117use std::collections::HashMap;
118use std::sync::Arc;
119
120/// An event to be logged to the cobalt logger. Events are generated first,
121/// then logged. (This permits unit-testing the code that generates events from
122/// Diagnostic data.)
123type EventToLog = (ProjectId, MetricEvent);
124
125pub struct TaskCancellation {
126    senders: Vec<oneshot::Sender<()>>,
127    _sampler_executor_stats: Arc<SamplerExecutorStats>,
128    execution_context: fasync::Task<Vec<ProjectSampler>>,
129}
130
131impl TaskCancellation {
132    /// It's possible the reboot register goes down. If that
133    /// happens, we want to continue driving execution with
134    /// no consideration for cancellation. This does that.
135    pub async fn run_without_cancellation(self) {
136        self.execution_context.await;
137    }
138
139    pub async fn perform_reboot_cleanup(self) {
140        // Let every sampler know that a reboot is pending and they should exit.
141        self.senders.into_iter().for_each(|sender| {
142            sender
143                .send(())
144                .unwrap_or_else(|err| warn!("Failed to send reboot over oneshot: {:?}", err))
145        });
146
147        // Get the most recently updated project samplers from all the futures. They hold the
148        // cache with the most recent values for all the metrics we need to sample and diff!
149        let project_samplers: Vec<ProjectSampler> = self.execution_context.await;
150
151        let mut reader = ArchiveReader::inspect();
152
153        for project_sampler in &project_samplers {
154            for metric in &project_sampler.metrics {
155                for selector in metric.borrow().selectors.iter() {
156                    reader.add_selector(selector.clone());
157                }
158            }
159        }
160
161        reader.retry(RetryConfig::never());
162
163        let mut reboot_processor = RebootSnapshotProcessor { reader, project_samplers };
164        // Log errors encountered in final snapshot, but always swallow errors so we can gracefully
165        // notify RebootMethodsWatcherRegister that we yield our remaining time.
166        reboot_processor
167            .process_reboot_sample()
168            .await
169            .unwrap_or_else(|e| warn!("Reboot snapshot failed! {:?}", e));
170    }
171}
172
173struct RebootSnapshotProcessor {
174    /// Reader constructed from the union of selectors
175    /// for every [`ProjectSampler`] config.
176    reader: InspectArchiveReader,
177    /// Vector of mutable [`ProjectSampler`] objects that will
178    /// process their final samples.
179    project_samplers: Vec<ProjectSampler>,
180}
181
182impl RebootSnapshotProcessor {
183    pub async fn process_reboot_sample(&mut self) -> Result<(), Error> {
184        let snapshot_data = self.reader.snapshot().await?;
185        for data_packet in snapshot_data {
186            let moniker = data_packet.moniker;
187            match data_packet.payload {
188                None => {
189                    process_schema_errors(&data_packet.metadata.errors, &moniker);
190                }
191                Some(payload) => {
192                    self.process_single_payload(payload, &data_packet.metadata.name, &moniker).await
193                }
194            }
195        }
196        Ok(())
197    }
198
199    async fn process_single_payload(
200        &mut self,
201        hierarchy: DiagnosticsHierarchy<String>,
202        inspect_handle_name: &InspectHandleName,
203        moniker: &ExtendedMoniker,
204    ) {
205        let mut projects = self
206            .project_samplers
207            .iter_mut()
208            .filter(|p| {
209                p.filter_metrics_by_moniker_and_tree_name(moniker, inspect_handle_name.as_ref())
210                    .next()
211                    .is_some()
212            })
213            .peekable();
214        if projects.peek().is_none() {
215            warn!(
216                moniker:%,
217                tree_name = inspect_handle_name.as_ref();
218                "no metrics found for moniker and tree_name combination"
219            );
220            return;
221        }
222
223        for project_sampler in projects {
224            // If processing the final sample failed, just log the
225            // error and proceed, everything's getting shut down
226            // soon anyway.
227            let maybe_err = match project_sampler.process_component_data(
228                &hierarchy,
229                inspect_handle_name,
230                moniker,
231            ) {
232                Err(err) => Some(err),
233                Ok((_selector_changes, events_to_log)) => {
234                    project_sampler.log_events(events_to_log).await.err()
235                }
236            };
237            if let Some(err) = maybe_err {
238                warn!(err:?; "A project sampler failed to process a reboot sample");
239            }
240        }
241    }
242}
243
244/// Owner of the sampler execution context.
245pub struct SamplerExecutor {
246    project_samplers: Vec<ProjectSampler>,
247    sampler_executor_stats: Arc<SamplerExecutorStats>,
248}
249
250impl SamplerExecutor {
251    /// Instantiate connection to the cobalt logger and map ProjectConfigurations
252    /// to [`ProjectSampler`] plans.
253    pub async fn new(sampler_config: SamplerConfig) -> Result<Self, Error> {
254        let metric_logger_factory: Arc<MetricEventLoggerFactoryProxy> = Arc::new(
255            connect_to_protocol::<MetricEventLoggerFactoryMarker>()
256                .context("Failed to connect to the Metric LoggerFactory")?,
257        );
258
259        let minimum_sample_rate_sec = sampler_config.minimum_sample_rate_sec;
260
261        let sampler_executor_stats = Arc::new(
262            SamplerExecutorStats::new()
263                .with_inspect(inspect::component::inspector().root(), "sampler_executor_stats")
264                .unwrap_or_else(|err| {
265                    warn!(err:?; "Failed to attach inspector to SamplerExecutorStats struct");
266                    SamplerExecutorStats::default()
267                }),
268        );
269
270        sampler_executor_stats
271            .total_project_samplers_configured
272            .add(sampler_config.project_configs.len() as u64);
273
274        let mut project_to_stats_map: HashMap<ProjectId, Arc<ProjectSamplerStats>> = HashMap::new();
275        // TODO(https://fxbug.dev/42118220): Create only one ArchiveReader for each unique poll rate so we
276        // can avoid redundant snapshots.
277        let project_sampler_futures =
278            sampler_config.project_configs.iter().cloned().map(|project_config| {
279                let project_sampler_stats =
280                    project_to_stats_map.entry(project_config.project_id).or_insert_with(|| {
281                        Arc::new(
282                            ProjectSamplerStats::new()
283                                .with_inspect(
284                                    &sampler_executor_stats.inspect_node,
285                                    format!("project_{}", project_config.project_id,),
286                                )
287                                .unwrap_or_else(|err| {
288                                    warn!(
289                                        err:?;
290                                        "Failed to attach inspector to ProjectSamplerStats struct"
291                                    );
292                                    ProjectSamplerStats::default()
293                                }),
294                        )
295                    });
296                ProjectSampler::new(
297                    project_config,
298                    metric_logger_factory.clone(),
299                    minimum_sample_rate_sec,
300                    project_sampler_stats.clone(),
301                )
302            });
303
304        let mut project_samplers: Vec<ProjectSampler> = Vec::new();
305        for project_sampler in join_all(project_sampler_futures).await.into_iter() {
306            match project_sampler {
307                Ok(project_sampler) => project_samplers.push(project_sampler),
308                Err(e) => {
309                    warn!("ProjectSampler construction failed: {:?}", e);
310                }
311            }
312        }
313        Ok(SamplerExecutor { project_samplers, sampler_executor_stats })
314    }
315
316    /// Turn each [`ProjectSampler`] plan into an [`fasync::Task`] which executes its associated plan,
317    /// and process errors if any tasks exit unexpectedly.
318    pub fn execute(self) -> TaskCancellation {
319        // Take ownership of the inspect struct so we can give it to the execution context. We do this
320        // so that the execution context can return the struct when it's halted by reboot, which allows inspect
321        // properties to survive through the reboot flow.
322        let task_cancellation_owned_stats = self.sampler_executor_stats.clone();
323        let execution_context_owned_stats = self.sampler_executor_stats.clone();
324
325        let (senders, mut spawned_tasks): (Vec<oneshot::Sender<()>>, FuturesUnordered<_>) = self
326            .project_samplers
327            .into_iter()
328            .map(|project_sampler| {
329                let (sender, receiver) = oneshot::channel::<()>();
330                (sender, project_sampler.spawn(receiver))
331            })
332            .unzip();
333
334        let execution_context = fasync::Task::local(async move {
335            let mut healthily_exited_samplers = Vec::new();
336            while let Some(sampler_result) = spawned_tasks.next().await {
337                match sampler_result {
338                    Err(e) => {
339                        // TODO(https://fxbug.dev/42118220): Consider restarting the failed sampler depending on
340                        // failure mode.
341                        warn!("A spawned sampler has failed: {:?}", e);
342                        execution_context_owned_stats.errorfully_exited_samplers.add(1);
343                    }
344                    Ok(ProjectSamplerTaskExit::RebootTriggered(sampler)) => {
345                        healthily_exited_samplers.push(sampler);
346                        execution_context_owned_stats.reboot_exited_samplers.add(1);
347                    }
348                    Ok(ProjectSamplerTaskExit::WorkCompleted) => {
349                        info!("A sampler completed its workload, and exited.");
350                        execution_context_owned_stats.healthily_exited_samplers.add(1);
351                    }
352                }
353            }
354
355            healthily_exited_samplers
356        });
357
358        TaskCancellation {
359            execution_context,
360            senders,
361            _sampler_executor_stats: task_cancellation_owned_stats,
362        }
363    }
364}
365
366pub struct ProjectSampler {
367    archive_reader: InspectArchiveReader,
368    /// The metrics used by this Project.
369    metrics: Vec<RefCell<MetricConfig>>,
370    /// Cache from Inspect selector to last sampled property. This is the selector from
371    /// [`MetricConfig`]; it may contain wildcards.
372    metric_cache: RefCell<HashMap<MetricCacheKey, Property>>,
373    /// Cobalt logger proxy using this ProjectSampler's project id. It's an Option so it doesn't
374    /// have to be created for unit tests; it will always be Some() outside unit tests.
375    metric_loggers: HashMap<ProjectId, MetricEventLoggerProxy>,
376    /// The frequency with which we snapshot Inspect properties
377    /// for this project.
378    poll_rate_sec: i64,
379    /// Inspect stats on a node namespaced by this project's associated id.
380    /// It's an arc since a single project can have multiple samplers at
381    /// different frequencies, but we want a single project to have one node.
382    project_sampler_stats: Arc<ProjectSamplerStats>,
383    /// The id of the project.
384    /// Project ID that metrics are being sampled and forwarded on behalf of.
385    project_id: ProjectId,
386    /// Records whether there are any known selectors left.
387    /// Reset in rebuild_selector_data_structures().
388    all_done: bool,
389}
390
391#[derive(Debug, Eq, Hash, PartialEq)]
392struct MetricCacheKey {
393    handle_name: InspectHandleName,
394    selector: String,
395}
396
397#[allow(clippy::large_enum_variant)] // TODO(https://fxbug.dev/401087900)
398pub enum ProjectSamplerTaskExit {
399    /// The [`ProjectSampler`] processed a reboot signal on its oneshot, and yielded
400    /// to the final-snapshot.
401    RebootTriggered(ProjectSampler),
402    /// The [`ProjectSampler`] has no more work to complete; perhaps all metrics were "upload_once"?
403    WorkCompleted,
404}
405
406pub enum ProjectSamplerEvent {
407    TimerTriggered,
408    TimerDied,
409    RebootTriggered,
410    RebootChannelClosed(Error),
411}
412
413/// Indicates whether a sampler in the project has been removed (set to None), in which case the
414/// ArchiveAccessor should be reconfigured.
415/// The selector lists may be consolidated (and thus the maps would be rebuilt), but
416/// the program will run correctly whether they are or not.
417#[derive(PartialEq)]
418enum SnapshotOutcome {
419    SelectorsChanged,
420    SelectorsUnchanged,
421}
422
423impl ProjectSampler {
424    pub async fn new(
425        config: Arc<ProjectConfig>,
426        metric_logger_factory: Arc<MetricEventLoggerFactoryProxy>,
427        minimum_sample_rate_sec: i64,
428        project_sampler_stats: Arc<ProjectSamplerStats>,
429    ) -> Result<ProjectSampler, Error> {
430        let customer_id = config.customer_id;
431        let project_id = config.project_id;
432        let poll_rate_sec = config.poll_rate_sec;
433        if poll_rate_sec < minimum_sample_rate_sec {
434            return Err(format_err!(
435                concat!(
436                    "Project with id: {:?} uses a polling rate:",
437                    " {:?} below minimum configured poll rate: {:?}"
438                ),
439                project_id,
440                poll_rate_sec,
441                minimum_sample_rate_sec,
442            ));
443        }
444
445        project_sampler_stats.project_sampler_count.add(1);
446        project_sampler_stats.metrics_configured.add(config.metrics.len() as u64);
447
448        let mut metric_loggers = HashMap::new();
449        // TODO(https://fxbug.dev/42071858): we should remove this once we support batching. There should be
450        // only one metric logger per ProjectSampler.
451        if *project_id != 0 {
452            let (metric_logger_proxy, metrics_server_end) = fidl::endpoints::create_proxy();
453            let project_spec = ProjectSpec {
454                customer_id: Some(*customer_id),
455                project_id: Some(*project_id),
456                ..ProjectSpec::default()
457            };
458            metric_logger_factory
459                .create_metric_event_logger(&project_spec, metrics_server_end)
460                .await?
461                .map_err(|e| format_err!("error response for project {}: {:?}", project_id, e))?;
462            metric_loggers.insert(project_id, metric_logger_proxy);
463        }
464        for metric in &config.metrics {
465            if let Some(metric_project_id) = metric.project_id {
466                if let Entry::Vacant(entry) = metric_loggers.entry(metric_project_id) {
467                    let (metric_logger_proxy, metrics_server_end) = fidl::endpoints::create_proxy();
468                    let project_spec = ProjectSpec {
469                        customer_id: Some(*customer_id),
470                        project_id: Some(*metric_project_id),
471                        ..ProjectSpec::default()
472                    };
473                    metric_logger_factory
474                        .create_metric_event_logger(&project_spec, metrics_server_end)
475                        .await?
476                        .map_err(|e|
477                            format_err!(
478                                "error response for project {} while creating metric logger {}: {:?}",
479                                project_id,
480                                metric_project_id,
481                                e
482                            ))?;
483                    entry.insert(metric_logger_proxy);
484                }
485            }
486        }
487
488        let mut project_sampler = ProjectSampler {
489            project_id,
490            archive_reader: ArchiveReader::inspect(),
491            metrics: config.metrics.iter().map(|m| RefCell::new(m.clone())).collect(),
492            metric_cache: RefCell::new(HashMap::new()),
493            metric_loggers,
494            poll_rate_sec,
495            project_sampler_stats,
496            all_done: true,
497        };
498        // Fill in archive_reader
499        project_sampler.rebuild_selector_data_structures();
500        Ok(project_sampler)
501    }
502
503    pub fn spawn(
504        mut self,
505        mut reboot_oneshot: oneshot::Receiver<()>,
506    ) -> fasync::Task<Result<ProjectSamplerTaskExit, Error>> {
507        fasync::Task::local(async move {
508            let mut periodic_timer =
509                fasync::Interval::new(zx::MonotonicDuration::from_seconds(self.poll_rate_sec));
510            loop {
511                let done = select! {
512                    opt = periodic_timer.next() => {
513                        if opt.is_some() {
514                            ProjectSamplerEvent::TimerTriggered
515                        } else {
516                            ProjectSamplerEvent::TimerDied
517                        }
518                    },
519                    oneshot_res = reboot_oneshot => {
520                        match oneshot_res {
521                            Ok(()) => {
522                                ProjectSamplerEvent::RebootTriggered
523                            },
524                            Err(e) => {
525                                ProjectSamplerEvent::RebootChannelClosed(
526                                    format_err!("Oneshot closure error: {:?}", e))
527                            }
528                        }
529                    }
530                };
531
532                match done {
533                    ProjectSamplerEvent::TimerDied => {
534                        return Err(format_err!(
535                            "The ProjectSampler timer died, something went wrong.",
536                        ));
537                    }
538                    ProjectSamplerEvent::RebootChannelClosed(e) => {
539                        // TODO(https://fxbug.dev/42118220): Consider differentiating errors if
540                        // we ever want to recover a sampler after a oneshot channel death.
541                        return Err(format_err!(
542                            "The Reboot signaling oneshot died, something went wrong: {:?}",
543                            e
544                        ));
545                    }
546                    ProjectSamplerEvent::RebootTriggered => {
547                        // The reboot oneshot triggered, meaning it's time to perform
548                        // our final snapshot. Return self to reuse most recent cache.
549                        return Ok(ProjectSamplerTaskExit::RebootTriggered(self));
550                    }
551                    ProjectSamplerEvent::TimerTriggered => {
552                        self.process_next_snapshot().await?;
553                        // Check whether this sampler
554                        // still needs to run (perhaps all metrics were
555                        // "upload_once"?). If it doesn't, we want to be
556                        // sure that it is not included in the reboot-workload.
557                        if self.is_all_done() {
558                            return Ok(ProjectSamplerTaskExit::WorkCompleted);
559                        }
560                    }
561                }
562            }
563        })
564    }
565
566    async fn process_next_snapshot(&mut self) -> Result<(), Error> {
567        let snapshot_data = self.archive_reader.snapshot().await?;
568        let events_to_log = self.process_snapshot(snapshot_data).await?;
569        self.log_events(events_to_log).await?;
570        Ok(())
571    }
572
573    async fn process_snapshot(
574        &mut self,
575        snapshot: Vec<Data<Inspect>>,
576    ) -> Result<Vec<EventToLog>, Error> {
577        let mut selectors_changed = false;
578        let mut events_to_log = vec![];
579        for data_packet in snapshot.iter() {
580            match &data_packet.payload {
581                None => {
582                    process_schema_errors(&data_packet.metadata.errors, &data_packet.moniker);
583                }
584                Some(payload) => {
585                    let (selector_outcome, mut events) = self.process_component_data(
586                        payload,
587                        &data_packet.metadata.name,
588                        &data_packet.moniker,
589                    )?;
590                    if selector_outcome == SnapshotOutcome::SelectorsChanged {
591                        selectors_changed = true;
592                    }
593                    events_to_log.append(&mut events);
594                }
595            }
596        }
597        if selectors_changed {
598            self.rebuild_selector_data_structures();
599        }
600        Ok(events_to_log)
601    }
602
603    fn is_all_done(&self) -> bool {
604        self.all_done
605    }
606
607    fn rebuild_selector_data_structures(&mut self) {
608        self.archive_reader = ArchiveReader::inspect();
609        for metric in &self.metrics {
610            for selector in metric.borrow().selectors.iter().cloned() {
611                self.archive_reader.add_selector(selector);
612                self.all_done = false;
613            }
614        }
615        self.archive_reader.retry(RetryConfig::never());
616    }
617
618    fn filter_metrics_by_moniker_and_tree_name<'a>(
619        &'a self,
620        moniker: &'a ExtendedMoniker,
621        tree_name: &'a str,
622    ) -> impl Iterator<Item = &'a RefCell<MetricConfig>> {
623        self.metrics.iter().filter(|metric| {
624            moniker
625                .match_against_selectors_and_tree_name(tree_name, metric.borrow().selectors.iter())
626                .next()
627                .is_some()
628        })
629    }
630
631    fn process_component_data(
632        &mut self,
633        payload: &DiagnosticsHierarchy,
634        inspect_handle_name: &InspectHandleName,
635        moniker: &ExtendedMoniker,
636    ) -> Result<(SnapshotOutcome, Vec<EventToLog>), Error> {
637        let filtered_metrics =
638            self.filter_metrics_by_moniker_and_tree_name(moniker, inspect_handle_name.as_ref());
639        let mut snapshot_outcome = SnapshotOutcome::SelectorsUnchanged;
640        let mut events_to_log = vec![];
641        for metric in filtered_metrics {
642            let mut selector_to_keep = None;
643            let project_id = metric.borrow().project_id.unwrap_or(self.project_id);
644            for (selector_idx, selector) in metric.borrow().selectors.iter().enumerate() {
645                let found_properties =
646                    diagnostics_hierarchy::select_from_hierarchy(payload, selector)?;
647                match found_properties {
648                    // Maybe the data hasn't been published yet. Maybe another selector in this
649                    // metric is the correct one to find the data. Either way, not-found is fine.
650                    SelectResult::Properties(p) if p.is_empty() => {}
651                    SelectResult::Properties(p) if p.len() == 1 => {
652                        let metric_cache_key = MetricCacheKey {
653                            handle_name: inspect_handle_name.clone(),
654                            selector: selectors::selector_to_string(selector, Default::default())
655                                .unwrap(),
656                        };
657                        if let Some(event) = Self::prepare_sample(
658                            metric.borrow(),
659                            self.metric_cache.borrow_mut(),
660                            metric_cache_key,
661                            p[0],
662                        )? {
663                            events_to_log.push((project_id, event));
664                        }
665                        selector_to_keep = Some(selector_idx);
666                        break;
667                    }
668                    too_many => {
669                        warn!(
670                            too_many:?,
671                            selector:?;
672                            "Too many matches for selector"
673                        );
674                    }
675                }
676            }
677
678            if let Some(selector_idx) = selector_to_keep {
679                if Self::update_selectors_for_metric(metric.borrow_mut(), selector_idx) {
680                    snapshot_outcome = SnapshotOutcome::SelectorsChanged;
681                }
682            }
683        }
684        Ok((snapshot_outcome, events_to_log))
685    }
686
687    fn update_selectors_for_metric(
688        mut metric: RefMut<'_, MetricConfig>,
689        keep_selector_idx: usize,
690    ) -> bool {
691        if metric.upload_once {
692            metric.selectors = Vec::new();
693            return true;
694        }
695        let deleted = metric.selectors.len() > 1;
696        metric.selectors = vec![metric.selectors.remove(keep_selector_idx)];
697        deleted
698    }
699
700    fn prepare_sample<'a>(
701        metric: Ref<'a, MetricConfig>,
702        metric_cache: RefMut<'a, HashMap<MetricCacheKey, Property>>,
703        metric_cache_key: MetricCacheKey,
704        new_sample: &Property,
705    ) -> Result<Option<MetricEvent>, Error> {
706        let previous_sample_opt: Option<&Property> = metric_cache.get(&metric_cache_key);
707
708        if let Some(payload) = process_sample_for_data_type(
709            new_sample,
710            previous_sample_opt,
711            &metric_cache_key,
712            &metric.metric_type,
713        ) {
714            Self::maybe_update_cache(
715                metric_cache,
716                new_sample,
717                &metric.metric_type,
718                metric_cache_key,
719            );
720            Ok(Some(MetricEvent {
721                metric_id: *metric.metric_id,
722                event_codes: metric.event_codes.iter().map(|code| **code).collect(),
723                payload,
724            }))
725        } else {
726            Ok(None)
727        }
728    }
729
730    async fn log_events(&mut self, events: Vec<EventToLog>) -> Result<(), Error> {
731        for (project_id, event) in events.into_iter() {
732            self.metric_loggers
733                .get(&project_id)
734                .as_ref()
735                .unwrap()
736                .log_metric_events(&[event])
737                .await?
738                .map_err(|e| format_err!("error from cobalt: {:?}", e))?;
739            self.project_sampler_stats.cobalt_logs_sent.add(1);
740        }
741        Ok(())
742    }
743
744    fn maybe_update_cache(
745        mut cache: RefMut<'_, HashMap<MetricCacheKey, Property>>,
746        new_sample: &Property,
747        data_type: &MetricType,
748        metric_cache_key: MetricCacheKey,
749    ) {
750        match data_type {
751            MetricType::Occurrence | MetricType::IntHistogram => {
752                cache.insert(metric_cache_key, new_sample.clone());
753            }
754            MetricType::Integer | MetricType::String => (),
755        }
756    }
757}
758
759#[cfg(test)]
760impl ProjectSampler {
761    fn push_metric(&mut self, metric: MetricConfig) {
762        self.metrics.push(RefCell::new(metric));
763    }
764}
765
766fn process_sample_for_data_type(
767    new_sample: &Property,
768    previous_sample_opt: Option<&Property>,
769    data_source: &MetricCacheKey,
770    data_type: &MetricType,
771) -> Option<MetricEventPayload> {
772    let event_payload_res = match data_type {
773        MetricType::Occurrence => process_occurence(new_sample, previous_sample_opt, data_source),
774        MetricType::IntHistogram => {
775            process_int_histogram(new_sample, previous_sample_opt, data_source)
776        }
777        MetricType::Integer => {
778            // If we previously cached a metric with an int-type, log a warning and ignore it.
779            // This may be a case of using a single selector for two metrics, one event count
780            // and one int.
781            if previous_sample_opt.is_some() {
782                warn!("Sampler has erroneously cached an Int type metric: {:?}", data_source);
783            }
784            process_int(new_sample, data_source)
785        }
786        MetricType::String => {
787            if previous_sample_opt.is_some() {
788                warn!("Sampler has erroneously cached a String type metric: {:?}", data_source);
789            }
790            process_string(new_sample, data_source)
791        }
792    };
793
794    match event_payload_res {
795        Ok(payload_opt) => payload_opt,
796        Err(err) => {
797            warn!(data_source:?, err:?; "Failed to process Inspect property for cobalt",);
798            None
799        }
800    }
801}
802
803/// It's possible for Inspect numerical properties to experience overflows/conversion
804/// errors when being mapped to Cobalt types. Sanitize these numericals, and provide
805/// meaningful errors.
806fn sanitize_unsigned_numerical(diff: u64, data_source: &MetricCacheKey) -> Result<i64, Error> {
807    match diff.try_into() {
808        Ok(diff) => Ok(diff),
809        Err(e) => Err(format_err!(
810            "Selector used for EventCount type \
811            refered to an unsigned int property, \
812            but cobalt requires i64, and casting introduced overflow \
813            which produces a negative int: {:?}. This could be due to \
814            a single sample being larger than i64, or a diff between \
815            samples being larger than i64. Error: {:?}",
816            data_source,
817            e
818        )),
819    }
820}
821
822fn process_int_histogram(
823    new_sample: &Property,
824    prev_sample_opt: Option<&Property>,
825    data_source: &MetricCacheKey,
826) -> Result<Option<MetricEventPayload>, Error> {
827    let diff = match prev_sample_opt {
828        None => convert_inspect_histogram_to_cobalt_histogram(new_sample, data_source)?,
829        Some(prev_sample) => {
830            // If the data type changed then we just reset the cache.
831            if std::mem::discriminant(new_sample) == std::mem::discriminant(prev_sample) {
832                compute_histogram_diff(new_sample, prev_sample, data_source)?
833            } else {
834                convert_inspect_histogram_to_cobalt_histogram(new_sample, data_source)?
835            }
836        }
837    };
838
839    let non_empty_diff: Vec<HistogramBucket> = diff.into_iter().filter(|v| v.count != 0).collect();
840    if !non_empty_diff.is_empty() {
841        Ok(Some(MetricEventPayload::Histogram(non_empty_diff)))
842    } else {
843        Ok(None)
844    }
845}
846
847fn compute_histogram_diff(
848    new_sample: &Property,
849    old_sample: &Property,
850    data_source: &MetricCacheKey,
851) -> Result<Vec<HistogramBucket>, Error> {
852    let new_histogram_buckets =
853        convert_inspect_histogram_to_cobalt_histogram(new_sample, data_source)?;
854    let old_histogram_buckets =
855        convert_inspect_histogram_to_cobalt_histogram(old_sample, data_source)?;
856
857    if old_histogram_buckets.len() != new_histogram_buckets.len() {
858        return Err(format_err!(
859            "Selector referenced an Inspect IntArray \
860                that was specified as an IntHistogram type \
861                but the histogram bucket count changed between \
862                samples, which is incompatible with Cobalt. \
863                Selector: {:?}, Inspect type: {}",
864            data_source,
865            new_sample.discriminant_name()
866        ));
867    }
868
869    new_histogram_buckets
870        .iter()
871        .zip(old_histogram_buckets)
872        .map(|(new_bucket, old_bucket)| {
873            if new_bucket.count < old_bucket.count {
874                return Err(format_err!(
875                    concat!(
876                        "Selector referenced an Inspect IntArray",
877                        " that was specified as an IntHistogram type ",
878                        " but at least one bucket saw the count decrease",
879                        " between samples, which is incompatible with Cobalt's",
880                        " need for monotonically increasing counts.",
881                        " Selector: {:?}, Inspect type: {}"
882                    ),
883                    data_source,
884                    new_sample.discriminant_name()
885                ));
886            }
887            Ok(HistogramBucket {
888                count: new_bucket.count - old_bucket.count,
889                index: new_bucket.index,
890            })
891        })
892        .collect::<Result<Vec<HistogramBucket>, Error>>()
893}
894
895fn build_cobalt_histogram(counts: impl Iterator<Item = u64>) -> Vec<HistogramBucket> {
896    counts
897        .enumerate()
898        .map(|(index, count)| HistogramBucket { index: index as u32, count })
899        .collect()
900}
901
902fn build_sparse_cobalt_histogram(
903    counts: impl Iterator<Item = u64>,
904    indexes: &[usize],
905    size: usize,
906) -> Vec<HistogramBucket> {
907    let mut histogram =
908        Vec::from_iter((0..size).map(|index| HistogramBucket { index: index as u32, count: 0 }));
909    for (index, count) in indexes.iter().zip(counts) {
910        histogram[*index].count = count;
911    }
912    histogram
913}
914
915fn convert_inspect_histogram_to_cobalt_histogram(
916    inspect_histogram: &Property,
917    data_source: &MetricCacheKey,
918) -> Result<Vec<HistogramBucket>, Error> {
919    macro_rules! err {($($message:expr),+) => {
920        Err(format_err!(
921            concat!($($message),+ , " Selector: {:?}, Inspect type: {}"),
922            data_source,
923            inspect_histogram.discriminant_name()
924        ))
925    }}
926
927    let sanitize_size = |size: usize| -> Result<(), Error> {
928        if size > u32::MAX as usize {
929            return err!(
930                "Selector referenced an Inspect array",
931                " that was specified as a histogram type ",
932                " but contained an index too large for a u32."
933            );
934        }
935        Ok(())
936    };
937
938    let sanitize_indexes = |indexes: &[usize], size: usize| -> Result<(), Error> {
939        for index in indexes.iter() {
940            if *index >= size {
941                return err!(
942                    "Selector referenced an Inspect array",
943                    " that was specified as a histogram type ",
944                    " but contained an invalid index."
945                );
946            }
947        }
948        Ok(())
949    };
950
951    let sanitize_counts = |counts: &[i64]| -> Result<(), Error> {
952        for count in counts.iter() {
953            if *count < 0 {
954                return err!(
955                    "Selector referenced an Inspect IntArray",
956                    " that was specified as an IntHistogram type ",
957                    " but a bucket contained a negative count. This",
958                    " is incompatible with Cobalt histograms which only",
959                    " support positive histogram counts."
960                );
961            }
962        }
963        Ok(())
964    };
965
966    let histogram = match inspect_histogram {
967        Property::IntArray(
968            _,
969            ArrayContent::LinearHistogram(LinearHistogram { counts, indexes, size, .. }),
970        )
971        | Property::IntArray(
972            _,
973            ArrayContent::ExponentialHistogram(ExponentialHistogram {
974                counts, indexes, size, ..
975            }),
976        ) => {
977            sanitize_size(*size)?;
978            sanitize_counts(counts)?;
979            match (indexes, counts) {
980                (None, counts) => build_cobalt_histogram(counts.iter().map(|c| *c as u64)),
981                (Some(indexes), counts) => {
982                    sanitize_indexes(indexes, *size)?;
983                    build_sparse_cobalt_histogram(counts.iter().map(|c| *c as u64), indexes, *size)
984                }
985            }
986        }
987        Property::UintArray(
988            _,
989            ArrayContent::LinearHistogram(LinearHistogram { counts, indexes, size, .. }),
990        )
991        | Property::UintArray(
992            _,
993            ArrayContent::ExponentialHistogram(ExponentialHistogram {
994                counts, indexes, size, ..
995            }),
996        ) => {
997            sanitize_size(*size)?;
998            match (indexes, counts) {
999                (None, counts) => build_cobalt_histogram(counts.iter().copied()),
1000                (Some(indexes), counts) => {
1001                    sanitize_indexes(indexes, *size)?;
1002                    build_sparse_cobalt_histogram(counts.iter().copied(), indexes, *size)
1003                }
1004            }
1005        }
1006        _ => {
1007            // TODO(https://fxbug.dev/42118220): Does cobalt support floors or step counts that are
1008            // not ints? if so, we can support that as well with double arrays if the
1009            // actual counts are whole numbers.
1010            return Err(format_err!(
1011                concat!(
1012                    "Selector referenced an Inspect property",
1013                    " that was specified as an IntHistogram type ",
1014                    " but is unable to be encoded in a cobalt HistogramBucket",
1015                    " vector. Selector: {:?}, Inspect type: {}"
1016                ),
1017                data_source,
1018                inspect_histogram.discriminant_name()
1019            ));
1020        }
1021    };
1022    Ok(histogram)
1023}
1024
1025fn process_int(
1026    new_sample: &Property,
1027    data_source: &MetricCacheKey,
1028) -> Result<Option<MetricEventPayload>, Error> {
1029    let sampled_int = match new_sample {
1030        Property::Uint(_, val) => sanitize_unsigned_numerical(*val, data_source)?,
1031        Property::Int(_, val) => *val,
1032        _ => {
1033            return Err(format_err!(
1034                concat!(
1035                    "Selector referenced an Inspect property",
1036                    " that was specified as an Int type ",
1037                    " but is unable to be encoded in an i64",
1038                    " Selector: {:?}, Inspect type: {}"
1039                ),
1040                data_source,
1041                new_sample.discriminant_name()
1042            ));
1043        }
1044    };
1045
1046    Ok(Some(MetricEventPayload::IntegerValue(sampled_int)))
1047}
1048
1049fn process_string(
1050    new_sample: &Property,
1051    data_source: &MetricCacheKey,
1052) -> Result<Option<MetricEventPayload>, Error> {
1053    let sampled_string = match new_sample {
1054        Property::String(_, val) => val.clone(),
1055        _ => {
1056            return Err(format_err!(
1057                concat!(
1058                    "Selector referenced an Inspect property specified as String",
1059                    " but property is not type String.",
1060                    " Selector: {:?}, Inspect type: {}"
1061                ),
1062                data_source,
1063                new_sample.discriminant_name()
1064            ));
1065        }
1066    };
1067
1068    Ok(Some(MetricEventPayload::StringValue(sampled_string)))
1069}
1070
1071fn process_occurence(
1072    new_sample: &Property,
1073    prev_sample_opt: Option<&Property>,
1074    data_source: &MetricCacheKey,
1075) -> Result<Option<MetricEventPayload>, Error> {
1076    let diff = match prev_sample_opt {
1077        None => compute_initial_event_count(new_sample, data_source)?,
1078        Some(prev_sample) => compute_event_count_diff(new_sample, prev_sample, data_source)?,
1079    };
1080
1081    if diff < 0 {
1082        return Err(format_err!(
1083            concat!(
1084                "Event count must be monotonically increasing,",
1085                " but we observed a negative event count diff for: {:?}"
1086            ),
1087            data_source
1088        ));
1089    }
1090
1091    if diff == 0 {
1092        return Ok(None);
1093    }
1094
1095    // TODO(https://fxbug.dev/42118220): Once fuchsia.cobalt is gone, we don't need to preserve
1096    // occurrence counts "fitting" into i64s.
1097    Ok(Some(MetricEventPayload::Count(diff as u64)))
1098}
1099
1100fn compute_initial_event_count(
1101    new_sample: &Property,
1102    data_source: &MetricCacheKey,
1103) -> Result<i64, Error> {
1104    match new_sample {
1105        Property::Uint(_, val) => sanitize_unsigned_numerical(*val, data_source),
1106        Property::Int(_, val) => Ok(*val),
1107        _ => Err(format_err!(
1108            concat!(
1109                "Selector referenced an Inspect property",
1110                " that is not compatible with cached",
1111                " transformation to an event count.",
1112                " Selector: {:?}, {}"
1113            ),
1114            data_source,
1115            new_sample.discriminant_name()
1116        )),
1117    }
1118}
1119
1120fn compute_event_count_diff(
1121    new_sample: &Property,
1122    old_sample: &Property,
1123    data_source: &MetricCacheKey,
1124) -> Result<i64, Error> {
1125    match (new_sample, old_sample) {
1126        // We don't need to validate that old_count and new_count are positive here.
1127        // If new_count was negative, and old_count was positive, then the diff would be
1128        // negative, which is an errorful state. It's impossible for old_count to be negative
1129        // as either it was the first sample which would make a negative diff which is an error,
1130        // or it was a negative new_count with a positive old_count, which we've already shown will
1131        // produce an errorful state.
1132        (Property::Int(_, new_count), Property::Int(_, old_count)) => Ok(new_count - old_count),
1133        (Property::Uint(_, new_count), Property::Uint(_, old_count)) => {
1134            // u64::MAX will cause sanitized_unsigned_numerical to build an
1135            // appropriate error message for a subtraction underflow.
1136            sanitize_unsigned_numerical(
1137                new_count.checked_sub(*old_count).unwrap_or(u64::MAX),
1138                data_source,
1139            )
1140        }
1141        // If we have a correctly typed new sample, but it didn't match either of the above cases,
1142        // this means the new sample changed types compared to the old sample. We should just
1143        // restart the cache, and treat the new sample as a first observation.
1144        (_, Property::Uint(_, _)) | (_, Property::Int(_, _)) => {
1145            warn!(
1146                "Inspect type of sampled data changed between samples. Restarting cache. {:?}",
1147                data_source
1148            );
1149            compute_initial_event_count(new_sample, data_source)
1150        }
1151        _ => Err(format_err!(
1152            concat!(
1153                "Inspect type of sampled data changed between samples",
1154                " to a type incompatible with event counters.",
1155                " Selector: {:?}, New type: {:?}"
1156            ),
1157            data_source,
1158            new_sample.discriminant_name()
1159        )),
1160    }
1161}
1162
1163fn process_schema_errors(
1164    errors: &Option<Vec<diagnostics_data::InspectError>>,
1165    moniker: &ExtendedMoniker,
1166) {
1167    match errors {
1168        Some(errors) => {
1169            for error in errors {
1170                warn!(moniker:%, error:?; "");
1171            }
1172        }
1173        None => {
1174            warn!(moniker:%; "Encountered null payload and no errors.");
1175        }
1176    }
1177}
1178
1179#[cfg(test)]
1180mod tests {
1181    use super::*;
1182    use diagnostics_data::{InspectDataBuilder, Timestamp};
1183    use diagnostics_hierarchy::hierarchy;
1184    use fidl_fuchsia_inspect::DEFAULT_TREE_NAME;
1185    use sampler_config::{MetricId, ProjectId};
1186
1187    #[fuchsia::test]
1188    fn test_filter_metrics() {
1189        let mut sampler = ProjectSampler {
1190            archive_reader: ArchiveReader::inspect(),
1191            metrics: vec![],
1192            metric_cache: RefCell::new(HashMap::new()),
1193            metric_loggers: HashMap::new(),
1194            project_id: ProjectId(1),
1195            poll_rate_sec: 3600,
1196            project_sampler_stats: Arc::new(ProjectSamplerStats::new()),
1197            all_done: true,
1198        };
1199        let selector_foo: String = "core/foo:[name=foo]root/path:value".to_string();
1200        let selector_bar: String = "core/foo:[name=bar]root/path:value".to_string();
1201
1202        sampler.push_metric(MetricConfig {
1203            project_id: None,
1204            selectors: vec![selectors::parse_verbose(&selector_foo).unwrap()],
1205            metric_id: MetricId(1),
1206            // Occurrence type with a value of zero will not attempt to use any loggers.
1207            metric_type: MetricType::Occurrence,
1208            event_codes: Vec::new(),
1209            // upload_once means that process_component_data will return SelectorsChanged if
1210            // it is found in the map.
1211            upload_once: true,
1212        });
1213        sampler.push_metric(MetricConfig {
1214            project_id: None,
1215            selectors: vec![selectors::parse_verbose(&selector_bar).unwrap()],
1216            metric_id: MetricId(2),
1217            // Occurrence type with a value of zero will not attempt to use any loggers.
1218            metric_type: MetricType::Occurrence,
1219            event_codes: Vec::new(),
1220            // upload_once means that process_component_data will return SelectorsChanged if
1221            // it is found in the map.
1222            upload_once: true,
1223        });
1224
1225        sampler.rebuild_selector_data_structures();
1226
1227        let moniker = ExtendedMoniker::try_from("core/foo").unwrap();
1228
1229        let filtered_metrics =
1230            sampler.filter_metrics_by_moniker_and_tree_name(&moniker, "foo").collect::<Vec<_>>();
1231        assert_eq!(1, filtered_metrics.len());
1232        assert_eq!(MetricId(1), filtered_metrics[0].borrow().metric_id);
1233    }
1234
1235    #[fuchsia::test]
1236    fn test_filter_metrics_with_wildcards() {
1237        let mut sampler = ProjectSampler {
1238            archive_reader: ArchiveReader::inspect(),
1239            metrics: vec![],
1240            metric_cache: RefCell::new(HashMap::new()),
1241            metric_loggers: HashMap::new(),
1242            project_id: ProjectId(1),
1243            poll_rate_sec: 3600,
1244            project_sampler_stats: Arc::new(ProjectSamplerStats::new()),
1245            all_done: true,
1246        };
1247        let selector_foo: String = r"bootstrap/*-drivers\:*:[name=foo]root/path:value".to_string();
1248        let selector_bar1: String =
1249            r"bootstrap/*-drivers\:*:[name=bar]root/path:value1".to_string();
1250        let selector_bar2: String =
1251            r"bootstrap/*-drivers\:*:[name=bar]root/path:value2".to_string();
1252
1253        sampler.push_metric(MetricConfig {
1254            project_id: None,
1255            selectors: vec![selectors::parse_verbose(&selector_foo).unwrap()],
1256            metric_id: MetricId(1),
1257            // Occurrence type with a value of zero will not attempt to use any loggers.
1258            metric_type: MetricType::Occurrence,
1259            event_codes: Vec::new(),
1260            // upload_once means that process_component_data will return SelectorsChanged if
1261            // it is found in the map.
1262            upload_once: true,
1263        });
1264        sampler.push_metric(MetricConfig {
1265            project_id: None,
1266            selectors: vec![selectors::parse_verbose(&selector_bar1).unwrap()],
1267            metric_id: MetricId(2),
1268            // Occurrence type with a value of zero will not attempt to use any loggers.
1269            metric_type: MetricType::Occurrence,
1270            event_codes: Vec::new(),
1271            // upload_once means that process_component_data will return SelectorsChanged if
1272            // it is found in the map.
1273            upload_once: true,
1274        });
1275        sampler.push_metric(MetricConfig {
1276            project_id: None,
1277            selectors: vec![selectors::parse_verbose(&selector_bar2).unwrap()],
1278            metric_id: MetricId(3),
1279            // Occurrence type with a value of zero will not attempt to use any loggers.
1280            metric_type: MetricType::Occurrence,
1281            event_codes: Vec::new(),
1282            // upload_once means that process_component_data will return SelectorsChanged if
1283            // it is found in the map.
1284            upload_once: true,
1285        });
1286
1287        sampler.rebuild_selector_data_structures();
1288
1289        let moniker = ExtendedMoniker::try_from("bootstrap/boot-drivers:098098").unwrap();
1290
1291        let filtered_metrics =
1292            sampler.filter_metrics_by_moniker_and_tree_name(&moniker, "foo").collect::<Vec<_>>();
1293        assert_eq!(1, filtered_metrics.len());
1294        assert_eq!(MetricId(1), filtered_metrics[0].borrow().metric_id);
1295
1296        let filtered_metrics =
1297            sampler.filter_metrics_by_moniker_and_tree_name(&moniker, "bar").collect::<Vec<_>>();
1298        assert_eq!(2, filtered_metrics.len());
1299        assert_eq!(MetricId(2), filtered_metrics[0].borrow().metric_id);
1300        assert_eq!(MetricId(3), filtered_metrics[1].borrow().metric_id);
1301    }
1302
1303    /// Test inserting a string into the hierarchy that requires escaping.
1304    #[fuchsia::test]
1305    fn test_process_payload_with_escapes() {
1306        let unescaped: String = "path/to".to_string();
1307        let hierarchy = hierarchy! {
1308            root: {
1309                var unescaped: {
1310                    value: 0,
1311                    "value/with:escapes": 0,
1312                }
1313            }
1314        };
1315
1316        let mut sampler = ProjectSampler {
1317            archive_reader: ArchiveReader::inspect(),
1318            metrics: vec![],
1319            metric_cache: RefCell::new(HashMap::new()),
1320            metric_loggers: HashMap::new(),
1321            project_id: ProjectId(1),
1322            poll_rate_sec: 3600,
1323            project_sampler_stats: Arc::new(ProjectSamplerStats::new()),
1324            all_done: true,
1325        };
1326        let selector: String = "my/component:root/path\\/to:value".to_string();
1327        sampler.push_metric(MetricConfig {
1328            project_id: None,
1329            selectors: vec![selectors::parse_verbose(&selector).unwrap()],
1330            metric_id: MetricId(1),
1331            // Occurrence type with a value of zero will not attempt to use any loggers.
1332            metric_type: MetricType::Occurrence,
1333            event_codes: Vec::new(),
1334            // upload_once means that process_component_data will return SelectorsChanged if
1335            // it is found in the map.
1336            upload_once: true,
1337        });
1338        sampler.rebuild_selector_data_structures();
1339        match sampler.process_component_data(
1340            &hierarchy,
1341            &InspectHandleName::name(DEFAULT_TREE_NAME),
1342            &"my/component".try_into().unwrap(),
1343        ) {
1344            // This selector will be found and removed from the map, resulting in a
1345            // SelectorsChanged response.
1346            Ok((SnapshotOutcome::SelectorsChanged, _events)) => (),
1347            _ => panic!("Expecting SelectorsChanged from process_component_data."),
1348        }
1349
1350        let selector_with_escaped_property: String =
1351            "my/component:root/path\\/to:value\\/with\\:escapes".to_string();
1352        sampler.push_metric(MetricConfig {
1353            project_id: None,
1354            selectors: vec![selectors::parse_verbose(&selector_with_escaped_property).unwrap()],
1355            metric_id: MetricId(1),
1356            // Occurrence type with a value of zero will not attempt to use any loggers.
1357            metric_type: MetricType::Occurrence,
1358            event_codes: Vec::new(),
1359            // upload_once means that the method will return SelectorsChanged if it is found
1360            // in the map.
1361            upload_once: true,
1362        });
1363        sampler.rebuild_selector_data_structures();
1364        match sampler.process_component_data(
1365            &hierarchy,
1366            &InspectHandleName::name(DEFAULT_TREE_NAME),
1367            &"my/component".try_into().unwrap(),
1368        ) {
1369            // This selector will be found and removed from the map, resulting in a
1370            // SelectorsChanged response.
1371            Ok((SnapshotOutcome::SelectorsChanged, _events)) => (),
1372            _ => panic!("Expecting SelectorsChanged from process_component_data."),
1373        }
1374
1375        let selector_unfound: String = "my/component:root/path/to:value".to_string();
1376        sampler.push_metric(MetricConfig {
1377            project_id: None,
1378            selectors: vec![selectors::parse_verbose(&selector_unfound).unwrap()],
1379            metric_id: MetricId(1),
1380            // Occurrence type with a value of zero will not attempt to use any loggers.
1381            metric_type: MetricType::Occurrence,
1382            event_codes: Vec::new(),
1383            // upload_once means that the method will return SelectorsChanged if it is found
1384            // in the map.
1385            upload_once: true,
1386        });
1387        sampler.rebuild_selector_data_structures();
1388        match sampler.process_component_data(
1389            &hierarchy,
1390            &InspectHandleName::name(DEFAULT_TREE_NAME),
1391            &"my/component".try_into().unwrap(),
1392        ) {
1393            // This selector will not be found and removed from the map, resulting in SelectorsUnchanged.
1394            Ok((SnapshotOutcome::SelectorsUnchanged, _events)) => (),
1395            _ => panic!("Expecting SelectorsUnchanged from process_component_data."),
1396        }
1397    }
1398
1399    /// Test that a decreasing occurrence type (which is not allowed) doesn't crash due to e.g.
1400    /// unchecked unsigned subtraction overflow.
1401    #[fuchsia::test]
1402    fn decreasing_occurrence_is_correct() {
1403        let big_number = Property::Uint("foo".to_string(), 5);
1404        let small_number = Property::Uint("foo".to_string(), 2);
1405        let key = MetricCacheKey {
1406            handle_name: InspectHandleName::name("some_file"),
1407            selector: "sel".to_string(),
1408        };
1409
1410        assert_eq!(
1411            process_sample_for_data_type(
1412                &big_number,
1413                Some(&small_number),
1414                &key,
1415                &MetricType::Occurrence
1416            ),
1417            Some(MetricEventPayload::Count(3))
1418        );
1419        assert_eq!(
1420            process_sample_for_data_type(
1421                &small_number,
1422                Some(&big_number),
1423                &key,
1424                &MetricType::Occurrence
1425            ),
1426            None
1427        );
1428    }
1429
1430    /// Test removal of selectors marked with upload_once.
1431    #[fuchsia::test]
1432    fn test_upload_once() {
1433        let hierarchy = hierarchy! {
1434            root: {
1435                value_one: 0,
1436                value_two: 1,
1437            }
1438        };
1439
1440        let mut sampler = ProjectSampler {
1441            archive_reader: ArchiveReader::inspect(),
1442            metrics: vec![],
1443            metric_cache: RefCell::new(HashMap::new()),
1444            metric_loggers: HashMap::new(),
1445            project_id: ProjectId(1),
1446            poll_rate_sec: 3600,
1447            project_sampler_stats: Arc::new(ProjectSamplerStats::new()),
1448            all_done: true,
1449        };
1450        sampler.push_metric(MetricConfig {
1451            project_id: None,
1452            selectors: vec![selectors::parse_verbose("my/component:root:value_one").unwrap()],
1453            metric_id: MetricId(1),
1454            metric_type: MetricType::Integer,
1455            event_codes: Vec::new(),
1456            upload_once: true,
1457        });
1458        sampler.push_metric(MetricConfig {
1459            project_id: None,
1460            selectors: vec![selectors::parse_verbose("my/component:root:value_two").unwrap()],
1461            metric_id: MetricId(2),
1462            metric_type: MetricType::Integer,
1463            event_codes: Vec::new(),
1464            upload_once: true,
1465        });
1466        sampler.rebuild_selector_data_structures();
1467
1468        // Both selectors should be found and removed from the map.
1469        match sampler.process_component_data(
1470            &hierarchy,
1471            &InspectHandleName::name(DEFAULT_TREE_NAME),
1472            &"my/component".try_into().unwrap(),
1473        ) {
1474            Ok((SnapshotOutcome::SelectorsChanged, _events)) => (),
1475            _ => panic!("Expecting SelectorsChanged from process_component_data."),
1476        }
1477
1478        let moniker: ExtendedMoniker = "my_component".try_into().unwrap();
1479        assert!(sampler
1480            .filter_metrics_by_moniker_and_tree_name(&moniker, DEFAULT_TREE_NAME)
1481            .collect::<Vec<_>>()
1482            .is_empty());
1483    }
1484
1485    struct EventCountTesterParams {
1486        new_val: Property,
1487        old_val: Option<Property>,
1488        process_ok: bool,
1489        event_made: bool,
1490        diff: i64,
1491    }
1492
1493    fn process_occurence_tester(params: EventCountTesterParams) {
1494        let data_source = MetricCacheKey {
1495            handle_name: InspectHandleName::name("foo.file"),
1496            selector: "test:root:count".to_string(),
1497        };
1498        let event_res = process_occurence(&params.new_val, params.old_val.as_ref(), &data_source);
1499
1500        if !params.process_ok {
1501            assert!(event_res.is_err());
1502            return;
1503        }
1504
1505        assert!(event_res.is_ok());
1506
1507        let event_opt = event_res.unwrap();
1508
1509        if !params.event_made {
1510            assert!(event_opt.is_none());
1511            return;
1512        }
1513
1514        assert!(event_opt.is_some());
1515        let event = event_opt.unwrap();
1516        match event {
1517            MetricEventPayload::Count(count) => {
1518                assert_eq!(count, params.diff as u64);
1519            }
1520            _ => panic!("Expecting event counts."),
1521        }
1522    }
1523
1524    #[fuchsia::test]
1525    fn test_normal_process_occurence() {
1526        process_occurence_tester(EventCountTesterParams {
1527            new_val: Property::Int("count".to_string(), 1),
1528            old_val: None,
1529            process_ok: true,
1530            event_made: true,
1531            diff: 1,
1532        });
1533
1534        process_occurence_tester(EventCountTesterParams {
1535            new_val: Property::Int("count".to_string(), 1),
1536            old_val: Some(Property::Int("count".to_string(), 1)),
1537            process_ok: true,
1538            event_made: false,
1539            diff: -1,
1540        });
1541
1542        process_occurence_tester(EventCountTesterParams {
1543            new_val: Property::Int("count".to_string(), 3),
1544            old_val: Some(Property::Int("count".to_string(), 1)),
1545            process_ok: true,
1546            event_made: true,
1547            diff: 2,
1548        });
1549    }
1550
1551    #[fuchsia::test]
1552    fn test_data_type_changing_process_occurence() {
1553        process_occurence_tester(EventCountTesterParams {
1554            new_val: Property::Int("count".to_string(), 1),
1555            old_val: None,
1556            process_ok: true,
1557            event_made: true,
1558            diff: 1,
1559        });
1560
1561        process_occurence_tester(EventCountTesterParams {
1562            new_val: Property::Uint("count".to_string(), 1),
1563            old_val: None,
1564            process_ok: true,
1565            event_made: true,
1566            diff: 1,
1567        });
1568
1569        process_occurence_tester(EventCountTesterParams {
1570            new_val: Property::Uint("count".to_string(), 3),
1571            old_val: Some(Property::Int("count".to_string(), 1)),
1572            process_ok: true,
1573            event_made: true,
1574            diff: 3,
1575        });
1576
1577        process_occurence_tester(EventCountTesterParams {
1578            new_val: Property::String("count".to_string(), "big_oof".to_string()),
1579            old_val: Some(Property::Int("count".to_string(), 1)),
1580            process_ok: false,
1581            event_made: false,
1582            diff: -1,
1583        });
1584    }
1585
1586    #[fuchsia::test]
1587    fn test_event_count_negatives_and_overflows() {
1588        process_occurence_tester(EventCountTesterParams {
1589            new_val: Property::Int("count".to_string(), -11),
1590            old_val: None,
1591            process_ok: false,
1592            event_made: false,
1593            diff: -1,
1594        });
1595
1596        process_occurence_tester(EventCountTesterParams {
1597            new_val: Property::Int("count".to_string(), 9),
1598            old_val: Some(Property::Int("count".to_string(), 10)),
1599            process_ok: false,
1600            event_made: false,
1601            diff: -1,
1602        });
1603
1604        process_occurence_tester(EventCountTesterParams {
1605            new_val: Property::Uint("count".to_string(), u64::MAX),
1606            old_val: None,
1607            process_ok: false,
1608            event_made: false,
1609            diff: -1,
1610        });
1611
1612        let i64_max_in_u64: u64 = i64::MAX.try_into().unwrap();
1613
1614        process_occurence_tester(EventCountTesterParams {
1615            new_val: Property::Uint("count".to_string(), i64_max_in_u64 + 1),
1616            old_val: Some(Property::Uint("count".to_string(), 1)),
1617            process_ok: true,
1618            event_made: true,
1619            diff: i64::MAX,
1620        });
1621
1622        process_occurence_tester(EventCountTesterParams {
1623            new_val: Property::Uint("count".to_string(), i64_max_in_u64 + 2),
1624            old_val: Some(Property::Uint("count".to_string(), 1)),
1625            process_ok: false,
1626            event_made: false,
1627            diff: -1,
1628        });
1629    }
1630
1631    struct IntTesterParams {
1632        new_val: Property,
1633        process_ok: bool,
1634        sample: i64,
1635    }
1636
1637    fn process_int_tester(params: IntTesterParams) {
1638        let data_source = MetricCacheKey {
1639            handle_name: InspectHandleName::name("foo.file"),
1640            selector: "test:root:count".to_string(),
1641        };
1642        let event_res = process_int(&params.new_val, &data_source);
1643
1644        if !params.process_ok {
1645            assert!(event_res.is_err());
1646            return;
1647        }
1648
1649        assert!(event_res.is_ok());
1650
1651        let event = event_res.expect("event should be Ok").expect("event should be Some");
1652        match event {
1653            MetricEventPayload::IntegerValue(val) => {
1654                assert_eq!(val, params.sample);
1655            }
1656            _ => panic!("Expecting event counts."),
1657        }
1658    }
1659
1660    #[fuchsia::test]
1661    fn test_normal_process_int() {
1662        process_int_tester(IntTesterParams {
1663            new_val: Property::Int("count".to_string(), 13),
1664            process_ok: true,
1665            sample: 13,
1666        });
1667
1668        process_int_tester(IntTesterParams {
1669            new_val: Property::Int("count".to_string(), -13),
1670            process_ok: true,
1671            sample: -13,
1672        });
1673
1674        process_int_tester(IntTesterParams {
1675            new_val: Property::Int("count".to_string(), 0),
1676            process_ok: true,
1677            sample: 0,
1678        });
1679
1680        process_int_tester(IntTesterParams {
1681            new_val: Property::Uint("count".to_string(), 13),
1682            process_ok: true,
1683            sample: 13,
1684        });
1685
1686        process_int_tester(IntTesterParams {
1687            new_val: Property::String("count".to_string(), "big_oof".to_string()),
1688            process_ok: false,
1689            sample: -1,
1690        });
1691    }
1692
1693    #[fuchsia::test]
1694    fn test_int_edge_cases() {
1695        process_int_tester(IntTesterParams {
1696            new_val: Property::Int("count".to_string(), i64::MAX),
1697            process_ok: true,
1698            sample: i64::MAX,
1699        });
1700
1701        process_int_tester(IntTesterParams {
1702            new_val: Property::Int("count".to_string(), i64::MIN),
1703            process_ok: true,
1704            sample: i64::MIN,
1705        });
1706
1707        let i64_max_in_u64: u64 = i64::MAX.try_into().unwrap();
1708
1709        process_int_tester(IntTesterParams {
1710            new_val: Property::Uint("count".to_string(), i64_max_in_u64),
1711            process_ok: true,
1712            sample: i64::MAX,
1713        });
1714
1715        process_int_tester(IntTesterParams {
1716            new_val: Property::Uint("count".to_string(), i64_max_in_u64 + 1),
1717            process_ok: false,
1718            sample: -1,
1719        });
1720    }
1721
1722    struct StringTesterParams {
1723        sample: Property,
1724        process_ok: bool,
1725        previous_sample: Option<Property>,
1726    }
1727
1728    fn process_string_tester(params: StringTesterParams) {
1729        let metric_cache_key = MetricCacheKey {
1730            handle_name: InspectHandleName::name("foo.file"),
1731            selector: "test:root:string_val".to_string(),
1732        };
1733
1734        let event = process_sample_for_data_type(
1735            &params.sample,
1736            params.previous_sample.as_ref(),
1737            &metric_cache_key,
1738            &MetricType::String,
1739        );
1740
1741        if !params.process_ok {
1742            assert!(event.is_none());
1743            return;
1744        }
1745
1746        match event.unwrap() {
1747            MetricEventPayload::StringValue(val) => {
1748                assert_eq!(val.as_str(), params.sample.string().unwrap());
1749            }
1750            _ => panic!("Expecting event with StringValue."),
1751        }
1752    }
1753
1754    #[fuchsia::test]
1755    fn test_process_string() {
1756        process_string_tester(StringTesterParams {
1757            sample: Property::String("string_val".to_string(), "Hello, world!".to_string()),
1758            process_ok: true,
1759            previous_sample: None,
1760        });
1761
1762        // Ensure any erroneously cached values are ignored (a warning is logged in this case).
1763
1764        process_string_tester(StringTesterParams {
1765            sample: Property::String("string_val".to_string(), "Hello, world!".to_string()),
1766            process_ok: true,
1767            previous_sample: Some(Property::String("string_val".to_string(), "Uh oh!".to_string())),
1768        });
1769
1770        // Ensure unsupported property types are not erroneously processed.
1771
1772        process_string_tester(StringTesterParams {
1773            sample: Property::Int("string_val".to_string(), 123),
1774            process_ok: false,
1775            previous_sample: None,
1776        });
1777
1778        process_string_tester(StringTesterParams {
1779            sample: Property::Uint("string_val".to_string(), 123),
1780            process_ok: false,
1781            previous_sample: None,
1782        });
1783    }
1784
1785    fn convert_vector_to_int_histogram(hist: Vec<i64>) -> Property {
1786        let size = hist.len();
1787        Property::IntArray(
1788            "Bloop".to_string(),
1789            ArrayContent::LinearHistogram(LinearHistogram {
1790                floor: 1,
1791                step: 1,
1792                counts: hist,
1793                size,
1794                indexes: None,
1795            }),
1796        )
1797    }
1798
1799    fn convert_vector_to_uint_histogram(hist: Vec<u64>) -> Property<String> {
1800        let size = hist.len();
1801        Property::UintArray(
1802            "Bloop".to_string(),
1803            ArrayContent::LinearHistogram(LinearHistogram {
1804                floor: 1,
1805                step: 1,
1806                counts: hist,
1807                size,
1808                indexes: None,
1809            }),
1810        )
1811    }
1812
1813    // Produce condensed histograms. Size is arbitrary 100 - indexes must be less than that.
1814    fn convert_vectors_to_int_histogram(counts: Vec<i64>, indexes: Vec<usize>) -> Property<String> {
1815        let size = 100;
1816        Property::IntArray(
1817            "Bloop".to_string(),
1818            ArrayContent::LinearHistogram(LinearHistogram {
1819                floor: 1,
1820                step: 1,
1821                counts,
1822                size,
1823                indexes: Some(indexes),
1824            }),
1825        )
1826    }
1827
1828    fn convert_vectors_to_uint_histogram(
1829        counts: Vec<u64>,
1830        indexes: Vec<usize>,
1831    ) -> Property<String> {
1832        let size = 100;
1833        Property::UintArray(
1834            "Bloop".to_string(),
1835            ArrayContent::LinearHistogram(LinearHistogram {
1836                floor: 1,
1837                step: 1,
1838                counts,
1839                size,
1840                indexes: Some(indexes),
1841            }),
1842        )
1843    }
1844
1845    struct IntHistogramTesterParams {
1846        new_val: Property,
1847        old_val: Option<Property>,
1848        process_ok: bool,
1849        event_made: bool,
1850        diff: Vec<(u32, u64)>,
1851    }
1852    fn process_int_histogram_tester(params: IntHistogramTesterParams) {
1853        let data_source = MetricCacheKey {
1854            handle_name: InspectHandleName::name("foo.file"),
1855            selector: "test:root:count".to_string(),
1856        };
1857        let event_res =
1858            process_int_histogram(&params.new_val, params.old_val.as_ref(), &data_source);
1859
1860        if !params.process_ok {
1861            assert!(event_res.is_err());
1862            return;
1863        }
1864
1865        assert!(event_res.is_ok());
1866
1867        let event_opt = event_res.unwrap();
1868        if !params.event_made {
1869            assert!(event_opt.is_none());
1870            return;
1871        }
1872
1873        assert!(event_opt.is_some());
1874
1875        let event = event_opt.unwrap();
1876        match event.clone() {
1877            MetricEventPayload::Histogram(histogram_buckets) => {
1878                assert_eq!(histogram_buckets.len(), params.diff.len());
1879
1880                let expected_histogram_buckets = params
1881                    .diff
1882                    .iter()
1883                    .map(|(index, count)| HistogramBucket { index: *index, count: *count })
1884                    .collect::<Vec<HistogramBucket>>();
1885
1886                assert_eq!(histogram_buckets, expected_histogram_buckets);
1887            }
1888            _ => panic!("Expecting int histogram."),
1889        }
1890    }
1891
1892    /// Test that simple in-bounds first-samples of both types of Inspect histograms
1893    /// produce correct event types.
1894    #[fuchsia::test]
1895    fn test_normal_process_int_histogram() {
1896        let new_i64_sample = convert_vector_to_int_histogram(vec![1, 1, 1, 1]);
1897        let new_u64_sample = convert_vector_to_uint_histogram(vec![1, 1, 1, 1]);
1898
1899        process_int_histogram_tester(IntHistogramTesterParams {
1900            new_val: new_i64_sample,
1901            old_val: None,
1902            process_ok: true,
1903            event_made: true,
1904            diff: vec![(0, 1), (1, 1), (2, 1), (3, 1)],
1905        });
1906
1907        process_int_histogram_tester(IntHistogramTesterParams {
1908            new_val: new_u64_sample,
1909            old_val: None,
1910            process_ok: true,
1911            event_made: true,
1912            diff: vec![(0, 1), (1, 1), (2, 1), (3, 1)],
1913        });
1914
1915        // Test an Inspect uint histogram at the boundaries of the type produce valid
1916        // cobalt events.
1917        let new_u64_sample = convert_vector_to_uint_histogram(vec![u64::MAX, u64::MAX, u64::MAX]);
1918        process_int_histogram_tester(IntHistogramTesterParams {
1919            new_val: new_u64_sample,
1920            old_val: None,
1921            process_ok: true,
1922            event_made: true,
1923            diff: vec![(0, u64::MAX), (1, u64::MAX), (2, u64::MAX)],
1924        });
1925
1926        // Test that an empty Inspect histogram produces no event.
1927        let new_u64_sample = convert_vector_to_uint_histogram(Vec::new());
1928        process_int_histogram_tester(IntHistogramTesterParams {
1929            new_val: new_u64_sample,
1930            old_val: None,
1931            process_ok: true,
1932            event_made: false,
1933            diff: Vec::new(),
1934        });
1935
1936        let new_u64_sample = convert_vector_to_uint_histogram(vec![0, 0, 0, 0]);
1937        process_int_histogram_tester(IntHistogramTesterParams {
1938            new_val: new_u64_sample,
1939            old_val: None,
1940            process_ok: true,
1941            event_made: false,
1942            diff: Vec::new(),
1943        });
1944
1945        // Test that monotonically increasing histograms are good!.
1946        let new_u64_sample = convert_vector_to_uint_histogram(vec![2, 1, 2, 1]);
1947        let old_u64_sample = Some(convert_vector_to_uint_histogram(vec![1, 1, 0, 1]));
1948        process_int_histogram_tester(IntHistogramTesterParams {
1949            new_val: new_u64_sample,
1950            old_val: old_u64_sample,
1951            process_ok: true,
1952            event_made: true,
1953            diff: vec![(0, 1), (2, 2)],
1954        });
1955
1956        let new_i64_sample = convert_vector_to_int_histogram(vec![5, 2, 1, 3]);
1957        let old_i64_sample = Some(convert_vector_to_int_histogram(vec![1, 1, 1, 1]));
1958        process_int_histogram_tester(IntHistogramTesterParams {
1959            new_val: new_i64_sample,
1960            old_val: old_i64_sample,
1961            process_ok: true,
1962            event_made: true,
1963            diff: vec![(0, 4), (1, 1), (3, 2)],
1964        });
1965
1966        // Test that changing the histogram type resets the cache.
1967        let new_u64_sample = convert_vector_to_uint_histogram(vec![2, 1, 1, 1]);
1968        let old_i64_sample = Some(convert_vector_to_int_histogram(vec![1, 1, 1, 1]));
1969        process_int_histogram_tester(IntHistogramTesterParams {
1970            new_val: new_u64_sample,
1971            old_val: old_i64_sample,
1972            process_ok: true,
1973            event_made: true,
1974            diff: vec![(0, 2), (1, 1), (2, 1), (3, 1)],
1975        });
1976    }
1977
1978    // Test that we can handle condensed int and uint histograms, even with indexes out of order
1979    #[fuchsia::test]
1980    fn test_normal_process_condensed_histograms() {
1981        let new_u64_sample = convert_vectors_to_int_histogram(vec![2, 6], vec![3, 5]);
1982        let old_u64_sample = Some(convert_vectors_to_int_histogram(vec![1], vec![5]));
1983        process_int_histogram_tester(IntHistogramTesterParams {
1984            new_val: new_u64_sample,
1985            old_val: old_u64_sample,
1986            process_ok: true,
1987            event_made: true,
1988            diff: vec![(3, 2), (5, 5)],
1989        });
1990        let new_i64_sample = convert_vectors_to_uint_histogram(vec![2, 4], vec![5, 3]);
1991        let old_i64_sample = None;
1992        process_int_histogram_tester(IntHistogramTesterParams {
1993            new_val: new_i64_sample,
1994            old_val: old_i64_sample,
1995            process_ok: true,
1996            event_made: true,
1997            diff: vec![(3, 4), (5, 2)],
1998        });
1999    }
2000
2001    #[fuchsia::test]
2002    fn test_errorful_process_int_histogram() {
2003        // Test that changing the histogram length is an error.
2004        let new_u64_sample = convert_vector_to_uint_histogram(vec![1, 1, 1, 1]);
2005        let old_u64_sample = Some(convert_vector_to_uint_histogram(vec![1, 1, 1, 1, 1]));
2006        process_int_histogram_tester(IntHistogramTesterParams {
2007            new_val: new_u64_sample,
2008            old_val: old_u64_sample,
2009            process_ok: false,
2010            event_made: false,
2011            diff: Vec::new(),
2012        });
2013
2014        // Test that new samples cant have negative values.
2015        let new_i64_sample = convert_vector_to_int_histogram(vec![1, 1, -1, 1]);
2016        process_int_histogram_tester(IntHistogramTesterParams {
2017            new_val: new_i64_sample,
2018            old_val: None,
2019            process_ok: false,
2020            event_made: false,
2021            diff: Vec::new(),
2022        });
2023
2024        // Test that histograms must be monotonically increasing.
2025        let new_i64_sample = convert_vector_to_int_histogram(vec![5, 2, 1, 3]);
2026        let old_i64_sample = Some(convert_vector_to_int_histogram(vec![6, 1, 1, 1]));
2027        process_int_histogram_tester(IntHistogramTesterParams {
2028            new_val: new_i64_sample,
2029            old_val: old_i64_sample,
2030            process_ok: false,
2031            event_made: false,
2032            diff: Vec::new(),
2033        });
2034    }
2035
2036    /// Ensure that data distinguished only by metadata handle name - with the same moniker and
2037    /// selector path - is kept properly separate in the previous-value cache. The same
2038    /// MetricConfig should match each data source, but the occurrence counts
2039    /// should reflect that the distinct values are individually tracked.
2040    #[fuchsia::test]
2041    async fn test_inspect_handle_name_distinguishes_data() {
2042        let mut sampler = ProjectSampler {
2043            archive_reader: ArchiveReader::inspect(),
2044            metrics: vec![],
2045            metric_cache: RefCell::new(HashMap::new()),
2046            metric_loggers: HashMap::new(),
2047            project_id: ProjectId(1),
2048            poll_rate_sec: 3600,
2049            project_sampler_stats: Arc::new(ProjectSamplerStats::new()),
2050            all_done: true,
2051        };
2052        let selector: String = "my/component:[...]root/branch:leaf".to_string();
2053        let metric_id = MetricId(1);
2054        let event_codes = vec![];
2055        sampler.push_metric(MetricConfig {
2056            project_id: None,
2057            selectors: vec![selectors::parse_verbose(&selector).unwrap()],
2058            metric_id,
2059            metric_type: MetricType::Occurrence,
2060            event_codes,
2061            upload_once: false,
2062        });
2063        sampler.rebuild_selector_data_structures();
2064
2065        let data1_value4 = vec![InspectDataBuilder::new(
2066            "my/component".try_into().unwrap(),
2067            "component-url",
2068            Timestamp::from_nanos(0),
2069        )
2070        .with_hierarchy(hierarchy! { root: {branch: {leaf: 4i32}}})
2071        .with_name(InspectHandleName::name("name1"))
2072        .build()];
2073        let data2_value3 = vec![InspectDataBuilder::new(
2074            "my/component".try_into().unwrap(),
2075            "component-url",
2076            Timestamp::from_nanos(0),
2077        )
2078        .with_hierarchy(hierarchy! { root: {branch: {leaf: 3i32}}})
2079        .with_name(InspectHandleName::name("name2"))
2080        .build()];
2081        let data1_value6 = vec![InspectDataBuilder::new(
2082            "my/component".try_into().unwrap(),
2083            "component-url",
2084            Timestamp::from_nanos(0),
2085        )
2086        .with_hierarchy(hierarchy! { root: {branch: {leaf: 6i32}}})
2087        .with_name(InspectHandleName::name("name1"))
2088        .build()];
2089        let data2_value8 = vec![InspectDataBuilder::new(
2090            "my/component".try_into().unwrap(),
2091            "component-url",
2092            Timestamp::from_nanos(0),
2093        )
2094        .with_hierarchy(hierarchy! { root: {branch: {leaf: 8i32}}})
2095        .with_name(InspectHandleName::name("name2"))
2096        .build()];
2097
2098        fn expect_one_metric_event_value(
2099            events: Result<Vec<EventToLog>, Error>,
2100            value: u64,
2101            context: &'static str,
2102        ) {
2103            let events = events.expect(context);
2104            assert_eq!(events.len(), 1, "Events len not 1: {}: {}", context, events.len());
2105            let event = &events[0];
2106            let (project_id, MetricEvent { payload, .. }) = event;
2107            assert_eq!(*project_id, ProjectId(1));
2108            if let fidl_fuchsia_metrics::MetricEventPayload::Count(payload) = payload {
2109                assert_eq!(
2110                    payload, &value,
2111                    "Wrong payload, expected {value} got {payload} at {context}"
2112                );
2113            } else {
2114                panic!("Expected MetricEventPayload::Count at {context}, got {payload:?}");
2115            }
2116        }
2117
2118        expect_one_metric_event_value(sampler.process_snapshot(data1_value4).await, 4, "first");
2119        expect_one_metric_event_value(sampler.process_snapshot(data2_value3).await, 3, "second");
2120        expect_one_metric_event_value(sampler.process_snapshot(data1_value6).await, 2, "third");
2121        expect_one_metric_event_value(sampler.process_snapshot(data2_value8).await, 5, "fourth");
2122    }
2123
2124    // TODO(https://fxbug.dev/42071858): we should remove this once we support batching.
2125    #[fuchsia::test]
2126    async fn project_id_can_be_overwritten_by_the_metric_project_id() {
2127        let mut sampler = ProjectSampler {
2128            archive_reader: ArchiveReader::inspect(),
2129            metrics: vec![],
2130            metric_cache: RefCell::new(HashMap::new()),
2131            metric_loggers: HashMap::new(),
2132            project_id: ProjectId(1),
2133            poll_rate_sec: 3600,
2134            project_sampler_stats: Arc::new(ProjectSamplerStats::new()),
2135            all_done: true,
2136        };
2137        let selector: String = "my/component:[name=name1]root/branch:leaf".to_string();
2138        let metric_id = MetricId(1);
2139        let event_codes = vec![];
2140        sampler.push_metric(MetricConfig {
2141            project_id: Some(ProjectId(2)),
2142            selectors: vec![selectors::parse_verbose(&selector).unwrap()],
2143            metric_id,
2144            metric_type: MetricType::Occurrence,
2145            event_codes,
2146            upload_once: false,
2147        });
2148        sampler.rebuild_selector_data_structures();
2149
2150        let value = vec![InspectDataBuilder::new(
2151            "my/component".try_into().unwrap(),
2152            "component-url",
2153            Timestamp::from_nanos(0),
2154        )
2155        .with_hierarchy(hierarchy! { root: {branch: {leaf: 4i32}}})
2156        .with_name(InspectHandleName::name("name1"))
2157        .build()];
2158
2159        let events = sampler.process_snapshot(value).await.expect("processed snapshot");
2160        assert_eq!(events.len(), 1);
2161        let event = &events[0];
2162        let (project_id, MetricEvent { .. }) = event;
2163        assert_eq!(*project_id, ProjectId(2));
2164    }
2165}