1use crate::config::SamplerConfig;
90use crate::diagnostics::*;
91use anyhow::{format_err, Context, Error};
92use diagnostics_data::{Data, InspectHandleName};
93use diagnostics_hierarchy::{
94 ArrayContent, DiagnosticsHierarchy, ExponentialHistogram, LinearHistogram, Property,
95 SelectResult,
96};
97use diagnostics_reader::{ArchiveReader, Inspect, InspectArchiveReader, RetryConfig};
98use fidl_fuchsia_metrics::{
99 HistogramBucket, MetricEvent, MetricEventLoggerFactoryMarker, MetricEventLoggerFactoryProxy,
100 MetricEventLoggerProxy, MetricEventPayload, ProjectSpec,
101};
102use fuchsia_async as fasync;
103use fuchsia_component::client::connect_to_protocol;
104use fuchsia_inspect::{self as inspect, NumericProperty};
105use fuchsia_inspect_derive::WithInspect;
106use futures::channel::oneshot;
107use futures::future::join_all;
108use futures::stream::FuturesUnordered;
109use futures::{select, StreamExt};
110use log::{info, warn};
111use moniker::ExtendedMoniker;
112use sampler_config::runtime::{MetricConfig, ProjectConfig};
113use sampler_config::{MetricType, ProjectId};
114use selectors::SelectorExt;
115use std::cell::{Ref, RefCell, RefMut};
116use std::collections::hash_map::Entry;
117use std::collections::HashMap;
118use std::sync::Arc;
119
120type EventToLog = (ProjectId, MetricEvent);
124
125pub struct TaskCancellation {
126 senders: Vec<oneshot::Sender<()>>,
127 _sampler_executor_stats: Arc<SamplerExecutorStats>,
128 execution_context: fasync::Task<Vec<ProjectSampler>>,
129}
130
131impl TaskCancellation {
132 pub async fn run_without_cancellation(self) {
136 self.execution_context.await;
137 }
138
139 pub async fn perform_reboot_cleanup(self) {
140 self.senders.into_iter().for_each(|sender| {
142 sender
143 .send(())
144 .unwrap_or_else(|err| warn!("Failed to send reboot over oneshot: {:?}", err))
145 });
146
147 let project_samplers: Vec<ProjectSampler> = self.execution_context.await;
150
151 let mut reader = ArchiveReader::inspect();
152
153 for project_sampler in &project_samplers {
154 for metric in &project_sampler.metrics {
155 for selector in metric.borrow().selectors.iter() {
156 reader.add_selector(selector.clone());
157 }
158 }
159 }
160
161 reader.retry(RetryConfig::never());
162
163 let mut reboot_processor = RebootSnapshotProcessor { reader, project_samplers };
164 reboot_processor
167 .process_reboot_sample()
168 .await
169 .unwrap_or_else(|e| warn!("Reboot snapshot failed! {:?}", e));
170 }
171}
172
173struct RebootSnapshotProcessor {
174 reader: InspectArchiveReader,
177 project_samplers: Vec<ProjectSampler>,
180}
181
182impl RebootSnapshotProcessor {
183 pub async fn process_reboot_sample(&mut self) -> Result<(), Error> {
184 let snapshot_data = self.reader.snapshot().await?;
185 for data_packet in snapshot_data {
186 let moniker = data_packet.moniker;
187 match data_packet.payload {
188 None => {
189 process_schema_errors(&data_packet.metadata.errors, &moniker);
190 }
191 Some(payload) => {
192 self.process_single_payload(payload, &data_packet.metadata.name, &moniker).await
193 }
194 }
195 }
196 Ok(())
197 }
198
199 async fn process_single_payload(
200 &mut self,
201 hierarchy: DiagnosticsHierarchy<String>,
202 inspect_handle_name: &InspectHandleName,
203 moniker: &ExtendedMoniker,
204 ) {
205 let mut projects = self
206 .project_samplers
207 .iter_mut()
208 .filter(|p| {
209 p.filter_metrics_by_moniker_and_tree_name(moniker, inspect_handle_name.as_ref())
210 .next()
211 .is_some()
212 })
213 .peekable();
214 if projects.peek().is_none() {
215 warn!(
216 moniker:%,
217 tree_name = inspect_handle_name.as_ref();
218 "no metrics found for moniker and tree_name combination"
219 );
220 return;
221 }
222
223 for project_sampler in projects {
224 let maybe_err = match project_sampler.process_component_data(
228 &hierarchy,
229 inspect_handle_name,
230 moniker,
231 ) {
232 Err(err) => Some(err),
233 Ok((_selector_changes, events_to_log)) => {
234 project_sampler.log_events(events_to_log).await.err()
235 }
236 };
237 if let Some(err) = maybe_err {
238 warn!(err:?; "A project sampler failed to process a reboot sample");
239 }
240 }
241 }
242}
243
244pub struct SamplerExecutor {
246 project_samplers: Vec<ProjectSampler>,
247 sampler_executor_stats: Arc<SamplerExecutorStats>,
248}
249
250impl SamplerExecutor {
251 pub async fn new(sampler_config: SamplerConfig) -> Result<Self, Error> {
254 let metric_logger_factory: Arc<MetricEventLoggerFactoryProxy> = Arc::new(
255 connect_to_protocol::<MetricEventLoggerFactoryMarker>()
256 .context("Failed to connect to the Metric LoggerFactory")?,
257 );
258
259 let minimum_sample_rate_sec = sampler_config.minimum_sample_rate_sec;
260
261 let sampler_executor_stats = Arc::new(
262 SamplerExecutorStats::new()
263 .with_inspect(inspect::component::inspector().root(), "sampler_executor_stats")
264 .unwrap_or_else(|err| {
265 warn!(err:?; "Failed to attach inspector to SamplerExecutorStats struct");
266 SamplerExecutorStats::default()
267 }),
268 );
269
270 sampler_executor_stats
271 .total_project_samplers_configured
272 .add(sampler_config.project_configs.len() as u64);
273
274 let mut project_to_stats_map: HashMap<ProjectId, Arc<ProjectSamplerStats>> = HashMap::new();
275 let project_sampler_futures =
278 sampler_config.project_configs.iter().cloned().map(|project_config| {
279 let project_sampler_stats =
280 project_to_stats_map.entry(project_config.project_id).or_insert_with(|| {
281 Arc::new(
282 ProjectSamplerStats::new()
283 .with_inspect(
284 &sampler_executor_stats.inspect_node,
285 format!("project_{}", project_config.project_id,),
286 )
287 .unwrap_or_else(|err| {
288 warn!(
289 err:?;
290 "Failed to attach inspector to ProjectSamplerStats struct"
291 );
292 ProjectSamplerStats::default()
293 }),
294 )
295 });
296 ProjectSampler::new(
297 project_config,
298 metric_logger_factory.clone(),
299 minimum_sample_rate_sec,
300 project_sampler_stats.clone(),
301 )
302 });
303
304 let mut project_samplers: Vec<ProjectSampler> = Vec::new();
305 for project_sampler in join_all(project_sampler_futures).await.into_iter() {
306 match project_sampler {
307 Ok(project_sampler) => project_samplers.push(project_sampler),
308 Err(e) => {
309 warn!("ProjectSampler construction failed: {:?}", e);
310 }
311 }
312 }
313 Ok(SamplerExecutor { project_samplers, sampler_executor_stats })
314 }
315
316 pub fn execute(self) -> TaskCancellation {
319 let task_cancellation_owned_stats = self.sampler_executor_stats.clone();
323 let execution_context_owned_stats = self.sampler_executor_stats.clone();
324
325 let (senders, mut spawned_tasks): (Vec<oneshot::Sender<()>>, FuturesUnordered<_>) = self
326 .project_samplers
327 .into_iter()
328 .map(|project_sampler| {
329 let (sender, receiver) = oneshot::channel::<()>();
330 (sender, project_sampler.spawn(receiver))
331 })
332 .unzip();
333
334 let execution_context = fasync::Task::local(async move {
335 let mut healthily_exited_samplers = Vec::new();
336 while let Some(sampler_result) = spawned_tasks.next().await {
337 match sampler_result {
338 Err(e) => {
339 warn!("A spawned sampler has failed: {:?}", e);
342 execution_context_owned_stats.errorfully_exited_samplers.add(1);
343 }
344 Ok(ProjectSamplerTaskExit::RebootTriggered(sampler)) => {
345 healthily_exited_samplers.push(sampler);
346 execution_context_owned_stats.reboot_exited_samplers.add(1);
347 }
348 Ok(ProjectSamplerTaskExit::WorkCompleted) => {
349 info!("A sampler completed its workload, and exited.");
350 execution_context_owned_stats.healthily_exited_samplers.add(1);
351 }
352 }
353 }
354
355 healthily_exited_samplers
356 });
357
358 TaskCancellation {
359 execution_context,
360 senders,
361 _sampler_executor_stats: task_cancellation_owned_stats,
362 }
363 }
364}
365
366pub struct ProjectSampler {
367 archive_reader: InspectArchiveReader,
368 metrics: Vec<RefCell<MetricConfig>>,
370 metric_cache: RefCell<HashMap<MetricCacheKey, Property>>,
373 metric_loggers: HashMap<ProjectId, MetricEventLoggerProxy>,
376 poll_rate_sec: i64,
379 project_sampler_stats: Arc<ProjectSamplerStats>,
383 project_id: ProjectId,
386 all_done: bool,
389}
390
391#[derive(Debug, Eq, Hash, PartialEq)]
392struct MetricCacheKey {
393 handle_name: InspectHandleName,
394 selector: String,
395}
396
397#[allow(clippy::large_enum_variant)] pub enum ProjectSamplerTaskExit {
399 RebootTriggered(ProjectSampler),
402 WorkCompleted,
404}
405
406pub enum ProjectSamplerEvent {
407 TimerTriggered,
408 TimerDied,
409 RebootTriggered,
410 RebootChannelClosed(Error),
411}
412
/// Whether processing component data modified the selector list of any metric.
#[derive(PartialEq)]
enum SnapshotOutcome {
    /// At least one metric had its selectors replaced or removed.
    SelectorsChanged,
    /// No metric's selectors were touched.
    SelectorsUnchanged,
}
422
423impl ProjectSampler {
424 pub async fn new(
425 config: Arc<ProjectConfig>,
426 metric_logger_factory: Arc<MetricEventLoggerFactoryProxy>,
427 minimum_sample_rate_sec: i64,
428 project_sampler_stats: Arc<ProjectSamplerStats>,
429 ) -> Result<ProjectSampler, Error> {
430 let customer_id = config.customer_id;
431 let project_id = config.project_id;
432 let poll_rate_sec = config.poll_rate_sec;
433 if poll_rate_sec < minimum_sample_rate_sec {
434 return Err(format_err!(
435 concat!(
436 "Project with id: {:?} uses a polling rate:",
437 " {:?} below minimum configured poll rate: {:?}"
438 ),
439 project_id,
440 poll_rate_sec,
441 minimum_sample_rate_sec,
442 ));
443 }
444
445 project_sampler_stats.project_sampler_count.add(1);
446 project_sampler_stats.metrics_configured.add(config.metrics.len() as u64);
447
448 let mut metric_loggers = HashMap::new();
449 if *project_id != 0 {
452 let (metric_logger_proxy, metrics_server_end) = fidl::endpoints::create_proxy();
453 let project_spec = ProjectSpec {
454 customer_id: Some(*customer_id),
455 project_id: Some(*project_id),
456 ..ProjectSpec::default()
457 };
458 metric_logger_factory
459 .create_metric_event_logger(&project_spec, metrics_server_end)
460 .await?
461 .map_err(|e| format_err!("error response for project {}: {:?}", project_id, e))?;
462 metric_loggers.insert(project_id, metric_logger_proxy);
463 }
464 for metric in &config.metrics {
465 if let Some(metric_project_id) = metric.project_id {
466 if let Entry::Vacant(entry) = metric_loggers.entry(metric_project_id) {
467 let (metric_logger_proxy, metrics_server_end) = fidl::endpoints::create_proxy();
468 let project_spec = ProjectSpec {
469 customer_id: Some(*customer_id),
470 project_id: Some(*metric_project_id),
471 ..ProjectSpec::default()
472 };
473 metric_logger_factory
474 .create_metric_event_logger(&project_spec, metrics_server_end)
475 .await?
476 .map_err(|e|
477 format_err!(
478 "error response for project {} while creating metric logger {}: {:?}",
479 project_id,
480 metric_project_id,
481 e
482 ))?;
483 entry.insert(metric_logger_proxy);
484 }
485 }
486 }
487
488 let mut project_sampler = ProjectSampler {
489 project_id,
490 archive_reader: ArchiveReader::inspect(),
491 metrics: config.metrics.iter().map(|m| RefCell::new(m.clone())).collect(),
492 metric_cache: RefCell::new(HashMap::new()),
493 metric_loggers,
494 poll_rate_sec,
495 project_sampler_stats,
496 all_done: true,
497 };
498 project_sampler.rebuild_selector_data_structures();
500 Ok(project_sampler)
501 }
502
503 pub fn spawn(
504 mut self,
505 mut reboot_oneshot: oneshot::Receiver<()>,
506 ) -> fasync::Task<Result<ProjectSamplerTaskExit, Error>> {
507 fasync::Task::local(async move {
508 let mut periodic_timer =
509 fasync::Interval::new(zx::MonotonicDuration::from_seconds(self.poll_rate_sec));
510 loop {
511 let done = select! {
512 opt = periodic_timer.next() => {
513 if opt.is_some() {
514 ProjectSamplerEvent::TimerTriggered
515 } else {
516 ProjectSamplerEvent::TimerDied
517 }
518 },
519 oneshot_res = reboot_oneshot => {
520 match oneshot_res {
521 Ok(()) => {
522 ProjectSamplerEvent::RebootTriggered
523 },
524 Err(e) => {
525 ProjectSamplerEvent::RebootChannelClosed(
526 format_err!("Oneshot closure error: {:?}", e))
527 }
528 }
529 }
530 };
531
532 match done {
533 ProjectSamplerEvent::TimerDied => {
534 return Err(format_err!(concat!(
535 "The ProjectSampler timer died, something went wrong.",
536 )));
537 }
538 ProjectSamplerEvent::RebootChannelClosed(e) => {
539 return Err(format_err!(
542 concat!(
543 "The Reboot signaling oneshot died, something went wrong: {:?}",
544 ),
545 e
546 ));
547 }
548 ProjectSamplerEvent::RebootTriggered => {
549 return Ok(ProjectSamplerTaskExit::RebootTriggered(self));
552 }
553 ProjectSamplerEvent::TimerTriggered => {
554 self.process_next_snapshot().await?;
555 if self.is_all_done() {
560 return Ok(ProjectSamplerTaskExit::WorkCompleted);
561 }
562 }
563 }
564 }
565 })
566 }
567
568 async fn process_next_snapshot(&mut self) -> Result<(), Error> {
569 let snapshot_data = self.archive_reader.snapshot().await?;
570 let events_to_log = self.process_snapshot(snapshot_data).await?;
571 self.log_events(events_to_log).await?;
572 Ok(())
573 }
574
575 async fn process_snapshot(
576 &mut self,
577 snapshot: Vec<Data<Inspect>>,
578 ) -> Result<Vec<EventToLog>, Error> {
579 let mut selectors_changed = false;
580 let mut events_to_log = vec![];
581 for data_packet in snapshot.iter() {
582 match &data_packet.payload {
583 None => {
584 process_schema_errors(&data_packet.metadata.errors, &data_packet.moniker);
585 }
586 Some(payload) => {
587 let (selector_outcome, mut events) = self.process_component_data(
588 payload,
589 &data_packet.metadata.name,
590 &data_packet.moniker,
591 )?;
592 if selector_outcome == SnapshotOutcome::SelectorsChanged {
593 selectors_changed = true;
594 }
595 events_to_log.append(&mut events);
596 }
597 }
598 }
599 if selectors_changed {
600 self.rebuild_selector_data_structures();
601 }
602 Ok(events_to_log)
603 }
604
605 fn is_all_done(&self) -> bool {
606 self.all_done
607 }
608
609 fn rebuild_selector_data_structures(&mut self) {
610 self.archive_reader = ArchiveReader::inspect();
611 for metric in &self.metrics {
612 for selector in metric.borrow().selectors.iter().cloned() {
613 self.archive_reader.add_selector(selector);
614 self.all_done = false;
615 }
616 }
617 self.archive_reader.retry(RetryConfig::never());
618 }
619
620 fn filter_metrics_by_moniker_and_tree_name<'a>(
621 &'a self,
622 moniker: &'a ExtendedMoniker,
623 tree_name: &'a str,
624 ) -> impl Iterator<Item = &'a RefCell<MetricConfig>> {
625 self.metrics.iter().filter(|metric| {
626 moniker
627 .match_against_selectors_and_tree_name(tree_name, metric.borrow().selectors.iter())
628 .next()
629 .is_some()
630 })
631 }
632
633 fn process_component_data(
634 &mut self,
635 payload: &DiagnosticsHierarchy,
636 inspect_handle_name: &InspectHandleName,
637 moniker: &ExtendedMoniker,
638 ) -> Result<(SnapshotOutcome, Vec<EventToLog>), Error> {
639 let filtered_metrics =
640 self.filter_metrics_by_moniker_and_tree_name(moniker, inspect_handle_name.as_ref());
641 let mut snapshot_outcome = SnapshotOutcome::SelectorsUnchanged;
642 let mut events_to_log = vec![];
643 for metric in filtered_metrics {
644 let mut selector_to_keep = None;
645 let project_id = metric.borrow().project_id.unwrap_or(self.project_id);
646 for (selector_idx, selector) in metric.borrow().selectors.iter().enumerate() {
647 let found_properties =
648 diagnostics_hierarchy::select_from_hierarchy(payload, selector)?;
649 match found_properties {
650 SelectResult::Properties(p) if p.is_empty() => {}
653 SelectResult::Properties(p) if p.len() == 1 => {
654 let metric_cache_key = MetricCacheKey {
655 handle_name: inspect_handle_name.clone(),
656 selector: selectors::selector_to_string(selector, Default::default())
657 .unwrap(),
658 };
659 if let Some(event) = Self::prepare_sample(
660 metric.borrow(),
661 self.metric_cache.borrow_mut(),
662 metric_cache_key,
663 p[0],
664 )? {
665 events_to_log.push((project_id, event));
666 }
667 selector_to_keep = Some(selector_idx);
668 break;
669 }
670 too_many => {
671 warn!(
672 too_many:?,
673 selector:?;
674 "Too many matches for selector"
675 );
676 }
677 }
678 }
679
680 if let Some(selector_idx) = selector_to_keep {
681 if Self::update_selectors_for_metric(metric.borrow_mut(), selector_idx) {
682 snapshot_outcome = SnapshotOutcome::SelectorsChanged;
683 }
684 }
685 }
686 Ok((snapshot_outcome, events_to_log))
687 }
688
689 fn update_selectors_for_metric(
690 mut metric: RefMut<'_, MetricConfig>,
691 keep_selector_idx: usize,
692 ) -> bool {
693 if metric.upload_once {
694 metric.selectors = Vec::new();
695 return true;
696 }
697 let deleted = metric.selectors.len() > 1;
698 metric.selectors = vec![metric.selectors.remove(keep_selector_idx)];
699 deleted
700 }
701
702 fn prepare_sample<'a>(
703 metric: Ref<'a, MetricConfig>,
704 metric_cache: RefMut<'a, HashMap<MetricCacheKey, Property>>,
705 metric_cache_key: MetricCacheKey,
706 new_sample: &Property,
707 ) -> Result<Option<MetricEvent>, Error> {
708 let previous_sample_opt: Option<&Property> = metric_cache.get(&metric_cache_key);
709
710 if let Some(payload) = process_sample_for_data_type(
711 new_sample,
712 previous_sample_opt,
713 &metric_cache_key,
714 &metric.metric_type,
715 ) {
716 Self::maybe_update_cache(
717 metric_cache,
718 new_sample,
719 &metric.metric_type,
720 metric_cache_key,
721 );
722 Ok(Some(MetricEvent {
723 metric_id: *metric.metric_id,
724 event_codes: metric.event_codes.iter().map(|code| **code).collect(),
725 payload,
726 }))
727 } else {
728 Ok(None)
729 }
730 }
731
732 async fn log_events(&mut self, events: Vec<EventToLog>) -> Result<(), Error> {
733 for (project_id, event) in events.into_iter() {
734 self.metric_loggers
735 .get(&project_id)
736 .as_ref()
737 .unwrap()
738 .log_metric_events(&[event])
739 .await?
740 .map_err(|e| format_err!("error from cobalt: {:?}", e))?;
741 self.project_sampler_stats.cobalt_logs_sent.add(1);
742 }
743 Ok(())
744 }
745
746 fn maybe_update_cache(
747 mut cache: RefMut<'_, HashMap<MetricCacheKey, Property>>,
748 new_sample: &Property,
749 data_type: &MetricType,
750 metric_cache_key: MetricCacheKey,
751 ) {
752 match data_type {
753 MetricType::Occurrence | MetricType::IntHistogram => {
754 cache.insert(metric_cache_key, new_sample.clone());
755 }
756 MetricType::Integer | MetricType::String => (),
757 }
758 }
759}
760
#[cfg(test)]
impl ProjectSampler {
    /// Test helper: appends `metric` to this sampler's metric list.
    fn push_metric(&mut self, metric: MetricConfig) {
        let cell = RefCell::new(metric);
        self.metrics.push(cell);
    }
}
767
768fn process_sample_for_data_type(
769 new_sample: &Property,
770 previous_sample_opt: Option<&Property>,
771 data_source: &MetricCacheKey,
772 data_type: &MetricType,
773) -> Option<MetricEventPayload> {
774 let event_payload_res = match data_type {
775 MetricType::Occurrence => process_occurence(new_sample, previous_sample_opt, data_source),
776 MetricType::IntHistogram => {
777 process_int_histogram(new_sample, previous_sample_opt, data_source)
778 }
779 MetricType::Integer => {
780 if previous_sample_opt.is_some() {
784 warn!("Sampler has erroneously cached an Int type metric: {:?}", data_source);
785 }
786 process_int(new_sample, data_source)
787 }
788 MetricType::String => {
789 if previous_sample_opt.is_some() {
790 warn!("Sampler has erroneously cached a String type metric: {:?}", data_source);
791 }
792 process_string(new_sample, data_source)
793 }
794 };
795
796 match event_payload_res {
797 Ok(payload_opt) => payload_opt,
798 Err(err) => {
799 warn!(data_source:?, err:?; "Failed to process Inspect property for cobalt",);
800 None
801 }
802 }
803}
804
805fn sanitize_unsigned_numerical(diff: u64, data_source: &MetricCacheKey) -> Result<i64, Error> {
809 match diff.try_into() {
810 Ok(diff) => Ok(diff),
811 Err(e) => Err(format_err!(
812 concat!(
813 "Selector used for EventCount type",
814 " refered to an unsigned int property,",
815 " but cobalt requires i64, and casting introduced overflow",
816 " which produces a negative int: {:?}. This could be due to",
817 " a single sample being larger than i64, or a diff between",
818 " samples being larger than i64. Error: {:?}"
819 ),
820 data_source,
821 e
822 )),
823 }
824}
825
826fn process_int_histogram(
827 new_sample: &Property,
828 prev_sample_opt: Option<&Property>,
829 data_source: &MetricCacheKey,
830) -> Result<Option<MetricEventPayload>, Error> {
831 let diff = match prev_sample_opt {
832 None => convert_inspect_histogram_to_cobalt_histogram(new_sample, data_source)?,
833 Some(prev_sample) => {
834 if std::mem::discriminant(new_sample) == std::mem::discriminant(prev_sample) {
836 compute_histogram_diff(new_sample, prev_sample, data_source)?
837 } else {
838 convert_inspect_histogram_to_cobalt_histogram(new_sample, data_source)?
839 }
840 }
841 };
842
843 let non_empty_diff: Vec<HistogramBucket> = diff.into_iter().filter(|v| v.count != 0).collect();
844 if !non_empty_diff.is_empty() {
845 Ok(Some(MetricEventPayload::Histogram(non_empty_diff)))
846 } else {
847 Ok(None)
848 }
849}
850
851fn compute_histogram_diff(
852 new_sample: &Property,
853 old_sample: &Property,
854 data_source: &MetricCacheKey,
855) -> Result<Vec<HistogramBucket>, Error> {
856 let new_histogram_buckets =
857 convert_inspect_histogram_to_cobalt_histogram(new_sample, data_source)?;
858 let old_histogram_buckets =
859 convert_inspect_histogram_to_cobalt_histogram(old_sample, data_source)?;
860
861 if old_histogram_buckets.len() != new_histogram_buckets.len() {
862 return Err(format_err!(
863 concat!(
864 "Selector referenced an Inspect IntArray",
865 " that was specified as an IntHistogram type ",
866 " but the histogram bucket count changed between",
867 " samples, which is incompatible with Cobalt.",
868 " Selector: {:?}, Inspect type: {}"
869 ),
870 data_source,
871 new_sample.discriminant_name()
872 ));
873 }
874
875 new_histogram_buckets
876 .iter()
877 .zip(old_histogram_buckets)
878 .map(|(new_bucket, old_bucket)| {
879 if new_bucket.count < old_bucket.count {
880 return Err(format_err!(
881 concat!(
882 "Selector referenced an Inspect IntArray",
883 " that was specified as an IntHistogram type ",
884 " but at least one bucket saw the count decrease",
885 " between samples, which is incompatible with Cobalt's",
886 " need for monotonically increasing counts.",
887 " Selector: {:?}, Inspect type: {}"
888 ),
889 data_source,
890 new_sample.discriminant_name()
891 ));
892 }
893 Ok(HistogramBucket {
894 count: new_bucket.count - old_bucket.count,
895 index: new_bucket.index,
896 })
897 })
898 .collect::<Result<Vec<HistogramBucket>, Error>>()
899}
900
901fn build_cobalt_histogram(counts: impl Iterator<Item = u64>) -> Vec<HistogramBucket> {
902 counts
903 .enumerate()
904 .map(|(index, count)| HistogramBucket { index: index as u32, count })
905 .collect()
906}
907
908fn build_sparse_cobalt_histogram(
909 counts: impl Iterator<Item = u64>,
910 indexes: &[usize],
911 size: usize,
912) -> Vec<HistogramBucket> {
913 let mut histogram =
914 Vec::from_iter((0..size).map(|index| HistogramBucket { index: index as u32, count: 0 }));
915 for (index, count) in indexes.iter().zip(counts) {
916 histogram[*index].count = count;
917 }
918 histogram
919}
920
921fn convert_inspect_histogram_to_cobalt_histogram(
922 inspect_histogram: &Property,
923 data_source: &MetricCacheKey,
924) -> Result<Vec<HistogramBucket>, Error> {
925 macro_rules! err {($($message:expr),+) => {
926 Err(format_err!(
927 concat!($($message),+ , " Selector: {:?}, Inspect type: {}"),
928 data_source,
929 inspect_histogram.discriminant_name()
930 ))
931 }}
932
933 let sanitize_size = |size: usize| -> Result<(), Error> {
934 if size > u32::MAX as usize {
935 return err!(
936 "Selector referenced an Inspect array",
937 " that was specified as a histogram type ",
938 " but contained an index too large for a u32."
939 );
940 }
941 Ok(())
942 };
943
944 let sanitize_indexes = |indexes: &[usize], size: usize| -> Result<(), Error> {
945 for index in indexes.iter() {
946 if *index >= size {
947 return err!(
948 "Selector referenced an Inspect array",
949 " that was specified as a histogram type ",
950 " but contained an invalid index."
951 );
952 }
953 }
954 Ok(())
955 };
956
957 let sanitize_counts = |counts: &[i64]| -> Result<(), Error> {
958 for count in counts.iter() {
959 if *count < 0 {
960 return err!(
961 "Selector referenced an Inspect IntArray",
962 " that was specified as an IntHistogram type ",
963 " but a bucket contained a negative count. This",
964 " is incompatible with Cobalt histograms which only",
965 " support positive histogram counts."
966 );
967 }
968 }
969 Ok(())
970 };
971
972 let histogram = match inspect_histogram {
973 Property::IntArray(
974 _,
975 ArrayContent::LinearHistogram(LinearHistogram { counts, indexes, size, .. }),
976 )
977 | Property::IntArray(
978 _,
979 ArrayContent::ExponentialHistogram(ExponentialHistogram {
980 counts, indexes, size, ..
981 }),
982 ) => {
983 sanitize_size(*size)?;
984 sanitize_counts(counts)?;
985 match (indexes, counts) {
986 (None, counts) => build_cobalt_histogram(counts.iter().map(|c| *c as u64)),
987 (Some(indexes), counts) => {
988 sanitize_indexes(indexes, *size)?;
989 build_sparse_cobalt_histogram(counts.iter().map(|c| *c as u64), indexes, *size)
990 }
991 }
992 }
993 Property::UintArray(
994 _,
995 ArrayContent::LinearHistogram(LinearHistogram { counts, indexes, size, .. }),
996 )
997 | Property::UintArray(
998 _,
999 ArrayContent::ExponentialHistogram(ExponentialHistogram {
1000 counts, indexes, size, ..
1001 }),
1002 ) => {
1003 sanitize_size(*size)?;
1004 match (indexes, counts) {
1005 (None, counts) => build_cobalt_histogram(counts.iter().copied()),
1006 (Some(indexes), counts) => {
1007 sanitize_indexes(indexes, *size)?;
1008 build_sparse_cobalt_histogram(counts.iter().copied(), indexes, *size)
1009 }
1010 }
1011 }
1012 _ => {
1013 return Err(format_err!(
1017 concat!(
1018 "Selector referenced an Inspect property",
1019 " that was specified as an IntHistogram type ",
1020 " but is unable to be encoded in a cobalt HistogramBucket",
1021 " vector. Selector: {:?}, Inspect type: {}"
1022 ),
1023 data_source,
1024 inspect_histogram.discriminant_name()
1025 ));
1026 }
1027 };
1028 Ok(histogram)
1029}
1030
1031fn process_int(
1032 new_sample: &Property,
1033 data_source: &MetricCacheKey,
1034) -> Result<Option<MetricEventPayload>, Error> {
1035 let sampled_int = match new_sample {
1036 Property::Uint(_, val) => sanitize_unsigned_numerical(*val, data_source)?,
1037 Property::Int(_, val) => *val,
1038 _ => {
1039 return Err(format_err!(
1040 concat!(
1041 "Selector referenced an Inspect property",
1042 " that was specified as an Int type ",
1043 " but is unable to be encoded in an i64",
1044 " Selector: {:?}, Inspect type: {}"
1045 ),
1046 data_source,
1047 new_sample.discriminant_name()
1048 ));
1049 }
1050 };
1051
1052 Ok(Some(MetricEventPayload::IntegerValue(sampled_int)))
1053}
1054
1055fn process_string(
1056 new_sample: &Property,
1057 data_source: &MetricCacheKey,
1058) -> Result<Option<MetricEventPayload>, Error> {
1059 let sampled_string = match new_sample {
1060 Property::String(_, val) => val.clone(),
1061 _ => {
1062 return Err(format_err!(
1063 concat!(
1064 "Selector referenced an Inspect property specified as String",
1065 " but property is not type String.",
1066 " Selector: {:?}, Inspect type: {}"
1067 ),
1068 data_source,
1069 new_sample.discriminant_name()
1070 ));
1071 }
1072 };
1073
1074 Ok(Some(MetricEventPayload::StringValue(sampled_string)))
1075}
1076
1077fn process_occurence(
1078 new_sample: &Property,
1079 prev_sample_opt: Option<&Property>,
1080 data_source: &MetricCacheKey,
1081) -> Result<Option<MetricEventPayload>, Error> {
1082 let diff = match prev_sample_opt {
1083 None => compute_initial_event_count(new_sample, data_source)?,
1084 Some(prev_sample) => compute_event_count_diff(new_sample, prev_sample, data_source)?,
1085 };
1086
1087 if diff < 0 {
1088 return Err(format_err!(
1089 concat!(
1090 "Event count must be monotonically increasing,",
1091 " but we observed a negative event count diff for: {:?}"
1092 ),
1093 data_source
1094 ));
1095 }
1096
1097 if diff == 0 {
1098 return Ok(None);
1099 }
1100
1101 Ok(Some(MetricEventPayload::Count(diff as u64)))
1104}
1105
1106fn compute_initial_event_count(
1107 new_sample: &Property,
1108 data_source: &MetricCacheKey,
1109) -> Result<i64, Error> {
1110 match new_sample {
1111 Property::Uint(_, val) => sanitize_unsigned_numerical(*val, data_source),
1112 Property::Int(_, val) => Ok(*val),
1113 _ => Err(format_err!(
1114 concat!(
1115 "Selector referenced an Inspect property",
1116 " that is not compatible with cached",
1117 " transformation to an event count.",
1118 " Selector: {:?}, {}"
1119 ),
1120 data_source,
1121 new_sample.discriminant_name()
1122 )),
1123 }
1124}
1125
1126fn compute_event_count_diff(
1127 new_sample: &Property,
1128 old_sample: &Property,
1129 data_source: &MetricCacheKey,
1130) -> Result<i64, Error> {
1131 match (new_sample, old_sample) {
1132 (Property::Int(_, new_count), Property::Int(_, old_count)) => Ok(new_count - old_count),
1139 (Property::Uint(_, new_count), Property::Uint(_, old_count)) => {
1140 sanitize_unsigned_numerical(
1143 new_count.checked_sub(*old_count).unwrap_or(u64::MAX),
1144 data_source,
1145 )
1146 }
1147 (_, Property::Uint(_, _)) | (_, Property::Int(_, _)) => {
1151 warn!(
1152 "Inspect type of sampled data changed between samples. Restarting cache. {:?}",
1153 data_source
1154 );
1155 compute_initial_event_count(new_sample, data_source)
1156 }
1157 _ => Err(format_err!(
1158 concat!(
1159 "Inspect type of sampled data changed between samples",
1160 " to a type incompatible with event counters.",
1161 " Selector: {:?}, New type: {:?}"
1162 ),
1163 data_source,
1164 new_sample.discriminant_name()
1165 )),
1166 }
1167}
1168
1169fn process_schema_errors(
1170 errors: &Option<Vec<diagnostics_data::InspectError>>,
1171 moniker: &ExtendedMoniker,
1172) {
1173 match errors {
1174 Some(errors) => {
1175 for error in errors {
1176 warn!(moniker:%, error:?; "");
1177 }
1178 }
1179 None => {
1180 warn!(moniker:%; "Encountered null payload and no errors.");
1181 }
1182 }
1183}
1184
1185#[cfg(test)]
1186mod tests {
1187 use super::*;
1188 use diagnostics_data::{InspectDataBuilder, Timestamp};
1189 use diagnostics_hierarchy::hierarchy;
1190 use fidl_fuchsia_inspect::DEFAULT_TREE_NAME;
1191 use sampler_config::{MetricId, ProjectId};
1192
1193 #[fuchsia::test]
1194 fn test_filter_metrics() {
1195 let mut sampler = ProjectSampler {
1196 archive_reader: ArchiveReader::inspect(),
1197 metrics: vec![],
1198 metric_cache: RefCell::new(HashMap::new()),
1199 metric_loggers: HashMap::new(),
1200 project_id: ProjectId(1),
1201 poll_rate_sec: 3600,
1202 project_sampler_stats: Arc::new(ProjectSamplerStats::new()),
1203 all_done: true,
1204 };
1205 let selector_foo: String = "core/foo:[name=foo]root/path:value".to_string();
1206 let selector_bar: String = "core/foo:[name=bar]root/path:value".to_string();
1207
1208 sampler.push_metric(MetricConfig {
1209 project_id: None,
1210 selectors: vec![selectors::parse_verbose(&selector_foo).unwrap()],
1211 metric_id: MetricId(1),
1212 metric_type: MetricType::Occurrence,
1214 event_codes: Vec::new(),
1215 upload_once: true,
1218 });
1219 sampler.push_metric(MetricConfig {
1220 project_id: None,
1221 selectors: vec![selectors::parse_verbose(&selector_bar).unwrap()],
1222 metric_id: MetricId(2),
1223 metric_type: MetricType::Occurrence,
1225 event_codes: Vec::new(),
1226 upload_once: true,
1229 });
1230
1231 sampler.rebuild_selector_data_structures();
1232
1233 let moniker = ExtendedMoniker::try_from("core/foo").unwrap();
1234
1235 let filtered_metrics =
1236 sampler.filter_metrics_by_moniker_and_tree_name(&moniker, "foo").collect::<Vec<_>>();
1237 assert_eq!(1, filtered_metrics.len());
1238 assert_eq!(MetricId(1), filtered_metrics[0].borrow().metric_id);
1239 }
1240
1241 #[fuchsia::test]
1242 fn test_filter_metrics_with_wildcards() {
1243 let mut sampler = ProjectSampler {
1244 archive_reader: ArchiveReader::inspect(),
1245 metrics: vec![],
1246 metric_cache: RefCell::new(HashMap::new()),
1247 metric_loggers: HashMap::new(),
1248 project_id: ProjectId(1),
1249 poll_rate_sec: 3600,
1250 project_sampler_stats: Arc::new(ProjectSamplerStats::new()),
1251 all_done: true,
1252 };
1253 let selector_foo: String = r"bootstrap/*-drivers\:*:[name=foo]root/path:value".to_string();
1254 let selector_bar1: String =
1255 r"bootstrap/*-drivers\:*:[name=bar]root/path:value1".to_string();
1256 let selector_bar2: String =
1257 r"bootstrap/*-drivers\:*:[name=bar]root/path:value2".to_string();
1258
1259 sampler.push_metric(MetricConfig {
1260 project_id: None,
1261 selectors: vec![selectors::parse_verbose(&selector_foo).unwrap()],
1262 metric_id: MetricId(1),
1263 metric_type: MetricType::Occurrence,
1265 event_codes: Vec::new(),
1266 upload_once: true,
1269 });
1270 sampler.push_metric(MetricConfig {
1271 project_id: None,
1272 selectors: vec![selectors::parse_verbose(&selector_bar1).unwrap()],
1273 metric_id: MetricId(2),
1274 metric_type: MetricType::Occurrence,
1276 event_codes: Vec::new(),
1277 upload_once: true,
1280 });
1281 sampler.push_metric(MetricConfig {
1282 project_id: None,
1283 selectors: vec![selectors::parse_verbose(&selector_bar2).unwrap()],
1284 metric_id: MetricId(3),
1285 metric_type: MetricType::Occurrence,
1287 event_codes: Vec::new(),
1288 upload_once: true,
1291 });
1292
1293 sampler.rebuild_selector_data_structures();
1294
1295 let moniker = ExtendedMoniker::try_from("bootstrap/boot-drivers:098098").unwrap();
1296
1297 let filtered_metrics =
1298 sampler.filter_metrics_by_moniker_and_tree_name(&moniker, "foo").collect::<Vec<_>>();
1299 assert_eq!(1, filtered_metrics.len());
1300 assert_eq!(MetricId(1), filtered_metrics[0].borrow().metric_id);
1301
1302 let filtered_metrics =
1303 sampler.filter_metrics_by_moniker_and_tree_name(&moniker, "bar").collect::<Vec<_>>();
1304 assert_eq!(2, filtered_metrics.len());
1305 assert_eq!(MetricId(2), filtered_metrics[0].borrow().metric_id);
1306 assert_eq!(MetricId(3), filtered_metrics[1].borrow().metric_id);
1307 }
1308
1309 #[fuchsia::test]
1311 fn test_process_payload_with_escapes() {
1312 let unescaped: String = "path/to".to_string();
1313 let hierarchy = hierarchy! {
1314 root: {
1315 var unescaped: {
1316 value: 0,
1317 "value/with:escapes": 0,
1318 }
1319 }
1320 };
1321
1322 let mut sampler = ProjectSampler {
1323 archive_reader: ArchiveReader::inspect(),
1324 metrics: vec![],
1325 metric_cache: RefCell::new(HashMap::new()),
1326 metric_loggers: HashMap::new(),
1327 project_id: ProjectId(1),
1328 poll_rate_sec: 3600,
1329 project_sampler_stats: Arc::new(ProjectSamplerStats::new()),
1330 all_done: true,
1331 };
1332 let selector: String = "my/component:root/path\\/to:value".to_string();
1333 sampler.push_metric(MetricConfig {
1334 project_id: None,
1335 selectors: vec![selectors::parse_verbose(&selector).unwrap()],
1336 metric_id: MetricId(1),
1337 metric_type: MetricType::Occurrence,
1339 event_codes: Vec::new(),
1340 upload_once: true,
1343 });
1344 sampler.rebuild_selector_data_structures();
1345 match sampler.process_component_data(
1346 &hierarchy,
1347 &InspectHandleName::name(DEFAULT_TREE_NAME),
1348 &"my/component".try_into().unwrap(),
1349 ) {
1350 Ok((SnapshotOutcome::SelectorsChanged, _events)) => (),
1353 _ => panic!("Expecting SelectorsChanged from process_component_data."),
1354 }
1355
1356 let selector_with_escaped_property: String =
1357 "my/component:root/path\\/to:value\\/with\\:escapes".to_string();
1358 sampler.push_metric(MetricConfig {
1359 project_id: None,
1360 selectors: vec![selectors::parse_verbose(&selector_with_escaped_property).unwrap()],
1361 metric_id: MetricId(1),
1362 metric_type: MetricType::Occurrence,
1364 event_codes: Vec::new(),
1365 upload_once: true,
1368 });
1369 sampler.rebuild_selector_data_structures();
1370 match sampler.process_component_data(
1371 &hierarchy,
1372 &InspectHandleName::name(DEFAULT_TREE_NAME),
1373 &"my/component".try_into().unwrap(),
1374 ) {
1375 Ok((SnapshotOutcome::SelectorsChanged, _events)) => (),
1378 _ => panic!("Expecting SelectorsChanged from process_component_data."),
1379 }
1380
1381 let selector_unfound: String = "my/component:root/path/to:value".to_string();
1382 sampler.push_metric(MetricConfig {
1383 project_id: None,
1384 selectors: vec![selectors::parse_verbose(&selector_unfound).unwrap()],
1385 metric_id: MetricId(1),
1386 metric_type: MetricType::Occurrence,
1388 event_codes: Vec::new(),
1389 upload_once: true,
1392 });
1393 sampler.rebuild_selector_data_structures();
1394 match sampler.process_component_data(
1395 &hierarchy,
1396 &InspectHandleName::name(DEFAULT_TREE_NAME),
1397 &"my/component".try_into().unwrap(),
1398 ) {
1399 Ok((SnapshotOutcome::SelectorsUnchanged, _events)) => (),
1401 _ => panic!("Expecting SelectorsUnchanged from process_component_data."),
1402 }
1403 }
1404
1405 #[fuchsia::test]
1408 fn decreasing_occurrence_is_correct() {
1409 let big_number = Property::Uint("foo".to_string(), 5);
1410 let small_number = Property::Uint("foo".to_string(), 2);
1411 let key = MetricCacheKey {
1412 handle_name: InspectHandleName::name("some_file"),
1413 selector: "sel".to_string(),
1414 };
1415
1416 assert_eq!(
1417 process_sample_for_data_type(
1418 &big_number,
1419 Some(&small_number),
1420 &key,
1421 &MetricType::Occurrence
1422 ),
1423 Some(MetricEventPayload::Count(3))
1424 );
1425 assert_eq!(
1426 process_sample_for_data_type(
1427 &small_number,
1428 Some(&big_number),
1429 &key,
1430 &MetricType::Occurrence
1431 ),
1432 None
1433 );
1434 }
1435
1436 #[fuchsia::test]
1438 fn test_upload_once() {
1439 let hierarchy = hierarchy! {
1440 root: {
1441 value_one: 0,
1442 value_two: 1,
1443 }
1444 };
1445
1446 let mut sampler = ProjectSampler {
1447 archive_reader: ArchiveReader::inspect(),
1448 metrics: vec![],
1449 metric_cache: RefCell::new(HashMap::new()),
1450 metric_loggers: HashMap::new(),
1451 project_id: ProjectId(1),
1452 poll_rate_sec: 3600,
1453 project_sampler_stats: Arc::new(ProjectSamplerStats::new()),
1454 all_done: true,
1455 };
1456 sampler.push_metric(MetricConfig {
1457 project_id: None,
1458 selectors: vec![selectors::parse_verbose("my/component:root:value_one").unwrap()],
1459 metric_id: MetricId(1),
1460 metric_type: MetricType::Integer,
1461 event_codes: Vec::new(),
1462 upload_once: true,
1463 });
1464 sampler.push_metric(MetricConfig {
1465 project_id: None,
1466 selectors: vec![selectors::parse_verbose("my/component:root:value_two").unwrap()],
1467 metric_id: MetricId(2),
1468 metric_type: MetricType::Integer,
1469 event_codes: Vec::new(),
1470 upload_once: true,
1471 });
1472 sampler.rebuild_selector_data_structures();
1473
1474 match sampler.process_component_data(
1476 &hierarchy,
1477 &InspectHandleName::name(DEFAULT_TREE_NAME),
1478 &"my/component".try_into().unwrap(),
1479 ) {
1480 Ok((SnapshotOutcome::SelectorsChanged, _events)) => (),
1481 _ => panic!("Expecting SelectorsChanged from process_component_data."),
1482 }
1483
1484 let moniker: ExtendedMoniker = "my_component".try_into().unwrap();
1485 assert!(sampler
1486 .filter_metrics_by_moniker_and_tree_name(&moniker, DEFAULT_TREE_NAME)
1487 .collect::<Vec<_>>()
1488 .is_empty());
1489 }
1490
1491 struct EventCountTesterParams {
1492 new_val: Property,
1493 old_val: Option<Property>,
1494 process_ok: bool,
1495 event_made: bool,
1496 diff: i64,
1497 }
1498
1499 fn process_occurence_tester(params: EventCountTesterParams) {
1500 let data_source = MetricCacheKey {
1501 handle_name: InspectHandleName::name("foo.file"),
1502 selector: "test:root:count".to_string(),
1503 };
1504 let event_res = process_occurence(¶ms.new_val, params.old_val.as_ref(), &data_source);
1505
1506 if !params.process_ok {
1507 assert!(event_res.is_err());
1508 return;
1509 }
1510
1511 assert!(event_res.is_ok());
1512
1513 let event_opt = event_res.unwrap();
1514
1515 if !params.event_made {
1516 assert!(event_opt.is_none());
1517 return;
1518 }
1519
1520 assert!(event_opt.is_some());
1521 let event = event_opt.unwrap();
1522 match event {
1523 MetricEventPayload::Count(count) => {
1524 assert_eq!(count, params.diff as u64);
1525 }
1526 _ => panic!("Expecting event counts."),
1527 }
1528 }
1529
1530 #[fuchsia::test]
1531 fn test_normal_process_occurence() {
1532 process_occurence_tester(EventCountTesterParams {
1533 new_val: Property::Int("count".to_string(), 1),
1534 old_val: None,
1535 process_ok: true,
1536 event_made: true,
1537 diff: 1,
1538 });
1539
1540 process_occurence_tester(EventCountTesterParams {
1541 new_val: Property::Int("count".to_string(), 1),
1542 old_val: Some(Property::Int("count".to_string(), 1)),
1543 process_ok: true,
1544 event_made: false,
1545 diff: -1,
1546 });
1547
1548 process_occurence_tester(EventCountTesterParams {
1549 new_val: Property::Int("count".to_string(), 3),
1550 old_val: Some(Property::Int("count".to_string(), 1)),
1551 process_ok: true,
1552 event_made: true,
1553 diff: 2,
1554 });
1555 }
1556
1557 #[fuchsia::test]
1558 fn test_data_type_changing_process_occurence() {
1559 process_occurence_tester(EventCountTesterParams {
1560 new_val: Property::Int("count".to_string(), 1),
1561 old_val: None,
1562 process_ok: true,
1563 event_made: true,
1564 diff: 1,
1565 });
1566
1567 process_occurence_tester(EventCountTesterParams {
1568 new_val: Property::Uint("count".to_string(), 1),
1569 old_val: None,
1570 process_ok: true,
1571 event_made: true,
1572 diff: 1,
1573 });
1574
1575 process_occurence_tester(EventCountTesterParams {
1576 new_val: Property::Uint("count".to_string(), 3),
1577 old_val: Some(Property::Int("count".to_string(), 1)),
1578 process_ok: true,
1579 event_made: true,
1580 diff: 3,
1581 });
1582
1583 process_occurence_tester(EventCountTesterParams {
1584 new_val: Property::String("count".to_string(), "big_oof".to_string()),
1585 old_val: Some(Property::Int("count".to_string(), 1)),
1586 process_ok: false,
1587 event_made: false,
1588 diff: -1,
1589 });
1590 }
1591
1592 #[fuchsia::test]
1593 fn test_event_count_negatives_and_overflows() {
1594 process_occurence_tester(EventCountTesterParams {
1595 new_val: Property::Int("count".to_string(), -11),
1596 old_val: None,
1597 process_ok: false,
1598 event_made: false,
1599 diff: -1,
1600 });
1601
1602 process_occurence_tester(EventCountTesterParams {
1603 new_val: Property::Int("count".to_string(), 9),
1604 old_val: Some(Property::Int("count".to_string(), 10)),
1605 process_ok: false,
1606 event_made: false,
1607 diff: -1,
1608 });
1609
1610 process_occurence_tester(EventCountTesterParams {
1611 new_val: Property::Uint("count".to_string(), u64::MAX),
1612 old_val: None,
1613 process_ok: false,
1614 event_made: false,
1615 diff: -1,
1616 });
1617
1618 let i64_max_in_u64: u64 = i64::MAX.try_into().unwrap();
1619
1620 process_occurence_tester(EventCountTesterParams {
1621 new_val: Property::Uint("count".to_string(), i64_max_in_u64 + 1),
1622 old_val: Some(Property::Uint("count".to_string(), 1)),
1623 process_ok: true,
1624 event_made: true,
1625 diff: i64::MAX,
1626 });
1627
1628 process_occurence_tester(EventCountTesterParams {
1629 new_val: Property::Uint("count".to_string(), i64_max_in_u64 + 2),
1630 old_val: Some(Property::Uint("count".to_string(), 1)),
1631 process_ok: false,
1632 event_made: false,
1633 diff: -1,
1634 });
1635 }
1636
1637 struct IntTesterParams {
1638 new_val: Property,
1639 process_ok: bool,
1640 sample: i64,
1641 }
1642
1643 fn process_int_tester(params: IntTesterParams) {
1644 let data_source = MetricCacheKey {
1645 handle_name: InspectHandleName::name("foo.file"),
1646 selector: "test:root:count".to_string(),
1647 };
1648 let event_res = process_int(¶ms.new_val, &data_source);
1649
1650 if !params.process_ok {
1651 assert!(event_res.is_err());
1652 return;
1653 }
1654
1655 assert!(event_res.is_ok());
1656
1657 let event = event_res.expect("event should be Ok").expect("event should be Some");
1658 match event {
1659 MetricEventPayload::IntegerValue(val) => {
1660 assert_eq!(val, params.sample);
1661 }
1662 _ => panic!("Expecting event counts."),
1663 }
1664 }
1665
1666 #[fuchsia::test]
1667 fn test_normal_process_int() {
1668 process_int_tester(IntTesterParams {
1669 new_val: Property::Int("count".to_string(), 13),
1670 process_ok: true,
1671 sample: 13,
1672 });
1673
1674 process_int_tester(IntTesterParams {
1675 new_val: Property::Int("count".to_string(), -13),
1676 process_ok: true,
1677 sample: -13,
1678 });
1679
1680 process_int_tester(IntTesterParams {
1681 new_val: Property::Int("count".to_string(), 0),
1682 process_ok: true,
1683 sample: 0,
1684 });
1685
1686 process_int_tester(IntTesterParams {
1687 new_val: Property::Uint("count".to_string(), 13),
1688 process_ok: true,
1689 sample: 13,
1690 });
1691
1692 process_int_tester(IntTesterParams {
1693 new_val: Property::String("count".to_string(), "big_oof".to_string()),
1694 process_ok: false,
1695 sample: -1,
1696 });
1697 }
1698
1699 #[fuchsia::test]
1700 fn test_int_edge_cases() {
1701 process_int_tester(IntTesterParams {
1702 new_val: Property::Int("count".to_string(), i64::MAX),
1703 process_ok: true,
1704 sample: i64::MAX,
1705 });
1706
1707 process_int_tester(IntTesterParams {
1708 new_val: Property::Int("count".to_string(), i64::MIN),
1709 process_ok: true,
1710 sample: i64::MIN,
1711 });
1712
1713 let i64_max_in_u64: u64 = i64::MAX.try_into().unwrap();
1714
1715 process_int_tester(IntTesterParams {
1716 new_val: Property::Uint("count".to_string(), i64_max_in_u64),
1717 process_ok: true,
1718 sample: i64::MAX,
1719 });
1720
1721 process_int_tester(IntTesterParams {
1722 new_val: Property::Uint("count".to_string(), i64_max_in_u64 + 1),
1723 process_ok: false,
1724 sample: -1,
1725 });
1726 }
1727
1728 struct StringTesterParams {
1729 sample: Property,
1730 process_ok: bool,
1731 previous_sample: Option<Property>,
1732 }
1733
1734 fn process_string_tester(params: StringTesterParams) {
1735 let metric_cache_key = MetricCacheKey {
1736 handle_name: InspectHandleName::name("foo.file"),
1737 selector: "test:root:string_val".to_string(),
1738 };
1739
1740 let event = process_sample_for_data_type(
1741 ¶ms.sample,
1742 params.previous_sample.as_ref(),
1743 &metric_cache_key,
1744 &MetricType::String,
1745 );
1746
1747 if !params.process_ok {
1748 assert!(event.is_none());
1749 return;
1750 }
1751
1752 match event.unwrap() {
1753 MetricEventPayload::StringValue(val) => {
1754 assert_eq!(val.as_str(), params.sample.string().unwrap());
1755 }
1756 _ => panic!("Expecting event with StringValue."),
1757 }
1758 }
1759
1760 #[fuchsia::test]
1761 fn test_process_string() {
1762 process_string_tester(StringTesterParams {
1763 sample: Property::String("string_val".to_string(), "Hello, world!".to_string()),
1764 process_ok: true,
1765 previous_sample: None,
1766 });
1767
1768 process_string_tester(StringTesterParams {
1771 sample: Property::String("string_val".to_string(), "Hello, world!".to_string()),
1772 process_ok: true,
1773 previous_sample: Some(Property::String("string_val".to_string(), "Uh oh!".to_string())),
1774 });
1775
1776 process_string_tester(StringTesterParams {
1779 sample: Property::Int("string_val".to_string(), 123),
1780 process_ok: false,
1781 previous_sample: None,
1782 });
1783
1784 process_string_tester(StringTesterParams {
1785 sample: Property::Uint("string_val".to_string(), 123),
1786 process_ok: false,
1787 previous_sample: None,
1788 });
1789 }
1790
1791 fn convert_vector_to_int_histogram(hist: Vec<i64>) -> Property {
1792 let size = hist.len();
1793 Property::IntArray(
1794 "Bloop".to_string(),
1795 ArrayContent::LinearHistogram(LinearHistogram {
1796 floor: 1,
1797 step: 1,
1798 counts: hist,
1799 size,
1800 indexes: None,
1801 }),
1802 )
1803 }
1804
1805 fn convert_vector_to_uint_histogram(hist: Vec<u64>) -> Property<String> {
1806 let size = hist.len();
1807 Property::UintArray(
1808 "Bloop".to_string(),
1809 ArrayContent::LinearHistogram(LinearHistogram {
1810 floor: 1,
1811 step: 1,
1812 counts: hist,
1813 size,
1814 indexes: None,
1815 }),
1816 )
1817 }
1818
1819 fn convert_vectors_to_int_histogram(counts: Vec<i64>, indexes: Vec<usize>) -> Property<String> {
1821 let size = 100;
1822 Property::IntArray(
1823 "Bloop".to_string(),
1824 ArrayContent::LinearHistogram(LinearHistogram {
1825 floor: 1,
1826 step: 1,
1827 counts,
1828 size,
1829 indexes: Some(indexes),
1830 }),
1831 )
1832 }
1833
1834 fn convert_vectors_to_uint_histogram(
1835 counts: Vec<u64>,
1836 indexes: Vec<usize>,
1837 ) -> Property<String> {
1838 let size = 100;
1839 Property::UintArray(
1840 "Bloop".to_string(),
1841 ArrayContent::LinearHistogram(LinearHistogram {
1842 floor: 1,
1843 step: 1,
1844 counts,
1845 size,
1846 indexes: Some(indexes),
1847 }),
1848 )
1849 }
1850
1851 struct IntHistogramTesterParams {
1852 new_val: Property,
1853 old_val: Option<Property>,
1854 process_ok: bool,
1855 event_made: bool,
1856 diff: Vec<(u32, u64)>,
1857 }
1858 fn process_int_histogram_tester(params: IntHistogramTesterParams) {
1859 let data_source = MetricCacheKey {
1860 handle_name: InspectHandleName::name("foo.file"),
1861 selector: "test:root:count".to_string(),
1862 };
1863 let event_res =
1864 process_int_histogram(¶ms.new_val, params.old_val.as_ref(), &data_source);
1865
1866 if !params.process_ok {
1867 assert!(event_res.is_err());
1868 return;
1869 }
1870
1871 assert!(event_res.is_ok());
1872
1873 let event_opt = event_res.unwrap();
1874 if !params.event_made {
1875 assert!(event_opt.is_none());
1876 return;
1877 }
1878
1879 assert!(event_opt.is_some());
1880
1881 let event = event_opt.unwrap();
1882 match event.clone() {
1883 MetricEventPayload::Histogram(histogram_buckets) => {
1884 assert_eq!(histogram_buckets.len(), params.diff.len());
1885
1886 let expected_histogram_buckets = params
1887 .diff
1888 .iter()
1889 .map(|(index, count)| HistogramBucket { index: *index, count: *count })
1890 .collect::<Vec<HistogramBucket>>();
1891
1892 assert_eq!(histogram_buckets, expected_histogram_buckets);
1893 }
1894 _ => panic!("Expecting int histogram."),
1895 }
1896 }
1897
1898 #[fuchsia::test]
1901 fn test_normal_process_int_histogram() {
1902 let new_i64_sample = convert_vector_to_int_histogram(vec![1, 1, 1, 1]);
1903 let new_u64_sample = convert_vector_to_uint_histogram(vec![1, 1, 1, 1]);
1904
1905 process_int_histogram_tester(IntHistogramTesterParams {
1906 new_val: new_i64_sample,
1907 old_val: None,
1908 process_ok: true,
1909 event_made: true,
1910 diff: vec![(0, 1), (1, 1), (2, 1), (3, 1)],
1911 });
1912
1913 process_int_histogram_tester(IntHistogramTesterParams {
1914 new_val: new_u64_sample,
1915 old_val: None,
1916 process_ok: true,
1917 event_made: true,
1918 diff: vec![(0, 1), (1, 1), (2, 1), (3, 1)],
1919 });
1920
1921 let new_u64_sample = convert_vector_to_uint_histogram(vec![u64::MAX, u64::MAX, u64::MAX]);
1924 process_int_histogram_tester(IntHistogramTesterParams {
1925 new_val: new_u64_sample,
1926 old_val: None,
1927 process_ok: true,
1928 event_made: true,
1929 diff: vec![(0, u64::MAX), (1, u64::MAX), (2, u64::MAX)],
1930 });
1931
1932 let new_u64_sample = convert_vector_to_uint_histogram(Vec::new());
1934 process_int_histogram_tester(IntHistogramTesterParams {
1935 new_val: new_u64_sample,
1936 old_val: None,
1937 process_ok: true,
1938 event_made: false,
1939 diff: Vec::new(),
1940 });
1941
1942 let new_u64_sample = convert_vector_to_uint_histogram(vec![0, 0, 0, 0]);
1943 process_int_histogram_tester(IntHistogramTesterParams {
1944 new_val: new_u64_sample,
1945 old_val: None,
1946 process_ok: true,
1947 event_made: false,
1948 diff: Vec::new(),
1949 });
1950
1951 let new_u64_sample = convert_vector_to_uint_histogram(vec![2, 1, 2, 1]);
1953 let old_u64_sample = Some(convert_vector_to_uint_histogram(vec![1, 1, 0, 1]));
1954 process_int_histogram_tester(IntHistogramTesterParams {
1955 new_val: new_u64_sample,
1956 old_val: old_u64_sample,
1957 process_ok: true,
1958 event_made: true,
1959 diff: vec![(0, 1), (2, 2)],
1960 });
1961
1962 let new_i64_sample = convert_vector_to_int_histogram(vec![5, 2, 1, 3]);
1963 let old_i64_sample = Some(convert_vector_to_int_histogram(vec![1, 1, 1, 1]));
1964 process_int_histogram_tester(IntHistogramTesterParams {
1965 new_val: new_i64_sample,
1966 old_val: old_i64_sample,
1967 process_ok: true,
1968 event_made: true,
1969 diff: vec![(0, 4), (1, 1), (3, 2)],
1970 });
1971
1972 let new_u64_sample = convert_vector_to_uint_histogram(vec![2, 1, 1, 1]);
1974 let old_i64_sample = Some(convert_vector_to_int_histogram(vec![1, 1, 1, 1]));
1975 process_int_histogram_tester(IntHistogramTesterParams {
1976 new_val: new_u64_sample,
1977 old_val: old_i64_sample,
1978 process_ok: true,
1979 event_made: true,
1980 diff: vec![(0, 2), (1, 1), (2, 1), (3, 1)],
1981 });
1982 }
1983
1984 #[fuchsia::test]
1986 fn test_normal_process_condensed_histograms() {
1987 let new_u64_sample = convert_vectors_to_int_histogram(vec![2, 6], vec![3, 5]);
1988 let old_u64_sample = Some(convert_vectors_to_int_histogram(vec![1], vec![5]));
1989 process_int_histogram_tester(IntHistogramTesterParams {
1990 new_val: new_u64_sample,
1991 old_val: old_u64_sample,
1992 process_ok: true,
1993 event_made: true,
1994 diff: vec![(3, 2), (5, 5)],
1995 });
1996 let new_i64_sample = convert_vectors_to_uint_histogram(vec![2, 4], vec![5, 3]);
1997 let old_i64_sample = None;
1998 process_int_histogram_tester(IntHistogramTesterParams {
1999 new_val: new_i64_sample,
2000 old_val: old_i64_sample,
2001 process_ok: true,
2002 event_made: true,
2003 diff: vec![(3, 4), (5, 2)],
2004 });
2005 }
2006
2007 #[fuchsia::test]
2008 fn test_errorful_process_int_histogram() {
2009 let new_u64_sample = convert_vector_to_uint_histogram(vec![1, 1, 1, 1]);
2011 let old_u64_sample = Some(convert_vector_to_uint_histogram(vec![1, 1, 1, 1, 1]));
2012 process_int_histogram_tester(IntHistogramTesterParams {
2013 new_val: new_u64_sample,
2014 old_val: old_u64_sample,
2015 process_ok: false,
2016 event_made: false,
2017 diff: Vec::new(),
2018 });
2019
2020 let new_i64_sample = convert_vector_to_int_histogram(vec![1, 1, -1, 1]);
2022 process_int_histogram_tester(IntHistogramTesterParams {
2023 new_val: new_i64_sample,
2024 old_val: None,
2025 process_ok: false,
2026 event_made: false,
2027 diff: Vec::new(),
2028 });
2029
2030 let new_i64_sample = convert_vector_to_int_histogram(vec![5, 2, 1, 3]);
2032 let old_i64_sample = Some(convert_vector_to_int_histogram(vec![6, 1, 1, 1]));
2033 process_int_histogram_tester(IntHistogramTesterParams {
2034 new_val: new_i64_sample,
2035 old_val: old_i64_sample,
2036 process_ok: false,
2037 event_made: false,
2038 diff: Vec::new(),
2039 });
2040 }
2041
2042 #[fuchsia::test]
2047 async fn test_inspect_handle_name_distinguishes_data() {
2048 let mut sampler = ProjectSampler {
2049 archive_reader: ArchiveReader::inspect(),
2050 metrics: vec![],
2051 metric_cache: RefCell::new(HashMap::new()),
2052 metric_loggers: HashMap::new(),
2053 project_id: ProjectId(1),
2054 poll_rate_sec: 3600,
2055 project_sampler_stats: Arc::new(ProjectSamplerStats::new()),
2056 all_done: true,
2057 };
2058 let selector: String = "my/component:[...]root/branch:leaf".to_string();
2059 let metric_id = MetricId(1);
2060 let event_codes = vec![];
2061 sampler.push_metric(MetricConfig {
2062 project_id: None,
2063 selectors: vec![selectors::parse_verbose(&selector).unwrap()],
2064 metric_id,
2065 metric_type: MetricType::Occurrence,
2066 event_codes,
2067 upload_once: false,
2068 });
2069 sampler.rebuild_selector_data_structures();
2070
2071 let data1_value4 = vec![InspectDataBuilder::new(
2072 "my/component".try_into().unwrap(),
2073 "component-url",
2074 Timestamp::from_nanos(0),
2075 )
2076 .with_hierarchy(hierarchy! { root: {branch: {leaf: 4i32}}})
2077 .with_name(InspectHandleName::name("name1"))
2078 .build()];
2079 let data2_value3 = vec![InspectDataBuilder::new(
2080 "my/component".try_into().unwrap(),
2081 "component-url",
2082 Timestamp::from_nanos(0),
2083 )
2084 .with_hierarchy(hierarchy! { root: {branch: {leaf: 3i32}}})
2085 .with_name(InspectHandleName::name("name2"))
2086 .build()];
2087 let data1_value6 = vec![InspectDataBuilder::new(
2088 "my/component".try_into().unwrap(),
2089 "component-url",
2090 Timestamp::from_nanos(0),
2091 )
2092 .with_hierarchy(hierarchy! { root: {branch: {leaf: 6i32}}})
2093 .with_name(InspectHandleName::name("name1"))
2094 .build()];
2095 let data2_value8 = vec![InspectDataBuilder::new(
2096 "my/component".try_into().unwrap(),
2097 "component-url",
2098 Timestamp::from_nanos(0),
2099 )
2100 .with_hierarchy(hierarchy! { root: {branch: {leaf: 8i32}}})
2101 .with_name(InspectHandleName::name("name2"))
2102 .build()];
2103
2104 fn expect_one_metric_event_value(
2105 events: Result<Vec<EventToLog>, Error>,
2106 value: u64,
2107 context: &'static str,
2108 ) {
2109 let events = events.expect(context);
2110 assert_eq!(events.len(), 1, "Events len not 1: {}: {}", context, events.len());
2111 let event = &events[0];
2112 let (project_id, MetricEvent { payload, .. }) = event;
2113 assert_eq!(*project_id, ProjectId(1));
2114 if let fidl_fuchsia_metrics::MetricEventPayload::Count(payload) = payload {
2115 assert_eq!(
2116 payload, &value,
2117 "Wrong payload, expected {} got {} at {}",
2118 value, payload, context
2119 );
2120 } else {
2121 panic!("Expected MetricEventPayload::Count at {}, got {:?}", context, payload);
2122 }
2123 }
2124
2125 expect_one_metric_event_value(sampler.process_snapshot(data1_value4).await, 4, "first");
2126 expect_one_metric_event_value(sampler.process_snapshot(data2_value3).await, 3, "second");
2127 expect_one_metric_event_value(sampler.process_snapshot(data1_value6).await, 2, "third");
2128 expect_one_metric_event_value(sampler.process_snapshot(data2_value8).await, 5, "fourth");
2129 }
2130
2131 #[fuchsia::test]
2133 async fn project_id_can_be_overwritten_by_the_metric_project_id() {
2134 let mut sampler = ProjectSampler {
2135 archive_reader: ArchiveReader::inspect(),
2136 metrics: vec![],
2137 metric_cache: RefCell::new(HashMap::new()),
2138 metric_loggers: HashMap::new(),
2139 project_id: ProjectId(1),
2140 poll_rate_sec: 3600,
2141 project_sampler_stats: Arc::new(ProjectSamplerStats::new()),
2142 all_done: true,
2143 };
2144 let selector: String = "my/component:[name=name1]root/branch:leaf".to_string();
2145 let metric_id = MetricId(1);
2146 let event_codes = vec![];
2147 sampler.push_metric(MetricConfig {
2148 project_id: Some(ProjectId(2)),
2149 selectors: vec![selectors::parse_verbose(&selector).unwrap()],
2150 metric_id,
2151 metric_type: MetricType::Occurrence,
2152 event_codes,
2153 upload_once: false,
2154 });
2155 sampler.rebuild_selector_data_structures();
2156
2157 let value = vec![InspectDataBuilder::new(
2158 "my/component".try_into().unwrap(),
2159 "component-url",
2160 Timestamp::from_nanos(0),
2161 )
2162 .with_hierarchy(hierarchy! { root: {branch: {leaf: 4i32}}})
2163 .with_name(InspectHandleName::name("name1"))
2164 .build()];
2165
2166 let events = sampler.process_snapshot(value).await.expect("processed snapshot");
2167 assert_eq!(events.len(), 1);
2168 let event = &events[0];
2169 let (project_id, MetricEvent { .. }) = event;
2170 assert_eq!(*project_id, ProjectId(2));
2171 }
2172}