1use crate::config::SamplerConfig;
90use crate::diagnostics::*;
91use anyhow::{format_err, Context, Error};
92use diagnostics_data::{Data, InspectHandleName};
93use diagnostics_hierarchy::{
94 ArrayContent, DiagnosticsHierarchy, ExponentialHistogram, LinearHistogram, Property,
95 SelectResult,
96};
97use diagnostics_reader::{ArchiveReader, Inspect, InspectArchiveReader, RetryConfig};
98use fidl_fuchsia_metrics::{
99 HistogramBucket, MetricEvent, MetricEventLoggerFactoryMarker, MetricEventLoggerFactoryProxy,
100 MetricEventLoggerProxy, MetricEventPayload, ProjectSpec,
101};
102use fuchsia_async as fasync;
103use fuchsia_component::client::connect_to_protocol;
104use fuchsia_inspect::{self as inspect, NumericProperty};
105use fuchsia_inspect_derive::WithInspect;
106use futures::channel::oneshot;
107use futures::future::join_all;
108use futures::stream::FuturesUnordered;
109use futures::{select, StreamExt};
110use log::{info, warn};
111use moniker::ExtendedMoniker;
112use sampler_config::runtime::{MetricConfig, ProjectConfig};
113use sampler_config::{MetricType, ProjectId};
114use selectors::SelectorExt;
115use std::cell::{Ref, RefCell, RefMut};
116use std::collections::hash_map::Entry;
117use std::collections::HashMap;
118use std::sync::Arc;
119
/// A metric event paired with the ID of the project whose logger should
/// receive it (see `ProjectSampler::log_events`).
type EventToLog = (ProjectId, MetricEvent);
124
/// Handle returned by `SamplerExecutor::execute` for the spawned samplers.
pub struct TaskCancellation {
    /// One sender per spawned project sampler; `perform_reboot_cleanup`
    /// sends `()` on each of them.
    senders: Vec<oneshot::Sender<()>>,
    /// Held for the lifetime of this struct; never read through this field.
    _sampler_executor_stats: Arc<SamplerExecutorStats>,
    /// Task that drains every spawned sampler task and resolves to the
    /// samplers that exited because of the reboot signal.
    execution_context: fasync::Task<Vec<ProjectSampler>>,
}
130
131impl TaskCancellation {
132 pub async fn run_without_cancellation(self) {
136 self.execution_context.await;
137 }
138
139 pub async fn perform_reboot_cleanup(self) {
140 self.senders.into_iter().for_each(|sender| {
142 sender
143 .send(())
144 .unwrap_or_else(|err| warn!("Failed to send reboot over oneshot: {:?}", err))
145 });
146
147 let project_samplers: Vec<ProjectSampler> = self.execution_context.await;
150
151 let mut reader = ArchiveReader::inspect();
152
153 for project_sampler in &project_samplers {
154 for metric in &project_sampler.metrics {
155 for selector in metric.borrow().selectors.iter() {
156 reader.add_selector(selector.clone());
157 }
158 }
159 }
160
161 reader.retry(RetryConfig::never());
162
163 let mut reboot_processor = RebootSnapshotProcessor { reader, project_samplers };
164 reboot_processor
167 .process_reboot_sample()
168 .await
169 .unwrap_or_else(|e| warn!("Reboot snapshot failed! {:?}", e));
170 }
171}
172
/// Takes one snapshot and routes its contents to a set of project samplers.
struct RebootSnapshotProcessor {
    /// Reader used for the single snapshot in `process_reboot_sample`.
    reader: InspectArchiveReader,
    /// Samplers whose metrics are matched against each snapshot entry.
    project_samplers: Vec<ProjectSampler>,
}
181
182impl RebootSnapshotProcessor {
183 pub async fn process_reboot_sample(&mut self) -> Result<(), Error> {
184 let snapshot_data = self.reader.snapshot().await?;
185 for data_packet in snapshot_data {
186 let moniker = data_packet.moniker;
187 match data_packet.payload {
188 None => {
189 process_schema_errors(&data_packet.metadata.errors, &moniker);
190 }
191 Some(payload) => {
192 self.process_single_payload(payload, &data_packet.metadata.name, &moniker).await
193 }
194 }
195 }
196 Ok(())
197 }
198
199 async fn process_single_payload(
200 &mut self,
201 hierarchy: DiagnosticsHierarchy<String>,
202 inspect_handle_name: &InspectHandleName,
203 moniker: &ExtendedMoniker,
204 ) {
205 let mut projects = self
206 .project_samplers
207 .iter_mut()
208 .filter(|p| {
209 p.filter_metrics_by_moniker_and_tree_name(moniker, inspect_handle_name.as_ref())
210 .next()
211 .is_some()
212 })
213 .peekable();
214 if projects.peek().is_none() {
215 warn!(
216 moniker:%,
217 tree_name = inspect_handle_name.as_ref();
218 "no metrics found for moniker and tree_name combination"
219 );
220 return;
221 }
222
223 for project_sampler in projects {
224 let maybe_err = match project_sampler.process_component_data(
228 &hierarchy,
229 inspect_handle_name,
230 moniker,
231 ) {
232 Err(err) => Some(err),
233 Ok((_selector_changes, events_to_log)) => {
234 project_sampler.log_events(events_to_log).await.err()
235 }
236 };
237 if let Some(err) = maybe_err {
238 warn!(err:?; "A project sampler failed to process a reboot sample");
239 }
240 }
241 }
242}
243
/// Owns the project samplers built from a `SamplerConfig` until they are
/// spawned by `execute`.
pub struct SamplerExecutor {
    /// Samplers that were constructed successfully in `new`.
    project_samplers: Vec<ProjectSampler>,
    /// Executor-wide stats, shared with the tasks created by `execute`.
    sampler_executor_stats: Arc<SamplerExecutorStats>,
}
249
250impl SamplerExecutor {
251 pub async fn new(sampler_config: SamplerConfig) -> Result<Self, Error> {
254 let metric_logger_factory: Arc<MetricEventLoggerFactoryProxy> = Arc::new(
255 connect_to_protocol::<MetricEventLoggerFactoryMarker>()
256 .context("Failed to connect to the Metric LoggerFactory")?,
257 );
258
259 let minimum_sample_rate_sec = sampler_config.minimum_sample_rate_sec;
260
261 let sampler_executor_stats = Arc::new(
262 SamplerExecutorStats::new()
263 .with_inspect(inspect::component::inspector().root(), "sampler_executor_stats")
264 .unwrap_or_else(|err| {
265 warn!(err:?; "Failed to attach inspector to SamplerExecutorStats struct");
266 SamplerExecutorStats::default()
267 }),
268 );
269
270 sampler_executor_stats
271 .total_project_samplers_configured
272 .add(sampler_config.project_configs.len() as u64);
273
274 let mut project_to_stats_map: HashMap<ProjectId, Arc<ProjectSamplerStats>> = HashMap::new();
275 let project_sampler_futures =
278 sampler_config.project_configs.iter().cloned().map(|project_config| {
279 let project_sampler_stats =
280 project_to_stats_map.entry(project_config.project_id).or_insert_with(|| {
281 Arc::new(
282 ProjectSamplerStats::new()
283 .with_inspect(
284 &sampler_executor_stats.inspect_node,
285 format!("project_{}", project_config.project_id,),
286 )
287 .unwrap_or_else(|err| {
288 warn!(
289 err:?;
290 "Failed to attach inspector to ProjectSamplerStats struct"
291 );
292 ProjectSamplerStats::default()
293 }),
294 )
295 });
296 ProjectSampler::new(
297 project_config,
298 metric_logger_factory.clone(),
299 minimum_sample_rate_sec,
300 project_sampler_stats.clone(),
301 )
302 });
303
304 let mut project_samplers: Vec<ProjectSampler> = Vec::new();
305 for project_sampler in join_all(project_sampler_futures).await.into_iter() {
306 match project_sampler {
307 Ok(project_sampler) => project_samplers.push(project_sampler),
308 Err(e) => {
309 warn!("ProjectSampler construction failed: {:?}", e);
310 }
311 }
312 }
313 Ok(SamplerExecutor { project_samplers, sampler_executor_stats })
314 }
315
316 pub fn execute(self) -> TaskCancellation {
319 let task_cancellation_owned_stats = self.sampler_executor_stats.clone();
323 let execution_context_owned_stats = self.sampler_executor_stats.clone();
324
325 let (senders, mut spawned_tasks): (Vec<oneshot::Sender<()>>, FuturesUnordered<_>) = self
326 .project_samplers
327 .into_iter()
328 .map(|project_sampler| {
329 let (sender, receiver) = oneshot::channel::<()>();
330 (sender, project_sampler.spawn(receiver))
331 })
332 .unzip();
333
334 let execution_context = fasync::Task::local(async move {
335 let mut healthily_exited_samplers = Vec::new();
336 while let Some(sampler_result) = spawned_tasks.next().await {
337 match sampler_result {
338 Err(e) => {
339 warn!("A spawned sampler has failed: {:?}", e);
342 execution_context_owned_stats.errorfully_exited_samplers.add(1);
343 }
344 Ok(ProjectSamplerTaskExit::RebootTriggered(sampler)) => {
345 healthily_exited_samplers.push(sampler);
346 execution_context_owned_stats.reboot_exited_samplers.add(1);
347 }
348 Ok(ProjectSamplerTaskExit::WorkCompleted) => {
349 info!("A sampler completed its workload, and exited.");
350 execution_context_owned_stats.healthily_exited_samplers.add(1);
351 }
352 }
353 }
354
355 healthily_exited_samplers
356 });
357
358 TaskCancellation {
359 execution_context,
360 senders,
361 _sampler_executor_stats: task_cancellation_owned_stats,
362 }
363 }
364}
365
/// Periodically samples Inspect data for one project and turns it into
/// metric events.
pub struct ProjectSampler {
    /// Reader used for each periodic snapshot; replaced whenever the set of
    /// selectors is rebuilt.
    archive_reader: InspectArchiveReader,
    /// Metric configurations. Their selector lists are pruned in place as
    /// samples are found.
    metrics: Vec<RefCell<MetricConfig>>,
    /// Last sample stored per (handle name, selector); only written for
    /// `Occurrence` and `IntHistogram` metrics (see `maybe_update_cache`).
    metric_cache: RefCell<HashMap<MetricCacheKey, Property>>,
    /// Metric loggers keyed by the project ID they were created for.
    metric_loggers: HashMap<ProjectId, MetricEventLoggerProxy>,
    /// Seconds between snapshots.
    poll_rate_sec: i64,
    /// Per-project stats counters.
    project_sampler_stats: Arc<ProjectSamplerStats>,
    /// Project ID used for metrics that do not specify their own.
    project_id: ProjectId,
    /// Checked after every snapshot; when true the sampler task exits with
    /// `ProjectSamplerTaskExit::WorkCompleted`.
    all_done: bool,
}
390
/// Key of the sample cache: the Inspect handle name a sample came from plus
/// the string form of the selector that matched it.
#[derive(Debug, Eq, Hash, PartialEq)]
struct MetricCacheKey {
    /// Name of the Inspect handle the sample was read from.
    handle_name: InspectHandleName,
    /// Selector rendered with `selectors::selector_to_string`.
    selector: String,
}
396
/// Successful outcomes of a sampler task created by `ProjectSampler::spawn`.
// NOTE(review): the lint is presumably silenced because `RebootTriggered`
// carries a whole `ProjectSampler` while `WorkCompleted` is empty — confirm.
#[allow(clippy::large_enum_variant)]
pub enum ProjectSamplerTaskExit {
    /// The reboot signal arrived; the sampler is handed back to the caller.
    RebootTriggered(ProjectSampler),
    /// The sampler reported that it has nothing left to do.
    WorkCompleted,
}
405
/// Events a sampler task reacts to in its `select!` loop.
pub enum ProjectSamplerEvent {
    /// The periodic timer yielded a tick.
    TimerTriggered,
    /// The periodic timer stream ended.
    TimerDied,
    /// The reboot oneshot delivered its value.
    RebootTriggered,
    /// The reboot oneshot resolved with an error instead of a value.
    RebootChannelClosed(Error),
}
412
/// Whether processing a component's data modified any metric's selector list.
#[derive(PartialEq)]
enum SnapshotOutcome {
    /// At least one metric had selectors removed.
    SelectorsChanged,
    /// No selector list was modified.
    SelectorsUnchanged,
}
422
423impl ProjectSampler {
424 pub async fn new(
425 config: Arc<ProjectConfig>,
426 metric_logger_factory: Arc<MetricEventLoggerFactoryProxy>,
427 minimum_sample_rate_sec: i64,
428 project_sampler_stats: Arc<ProjectSamplerStats>,
429 ) -> Result<ProjectSampler, Error> {
430 let customer_id = config.customer_id;
431 let project_id = config.project_id;
432 let poll_rate_sec = config.poll_rate_sec;
433 if poll_rate_sec < minimum_sample_rate_sec {
434 return Err(format_err!(
435 concat!(
436 "Project with id: {:?} uses a polling rate:",
437 " {:?} below minimum configured poll rate: {:?}"
438 ),
439 project_id,
440 poll_rate_sec,
441 minimum_sample_rate_sec,
442 ));
443 }
444
445 project_sampler_stats.project_sampler_count.add(1);
446 project_sampler_stats.metrics_configured.add(config.metrics.len() as u64);
447
448 let mut metric_loggers = HashMap::new();
449 if *project_id != 0 {
452 let (metric_logger_proxy, metrics_server_end) = fidl::endpoints::create_proxy();
453 let project_spec = ProjectSpec {
454 customer_id: Some(*customer_id),
455 project_id: Some(*project_id),
456 ..ProjectSpec::default()
457 };
458 metric_logger_factory
459 .create_metric_event_logger(&project_spec, metrics_server_end)
460 .await?
461 .map_err(|e| format_err!("error response for project {}: {:?}", project_id, e))?;
462 metric_loggers.insert(project_id, metric_logger_proxy);
463 }
464 for metric in &config.metrics {
465 if let Some(metric_project_id) = metric.project_id {
466 if let Entry::Vacant(entry) = metric_loggers.entry(metric_project_id) {
467 let (metric_logger_proxy, metrics_server_end) = fidl::endpoints::create_proxy();
468 let project_spec = ProjectSpec {
469 customer_id: Some(*customer_id),
470 project_id: Some(*metric_project_id),
471 ..ProjectSpec::default()
472 };
473 metric_logger_factory
474 .create_metric_event_logger(&project_spec, metrics_server_end)
475 .await?
476 .map_err(|e|
477 format_err!(
478 "error response for project {} while creating metric logger {}: {:?}",
479 project_id,
480 metric_project_id,
481 e
482 ))?;
483 entry.insert(metric_logger_proxy);
484 }
485 }
486 }
487
488 let mut project_sampler = ProjectSampler {
489 project_id,
490 archive_reader: ArchiveReader::inspect(),
491 metrics: config.metrics.iter().map(|m| RefCell::new(m.clone())).collect(),
492 metric_cache: RefCell::new(HashMap::new()),
493 metric_loggers,
494 poll_rate_sec,
495 project_sampler_stats,
496 all_done: true,
497 };
498 project_sampler.rebuild_selector_data_structures();
500 Ok(project_sampler)
501 }
502
503 pub fn spawn(
504 mut self,
505 mut reboot_oneshot: oneshot::Receiver<()>,
506 ) -> fasync::Task<Result<ProjectSamplerTaskExit, Error>> {
507 fasync::Task::local(async move {
508 let mut periodic_timer =
509 fasync::Interval::new(zx::MonotonicDuration::from_seconds(self.poll_rate_sec));
510 loop {
511 let done = select! {
512 opt = periodic_timer.next() => {
513 if opt.is_some() {
514 ProjectSamplerEvent::TimerTriggered
515 } else {
516 ProjectSamplerEvent::TimerDied
517 }
518 },
519 oneshot_res = reboot_oneshot => {
520 match oneshot_res {
521 Ok(()) => {
522 ProjectSamplerEvent::RebootTriggered
523 },
524 Err(e) => {
525 ProjectSamplerEvent::RebootChannelClosed(
526 format_err!("Oneshot closure error: {:?}", e))
527 }
528 }
529 }
530 };
531
532 match done {
533 ProjectSamplerEvent::TimerDied => {
534 return Err(format_err!(
535 "The ProjectSampler timer died, something went wrong.",
536 ));
537 }
538 ProjectSamplerEvent::RebootChannelClosed(e) => {
539 return Err(format_err!(
542 "The Reboot signaling oneshot died, something went wrong: {:?}",
543 e
544 ));
545 }
546 ProjectSamplerEvent::RebootTriggered => {
547 return Ok(ProjectSamplerTaskExit::RebootTriggered(self));
550 }
551 ProjectSamplerEvent::TimerTriggered => {
552 self.process_next_snapshot().await?;
553 if self.is_all_done() {
558 return Ok(ProjectSamplerTaskExit::WorkCompleted);
559 }
560 }
561 }
562 }
563 })
564 }
565
566 async fn process_next_snapshot(&mut self) -> Result<(), Error> {
567 let snapshot_data = self.archive_reader.snapshot().await?;
568 let events_to_log = self.process_snapshot(snapshot_data).await?;
569 self.log_events(events_to_log).await?;
570 Ok(())
571 }
572
573 async fn process_snapshot(
574 &mut self,
575 snapshot: Vec<Data<Inspect>>,
576 ) -> Result<Vec<EventToLog>, Error> {
577 let mut selectors_changed = false;
578 let mut events_to_log = vec![];
579 for data_packet in snapshot.iter() {
580 match &data_packet.payload {
581 None => {
582 process_schema_errors(&data_packet.metadata.errors, &data_packet.moniker);
583 }
584 Some(payload) => {
585 let (selector_outcome, mut events) = self.process_component_data(
586 payload,
587 &data_packet.metadata.name,
588 &data_packet.moniker,
589 )?;
590 if selector_outcome == SnapshotOutcome::SelectorsChanged {
591 selectors_changed = true;
592 }
593 events_to_log.append(&mut events);
594 }
595 }
596 }
597 if selectors_changed {
598 self.rebuild_selector_data_structures();
599 }
600 Ok(events_to_log)
601 }
602
    /// Returns the `all_done` flag; `spawn` exits with `WorkCompleted` when
    /// this is true after a snapshot.
    fn is_all_done(&self) -> bool {
        self.all_done
    }
606
607 fn rebuild_selector_data_structures(&mut self) {
608 self.archive_reader = ArchiveReader::inspect();
609 for metric in &self.metrics {
610 for selector in metric.borrow().selectors.iter().cloned() {
611 self.archive_reader.add_selector(selector);
612 self.all_done = false;
613 }
614 }
615 self.archive_reader.retry(RetryConfig::never());
616 }
617
618 fn filter_metrics_by_moniker_and_tree_name<'a>(
619 &'a self,
620 moniker: &'a ExtendedMoniker,
621 tree_name: &'a str,
622 ) -> impl Iterator<Item = &'a RefCell<MetricConfig>> {
623 self.metrics.iter().filter(|metric| {
624 moniker
625 .match_against_selectors_and_tree_name(tree_name, metric.borrow().selectors.iter())
626 .next()
627 .is_some()
628 })
629 }
630
631 fn process_component_data(
632 &mut self,
633 payload: &DiagnosticsHierarchy,
634 inspect_handle_name: &InspectHandleName,
635 moniker: &ExtendedMoniker,
636 ) -> Result<(SnapshotOutcome, Vec<EventToLog>), Error> {
637 let filtered_metrics =
638 self.filter_metrics_by_moniker_and_tree_name(moniker, inspect_handle_name.as_ref());
639 let mut snapshot_outcome = SnapshotOutcome::SelectorsUnchanged;
640 let mut events_to_log = vec![];
641 for metric in filtered_metrics {
642 let mut selector_to_keep = None;
643 let project_id = metric.borrow().project_id.unwrap_or(self.project_id);
644 for (selector_idx, selector) in metric.borrow().selectors.iter().enumerate() {
645 let found_properties =
646 diagnostics_hierarchy::select_from_hierarchy(payload, selector)?;
647 match found_properties {
648 SelectResult::Properties(p) if p.is_empty() => {}
651 SelectResult::Properties(p) if p.len() == 1 => {
652 let metric_cache_key = MetricCacheKey {
653 handle_name: inspect_handle_name.clone(),
654 selector: selectors::selector_to_string(selector, Default::default())
655 .unwrap(),
656 };
657 if let Some(event) = Self::prepare_sample(
658 metric.borrow(),
659 self.metric_cache.borrow_mut(),
660 metric_cache_key,
661 p[0],
662 )? {
663 events_to_log.push((project_id, event));
664 }
665 selector_to_keep = Some(selector_idx);
666 break;
667 }
668 too_many => {
669 warn!(
670 too_many:?,
671 selector:?;
672 "Too many matches for selector"
673 );
674 }
675 }
676 }
677
678 if let Some(selector_idx) = selector_to_keep {
679 if Self::update_selectors_for_metric(metric.borrow_mut(), selector_idx) {
680 snapshot_outcome = SnapshotOutcome::SelectorsChanged;
681 }
682 }
683 }
684 Ok((snapshot_outcome, events_to_log))
685 }
686
687 fn update_selectors_for_metric(
688 mut metric: RefMut<'_, MetricConfig>,
689 keep_selector_idx: usize,
690 ) -> bool {
691 if metric.upload_once {
692 metric.selectors = Vec::new();
693 return true;
694 }
695 let deleted = metric.selectors.len() > 1;
696 metric.selectors = vec![metric.selectors.remove(keep_selector_idx)];
697 deleted
698 }
699
700 fn prepare_sample<'a>(
701 metric: Ref<'a, MetricConfig>,
702 metric_cache: RefMut<'a, HashMap<MetricCacheKey, Property>>,
703 metric_cache_key: MetricCacheKey,
704 new_sample: &Property,
705 ) -> Result<Option<MetricEvent>, Error> {
706 let previous_sample_opt: Option<&Property> = metric_cache.get(&metric_cache_key);
707
708 if let Some(payload) = process_sample_for_data_type(
709 new_sample,
710 previous_sample_opt,
711 &metric_cache_key,
712 &metric.metric_type,
713 ) {
714 Self::maybe_update_cache(
715 metric_cache,
716 new_sample,
717 &metric.metric_type,
718 metric_cache_key,
719 );
720 Ok(Some(MetricEvent {
721 metric_id: *metric.metric_id,
722 event_codes: metric.event_codes.iter().map(|code| **code).collect(),
723 payload,
724 }))
725 } else {
726 Ok(None)
727 }
728 }
729
730 async fn log_events(&mut self, events: Vec<EventToLog>) -> Result<(), Error> {
731 for (project_id, event) in events.into_iter() {
732 self.metric_loggers
733 .get(&project_id)
734 .as_ref()
735 .unwrap()
736 .log_metric_events(&[event])
737 .await?
738 .map_err(|e| format_err!("error from cobalt: {:?}", e))?;
739 self.project_sampler_stats.cobalt_logs_sent.add(1);
740 }
741 Ok(())
742 }
743
744 fn maybe_update_cache(
745 mut cache: RefMut<'_, HashMap<MetricCacheKey, Property>>,
746 new_sample: &Property,
747 data_type: &MetricType,
748 metric_cache_key: MetricCacheKey,
749 ) {
750 match data_type {
751 MetricType::Occurrence | MetricType::IntHistogram => {
752 cache.insert(metric_cache_key, new_sample.clone());
753 }
754 MetricType::Integer | MetricType::String => (),
755 }
756 }
757}
758
#[cfg(test)]
impl ProjectSampler {
    /// Test-only helper: appends `metric` to this sampler's metric list.
    /// The archive reader is not updated; callers rebuild it separately.
    fn push_metric(&mut self, metric: MetricConfig) {
        self.metrics.push(RefCell::new(metric));
    }
}
765
766fn process_sample_for_data_type(
767 new_sample: &Property,
768 previous_sample_opt: Option<&Property>,
769 data_source: &MetricCacheKey,
770 data_type: &MetricType,
771) -> Option<MetricEventPayload> {
772 let event_payload_res = match data_type {
773 MetricType::Occurrence => process_occurence(new_sample, previous_sample_opt, data_source),
774 MetricType::IntHistogram => {
775 process_int_histogram(new_sample, previous_sample_opt, data_source)
776 }
777 MetricType::Integer => {
778 if previous_sample_opt.is_some() {
782 warn!("Sampler has erroneously cached an Int type metric: {:?}", data_source);
783 }
784 process_int(new_sample, data_source)
785 }
786 MetricType::String => {
787 if previous_sample_opt.is_some() {
788 warn!("Sampler has erroneously cached a String type metric: {:?}", data_source);
789 }
790 process_string(new_sample, data_source)
791 }
792 };
793
794 match event_payload_res {
795 Ok(payload_opt) => payload_opt,
796 Err(err) => {
797 warn!(data_source:?, err:?; "Failed to process Inspect property for cobalt",);
798 None
799 }
800 }
801}
802
803fn sanitize_unsigned_numerical(diff: u64, data_source: &MetricCacheKey) -> Result<i64, Error> {
807 match diff.try_into() {
808 Ok(diff) => Ok(diff),
809 Err(e) => Err(format_err!(
810 "Selector used for EventCount type \
811 refered to an unsigned int property, \
812 but cobalt requires i64, and casting introduced overflow \
813 which produces a negative int: {:?}. This could be due to \
814 a single sample being larger than i64, or a diff between \
815 samples being larger than i64. Error: {:?}",
816 data_source,
817 e
818 )),
819 }
820}
821
822fn process_int_histogram(
823 new_sample: &Property,
824 prev_sample_opt: Option<&Property>,
825 data_source: &MetricCacheKey,
826) -> Result<Option<MetricEventPayload>, Error> {
827 let diff = match prev_sample_opt {
828 None => convert_inspect_histogram_to_cobalt_histogram(new_sample, data_source)?,
829 Some(prev_sample) => {
830 if std::mem::discriminant(new_sample) == std::mem::discriminant(prev_sample) {
832 compute_histogram_diff(new_sample, prev_sample, data_source)?
833 } else {
834 convert_inspect_histogram_to_cobalt_histogram(new_sample, data_source)?
835 }
836 }
837 };
838
839 let non_empty_diff: Vec<HistogramBucket> = diff.into_iter().filter(|v| v.count != 0).collect();
840 if !non_empty_diff.is_empty() {
841 Ok(Some(MetricEventPayload::Histogram(non_empty_diff)))
842 } else {
843 Ok(None)
844 }
845}
846
847fn compute_histogram_diff(
848 new_sample: &Property,
849 old_sample: &Property,
850 data_source: &MetricCacheKey,
851) -> Result<Vec<HistogramBucket>, Error> {
852 let new_histogram_buckets =
853 convert_inspect_histogram_to_cobalt_histogram(new_sample, data_source)?;
854 let old_histogram_buckets =
855 convert_inspect_histogram_to_cobalt_histogram(old_sample, data_source)?;
856
857 if old_histogram_buckets.len() != new_histogram_buckets.len() {
858 return Err(format_err!(
859 "Selector referenced an Inspect IntArray \
860 that was specified as an IntHistogram type \
861 but the histogram bucket count changed between \
862 samples, which is incompatible with Cobalt. \
863 Selector: {:?}, Inspect type: {}",
864 data_source,
865 new_sample.discriminant_name()
866 ));
867 }
868
869 new_histogram_buckets
870 .iter()
871 .zip(old_histogram_buckets)
872 .map(|(new_bucket, old_bucket)| {
873 if new_bucket.count < old_bucket.count {
874 return Err(format_err!(
875 concat!(
876 "Selector referenced an Inspect IntArray",
877 " that was specified as an IntHistogram type ",
878 " but at least one bucket saw the count decrease",
879 " between samples, which is incompatible with Cobalt's",
880 " need for monotonically increasing counts.",
881 " Selector: {:?}, Inspect type: {}"
882 ),
883 data_source,
884 new_sample.discriminant_name()
885 ));
886 }
887 Ok(HistogramBucket {
888 count: new_bucket.count - old_bucket.count,
889 index: new_bucket.index,
890 })
891 })
892 .collect::<Result<Vec<HistogramBucket>, Error>>()
893}
894
895fn build_cobalt_histogram(counts: impl Iterator<Item = u64>) -> Vec<HistogramBucket> {
896 counts
897 .enumerate()
898 .map(|(index, count)| HistogramBucket { index: index as u32, count })
899 .collect()
900}
901
902fn build_sparse_cobalt_histogram(
903 counts: impl Iterator<Item = u64>,
904 indexes: &[usize],
905 size: usize,
906) -> Vec<HistogramBucket> {
907 let mut histogram =
908 Vec::from_iter((0..size).map(|index| HistogramBucket { index: index as u32, count: 0 }));
909 for (index, count) in indexes.iter().zip(counts) {
910 histogram[*index].count = count;
911 }
912 histogram
913}
914
915fn convert_inspect_histogram_to_cobalt_histogram(
916 inspect_histogram: &Property,
917 data_source: &MetricCacheKey,
918) -> Result<Vec<HistogramBucket>, Error> {
919 macro_rules! err {($($message:expr),+) => {
920 Err(format_err!(
921 concat!($($message),+ , " Selector: {:?}, Inspect type: {}"),
922 data_source,
923 inspect_histogram.discriminant_name()
924 ))
925 }}
926
927 let sanitize_size = |size: usize| -> Result<(), Error> {
928 if size > u32::MAX as usize {
929 return err!(
930 "Selector referenced an Inspect array",
931 " that was specified as a histogram type ",
932 " but contained an index too large for a u32."
933 );
934 }
935 Ok(())
936 };
937
938 let sanitize_indexes = |indexes: &[usize], size: usize| -> Result<(), Error> {
939 for index in indexes.iter() {
940 if *index >= size {
941 return err!(
942 "Selector referenced an Inspect array",
943 " that was specified as a histogram type ",
944 " but contained an invalid index."
945 );
946 }
947 }
948 Ok(())
949 };
950
951 let sanitize_counts = |counts: &[i64]| -> Result<(), Error> {
952 for count in counts.iter() {
953 if *count < 0 {
954 return err!(
955 "Selector referenced an Inspect IntArray",
956 " that was specified as an IntHistogram type ",
957 " but a bucket contained a negative count. This",
958 " is incompatible with Cobalt histograms which only",
959 " support positive histogram counts."
960 );
961 }
962 }
963 Ok(())
964 };
965
966 let histogram = match inspect_histogram {
967 Property::IntArray(
968 _,
969 ArrayContent::LinearHistogram(LinearHistogram { counts, indexes, size, .. }),
970 )
971 | Property::IntArray(
972 _,
973 ArrayContent::ExponentialHistogram(ExponentialHistogram {
974 counts, indexes, size, ..
975 }),
976 ) => {
977 sanitize_size(*size)?;
978 sanitize_counts(counts)?;
979 match (indexes, counts) {
980 (None, counts) => build_cobalt_histogram(counts.iter().map(|c| *c as u64)),
981 (Some(indexes), counts) => {
982 sanitize_indexes(indexes, *size)?;
983 build_sparse_cobalt_histogram(counts.iter().map(|c| *c as u64), indexes, *size)
984 }
985 }
986 }
987 Property::UintArray(
988 _,
989 ArrayContent::LinearHistogram(LinearHistogram { counts, indexes, size, .. }),
990 )
991 | Property::UintArray(
992 _,
993 ArrayContent::ExponentialHistogram(ExponentialHistogram {
994 counts, indexes, size, ..
995 }),
996 ) => {
997 sanitize_size(*size)?;
998 match (indexes, counts) {
999 (None, counts) => build_cobalt_histogram(counts.iter().copied()),
1000 (Some(indexes), counts) => {
1001 sanitize_indexes(indexes, *size)?;
1002 build_sparse_cobalt_histogram(counts.iter().copied(), indexes, *size)
1003 }
1004 }
1005 }
1006 _ => {
1007 return Err(format_err!(
1011 concat!(
1012 "Selector referenced an Inspect property",
1013 " that was specified as an IntHistogram type ",
1014 " but is unable to be encoded in a cobalt HistogramBucket",
1015 " vector. Selector: {:?}, Inspect type: {}"
1016 ),
1017 data_source,
1018 inspect_histogram.discriminant_name()
1019 ));
1020 }
1021 };
1022 Ok(histogram)
1023}
1024
1025fn process_int(
1026 new_sample: &Property,
1027 data_source: &MetricCacheKey,
1028) -> Result<Option<MetricEventPayload>, Error> {
1029 let sampled_int = match new_sample {
1030 Property::Uint(_, val) => sanitize_unsigned_numerical(*val, data_source)?,
1031 Property::Int(_, val) => *val,
1032 _ => {
1033 return Err(format_err!(
1034 concat!(
1035 "Selector referenced an Inspect property",
1036 " that was specified as an Int type ",
1037 " but is unable to be encoded in an i64",
1038 " Selector: {:?}, Inspect type: {}"
1039 ),
1040 data_source,
1041 new_sample.discriminant_name()
1042 ));
1043 }
1044 };
1045
1046 Ok(Some(MetricEventPayload::IntegerValue(sampled_int)))
1047}
1048
1049fn process_string(
1050 new_sample: &Property,
1051 data_source: &MetricCacheKey,
1052) -> Result<Option<MetricEventPayload>, Error> {
1053 let sampled_string = match new_sample {
1054 Property::String(_, val) => val.clone(),
1055 _ => {
1056 return Err(format_err!(
1057 concat!(
1058 "Selector referenced an Inspect property specified as String",
1059 " but property is not type String.",
1060 " Selector: {:?}, Inspect type: {}"
1061 ),
1062 data_source,
1063 new_sample.discriminant_name()
1064 ));
1065 }
1066 };
1067
1068 Ok(Some(MetricEventPayload::StringValue(sampled_string)))
1069}
1070
1071fn process_occurence(
1072 new_sample: &Property,
1073 prev_sample_opt: Option<&Property>,
1074 data_source: &MetricCacheKey,
1075) -> Result<Option<MetricEventPayload>, Error> {
1076 let diff = match prev_sample_opt {
1077 None => compute_initial_event_count(new_sample, data_source)?,
1078 Some(prev_sample) => compute_event_count_diff(new_sample, prev_sample, data_source)?,
1079 };
1080
1081 if diff < 0 {
1082 return Err(format_err!(
1083 concat!(
1084 "Event count must be monotonically increasing,",
1085 " but we observed a negative event count diff for: {:?}"
1086 ),
1087 data_source
1088 ));
1089 }
1090
1091 if diff == 0 {
1092 return Ok(None);
1093 }
1094
1095 Ok(Some(MetricEventPayload::Count(diff as u64)))
1098}
1099
1100fn compute_initial_event_count(
1101 new_sample: &Property,
1102 data_source: &MetricCacheKey,
1103) -> Result<i64, Error> {
1104 match new_sample {
1105 Property::Uint(_, val) => sanitize_unsigned_numerical(*val, data_source),
1106 Property::Int(_, val) => Ok(*val),
1107 _ => Err(format_err!(
1108 concat!(
1109 "Selector referenced an Inspect property",
1110 " that is not compatible with cached",
1111 " transformation to an event count.",
1112 " Selector: {:?}, {}"
1113 ),
1114 data_source,
1115 new_sample.discriminant_name()
1116 )),
1117 }
1118}
1119
1120fn compute_event_count_diff(
1121 new_sample: &Property,
1122 old_sample: &Property,
1123 data_source: &MetricCacheKey,
1124) -> Result<i64, Error> {
1125 match (new_sample, old_sample) {
1126 (Property::Int(_, new_count), Property::Int(_, old_count)) => Ok(new_count - old_count),
1133 (Property::Uint(_, new_count), Property::Uint(_, old_count)) => {
1134 sanitize_unsigned_numerical(
1137 new_count.checked_sub(*old_count).unwrap_or(u64::MAX),
1138 data_source,
1139 )
1140 }
1141 (_, Property::Uint(_, _)) | (_, Property::Int(_, _)) => {
1145 warn!(
1146 "Inspect type of sampled data changed between samples. Restarting cache. {:?}",
1147 data_source
1148 );
1149 compute_initial_event_count(new_sample, data_source)
1150 }
1151 _ => Err(format_err!(
1152 concat!(
1153 "Inspect type of sampled data changed between samples",
1154 " to a type incompatible with event counters.",
1155 " Selector: {:?}, New type: {:?}"
1156 ),
1157 data_source,
1158 new_sample.discriminant_name()
1159 )),
1160 }
1161}
1162
1163fn process_schema_errors(
1164 errors: &Option<Vec<diagnostics_data::InspectError>>,
1165 moniker: &ExtendedMoniker,
1166) {
1167 match errors {
1168 Some(errors) => {
1169 for error in errors {
1170 warn!(moniker:%, error:?; "");
1171 }
1172 }
1173 None => {
1174 warn!(moniker:%; "Encountered null payload and no errors.");
1175 }
1176 }
1177}
1178
1179#[cfg(test)]
1180mod tests {
1181 use super::*;
1182 use diagnostics_data::{InspectDataBuilder, Timestamp};
1183 use diagnostics_hierarchy::hierarchy;
1184 use fidl_fuchsia_inspect::DEFAULT_TREE_NAME;
1185 use sampler_config::{MetricId, ProjectId};
1186
1187 #[fuchsia::test]
1188 fn test_filter_metrics() {
1189 let mut sampler = ProjectSampler {
1190 archive_reader: ArchiveReader::inspect(),
1191 metrics: vec![],
1192 metric_cache: RefCell::new(HashMap::new()),
1193 metric_loggers: HashMap::new(),
1194 project_id: ProjectId(1),
1195 poll_rate_sec: 3600,
1196 project_sampler_stats: Arc::new(ProjectSamplerStats::new()),
1197 all_done: true,
1198 };
1199 let selector_foo: String = "core/foo:[name=foo]root/path:value".to_string();
1200 let selector_bar: String = "core/foo:[name=bar]root/path:value".to_string();
1201
1202 sampler.push_metric(MetricConfig {
1203 project_id: None,
1204 selectors: vec![selectors::parse_verbose(&selector_foo).unwrap()],
1205 metric_id: MetricId(1),
1206 metric_type: MetricType::Occurrence,
1208 event_codes: Vec::new(),
1209 upload_once: true,
1212 });
1213 sampler.push_metric(MetricConfig {
1214 project_id: None,
1215 selectors: vec![selectors::parse_verbose(&selector_bar).unwrap()],
1216 metric_id: MetricId(2),
1217 metric_type: MetricType::Occurrence,
1219 event_codes: Vec::new(),
1220 upload_once: true,
1223 });
1224
1225 sampler.rebuild_selector_data_structures();
1226
1227 let moniker = ExtendedMoniker::try_from("core/foo").unwrap();
1228
1229 let filtered_metrics =
1230 sampler.filter_metrics_by_moniker_and_tree_name(&moniker, "foo").collect::<Vec<_>>();
1231 assert_eq!(1, filtered_metrics.len());
1232 assert_eq!(MetricId(1), filtered_metrics[0].borrow().metric_id);
1233 }
1234
1235 #[fuchsia::test]
1236 fn test_filter_metrics_with_wildcards() {
1237 let mut sampler = ProjectSampler {
1238 archive_reader: ArchiveReader::inspect(),
1239 metrics: vec![],
1240 metric_cache: RefCell::new(HashMap::new()),
1241 metric_loggers: HashMap::new(),
1242 project_id: ProjectId(1),
1243 poll_rate_sec: 3600,
1244 project_sampler_stats: Arc::new(ProjectSamplerStats::new()),
1245 all_done: true,
1246 };
1247 let selector_foo: String = r"bootstrap/*-drivers\:*:[name=foo]root/path:value".to_string();
1248 let selector_bar1: String =
1249 r"bootstrap/*-drivers\:*:[name=bar]root/path:value1".to_string();
1250 let selector_bar2: String =
1251 r"bootstrap/*-drivers\:*:[name=bar]root/path:value2".to_string();
1252
1253 sampler.push_metric(MetricConfig {
1254 project_id: None,
1255 selectors: vec![selectors::parse_verbose(&selector_foo).unwrap()],
1256 metric_id: MetricId(1),
1257 metric_type: MetricType::Occurrence,
1259 event_codes: Vec::new(),
1260 upload_once: true,
1263 });
1264 sampler.push_metric(MetricConfig {
1265 project_id: None,
1266 selectors: vec![selectors::parse_verbose(&selector_bar1).unwrap()],
1267 metric_id: MetricId(2),
1268 metric_type: MetricType::Occurrence,
1270 event_codes: Vec::new(),
1271 upload_once: true,
1274 });
1275 sampler.push_metric(MetricConfig {
1276 project_id: None,
1277 selectors: vec![selectors::parse_verbose(&selector_bar2).unwrap()],
1278 metric_id: MetricId(3),
1279 metric_type: MetricType::Occurrence,
1281 event_codes: Vec::new(),
1282 upload_once: true,
1285 });
1286
1287 sampler.rebuild_selector_data_structures();
1288
1289 let moniker = ExtendedMoniker::try_from("bootstrap/boot-drivers:098098").unwrap();
1290
1291 let filtered_metrics =
1292 sampler.filter_metrics_by_moniker_and_tree_name(&moniker, "foo").collect::<Vec<_>>();
1293 assert_eq!(1, filtered_metrics.len());
1294 assert_eq!(MetricId(1), filtered_metrics[0].borrow().metric_id);
1295
1296 let filtered_metrics =
1297 sampler.filter_metrics_by_moniker_and_tree_name(&moniker, "bar").collect::<Vec<_>>();
1298 assert_eq!(2, filtered_metrics.len());
1299 assert_eq!(MetricId(2), filtered_metrics[0].borrow().metric_id);
1300 assert_eq!(MetricId(3), filtered_metrics[1].borrow().metric_id);
1301 }
1302
1303 #[fuchsia::test]
1305 fn test_process_payload_with_escapes() {
1306 let unescaped: String = "path/to".to_string();
1307 let hierarchy = hierarchy! {
1308 root: {
1309 var unescaped: {
1310 value: 0,
1311 "value/with:escapes": 0,
1312 }
1313 }
1314 };
1315
1316 let mut sampler = ProjectSampler {
1317 archive_reader: ArchiveReader::inspect(),
1318 metrics: vec![],
1319 metric_cache: RefCell::new(HashMap::new()),
1320 metric_loggers: HashMap::new(),
1321 project_id: ProjectId(1),
1322 poll_rate_sec: 3600,
1323 project_sampler_stats: Arc::new(ProjectSamplerStats::new()),
1324 all_done: true,
1325 };
1326 let selector: String = "my/component:root/path\\/to:value".to_string();
1327 sampler.push_metric(MetricConfig {
1328 project_id: None,
1329 selectors: vec![selectors::parse_verbose(&selector).unwrap()],
1330 metric_id: MetricId(1),
1331 metric_type: MetricType::Occurrence,
1333 event_codes: Vec::new(),
1334 upload_once: true,
1337 });
1338 sampler.rebuild_selector_data_structures();
1339 match sampler.process_component_data(
1340 &hierarchy,
1341 &InspectHandleName::name(DEFAULT_TREE_NAME),
1342 &"my/component".try_into().unwrap(),
1343 ) {
1344 Ok((SnapshotOutcome::SelectorsChanged, _events)) => (),
1347 _ => panic!("Expecting SelectorsChanged from process_component_data."),
1348 }
1349
1350 let selector_with_escaped_property: String =
1351 "my/component:root/path\\/to:value\\/with\\:escapes".to_string();
1352 sampler.push_metric(MetricConfig {
1353 project_id: None,
1354 selectors: vec![selectors::parse_verbose(&selector_with_escaped_property).unwrap()],
1355 metric_id: MetricId(1),
1356 metric_type: MetricType::Occurrence,
1358 event_codes: Vec::new(),
1359 upload_once: true,
1362 });
1363 sampler.rebuild_selector_data_structures();
1364 match sampler.process_component_data(
1365 &hierarchy,
1366 &InspectHandleName::name(DEFAULT_TREE_NAME),
1367 &"my/component".try_into().unwrap(),
1368 ) {
1369 Ok((SnapshotOutcome::SelectorsChanged, _events)) => (),
1372 _ => panic!("Expecting SelectorsChanged from process_component_data."),
1373 }
1374
1375 let selector_unfound: String = "my/component:root/path/to:value".to_string();
1376 sampler.push_metric(MetricConfig {
1377 project_id: None,
1378 selectors: vec![selectors::parse_verbose(&selector_unfound).unwrap()],
1379 metric_id: MetricId(1),
1380 metric_type: MetricType::Occurrence,
1382 event_codes: Vec::new(),
1383 upload_once: true,
1386 });
1387 sampler.rebuild_selector_data_structures();
1388 match sampler.process_component_data(
1389 &hierarchy,
1390 &InspectHandleName::name(DEFAULT_TREE_NAME),
1391 &"my/component".try_into().unwrap(),
1392 ) {
1393 Ok((SnapshotOutcome::SelectorsUnchanged, _events)) => (),
1395 _ => panic!("Expecting SelectorsUnchanged from process_component_data."),
1396 }
1397 }
1398
1399 #[fuchsia::test]
1402 fn decreasing_occurrence_is_correct() {
1403 let big_number = Property::Uint("foo".to_string(), 5);
1404 let small_number = Property::Uint("foo".to_string(), 2);
1405 let key = MetricCacheKey {
1406 handle_name: InspectHandleName::name("some_file"),
1407 selector: "sel".to_string(),
1408 };
1409
1410 assert_eq!(
1411 process_sample_for_data_type(
1412 &big_number,
1413 Some(&small_number),
1414 &key,
1415 &MetricType::Occurrence
1416 ),
1417 Some(MetricEventPayload::Count(3))
1418 );
1419 assert_eq!(
1420 process_sample_for_data_type(
1421 &small_number,
1422 Some(&big_number),
1423 &key,
1424 &MetricType::Occurrence
1425 ),
1426 None
1427 );
1428 }
1429
1430 #[fuchsia::test]
1432 fn test_upload_once() {
1433 let hierarchy = hierarchy! {
1434 root: {
1435 value_one: 0,
1436 value_two: 1,
1437 }
1438 };
1439
1440 let mut sampler = ProjectSampler {
1441 archive_reader: ArchiveReader::inspect(),
1442 metrics: vec![],
1443 metric_cache: RefCell::new(HashMap::new()),
1444 metric_loggers: HashMap::new(),
1445 project_id: ProjectId(1),
1446 poll_rate_sec: 3600,
1447 project_sampler_stats: Arc::new(ProjectSamplerStats::new()),
1448 all_done: true,
1449 };
1450 sampler.push_metric(MetricConfig {
1451 project_id: None,
1452 selectors: vec![selectors::parse_verbose("my/component:root:value_one").unwrap()],
1453 metric_id: MetricId(1),
1454 metric_type: MetricType::Integer,
1455 event_codes: Vec::new(),
1456 upload_once: true,
1457 });
1458 sampler.push_metric(MetricConfig {
1459 project_id: None,
1460 selectors: vec![selectors::parse_verbose("my/component:root:value_two").unwrap()],
1461 metric_id: MetricId(2),
1462 metric_type: MetricType::Integer,
1463 event_codes: Vec::new(),
1464 upload_once: true,
1465 });
1466 sampler.rebuild_selector_data_structures();
1467
1468 match sampler.process_component_data(
1470 &hierarchy,
1471 &InspectHandleName::name(DEFAULT_TREE_NAME),
1472 &"my/component".try_into().unwrap(),
1473 ) {
1474 Ok((SnapshotOutcome::SelectorsChanged, _events)) => (),
1475 _ => panic!("Expecting SelectorsChanged from process_component_data."),
1476 }
1477
1478 let moniker: ExtendedMoniker = "my_component".try_into().unwrap();
1479 assert!(sampler
1480 .filter_metrics_by_moniker_and_tree_name(&moniker, DEFAULT_TREE_NAME)
1481 .collect::<Vec<_>>()
1482 .is_empty());
1483 }
1484
1485 struct EventCountTesterParams {
1486 new_val: Property,
1487 old_val: Option<Property>,
1488 process_ok: bool,
1489 event_made: bool,
1490 diff: i64,
1491 }
1492
1493 fn process_occurence_tester(params: EventCountTesterParams) {
1494 let data_source = MetricCacheKey {
1495 handle_name: InspectHandleName::name("foo.file"),
1496 selector: "test:root:count".to_string(),
1497 };
1498 let event_res = process_occurence(¶ms.new_val, params.old_val.as_ref(), &data_source);
1499
1500 if !params.process_ok {
1501 assert!(event_res.is_err());
1502 return;
1503 }
1504
1505 assert!(event_res.is_ok());
1506
1507 let event_opt = event_res.unwrap();
1508
1509 if !params.event_made {
1510 assert!(event_opt.is_none());
1511 return;
1512 }
1513
1514 assert!(event_opt.is_some());
1515 let event = event_opt.unwrap();
1516 match event {
1517 MetricEventPayload::Count(count) => {
1518 assert_eq!(count, params.diff as u64);
1519 }
1520 _ => panic!("Expecting event counts."),
1521 }
1522 }
1523
1524 #[fuchsia::test]
1525 fn test_normal_process_occurence() {
1526 process_occurence_tester(EventCountTesterParams {
1527 new_val: Property::Int("count".to_string(), 1),
1528 old_val: None,
1529 process_ok: true,
1530 event_made: true,
1531 diff: 1,
1532 });
1533
1534 process_occurence_tester(EventCountTesterParams {
1535 new_val: Property::Int("count".to_string(), 1),
1536 old_val: Some(Property::Int("count".to_string(), 1)),
1537 process_ok: true,
1538 event_made: false,
1539 diff: -1,
1540 });
1541
1542 process_occurence_tester(EventCountTesterParams {
1543 new_val: Property::Int("count".to_string(), 3),
1544 old_val: Some(Property::Int("count".to_string(), 1)),
1545 process_ok: true,
1546 event_made: true,
1547 diff: 2,
1548 });
1549 }
1550
1551 #[fuchsia::test]
1552 fn test_data_type_changing_process_occurence() {
1553 process_occurence_tester(EventCountTesterParams {
1554 new_val: Property::Int("count".to_string(), 1),
1555 old_val: None,
1556 process_ok: true,
1557 event_made: true,
1558 diff: 1,
1559 });
1560
1561 process_occurence_tester(EventCountTesterParams {
1562 new_val: Property::Uint("count".to_string(), 1),
1563 old_val: None,
1564 process_ok: true,
1565 event_made: true,
1566 diff: 1,
1567 });
1568
1569 process_occurence_tester(EventCountTesterParams {
1570 new_val: Property::Uint("count".to_string(), 3),
1571 old_val: Some(Property::Int("count".to_string(), 1)),
1572 process_ok: true,
1573 event_made: true,
1574 diff: 3,
1575 });
1576
1577 process_occurence_tester(EventCountTesterParams {
1578 new_val: Property::String("count".to_string(), "big_oof".to_string()),
1579 old_val: Some(Property::Int("count".to_string(), 1)),
1580 process_ok: false,
1581 event_made: false,
1582 diff: -1,
1583 });
1584 }
1585
1586 #[fuchsia::test]
1587 fn test_event_count_negatives_and_overflows() {
1588 process_occurence_tester(EventCountTesterParams {
1589 new_val: Property::Int("count".to_string(), -11),
1590 old_val: None,
1591 process_ok: false,
1592 event_made: false,
1593 diff: -1,
1594 });
1595
1596 process_occurence_tester(EventCountTesterParams {
1597 new_val: Property::Int("count".to_string(), 9),
1598 old_val: Some(Property::Int("count".to_string(), 10)),
1599 process_ok: false,
1600 event_made: false,
1601 diff: -1,
1602 });
1603
1604 process_occurence_tester(EventCountTesterParams {
1605 new_val: Property::Uint("count".to_string(), u64::MAX),
1606 old_val: None,
1607 process_ok: false,
1608 event_made: false,
1609 diff: -1,
1610 });
1611
1612 let i64_max_in_u64: u64 = i64::MAX.try_into().unwrap();
1613
1614 process_occurence_tester(EventCountTesterParams {
1615 new_val: Property::Uint("count".to_string(), i64_max_in_u64 + 1),
1616 old_val: Some(Property::Uint("count".to_string(), 1)),
1617 process_ok: true,
1618 event_made: true,
1619 diff: i64::MAX,
1620 });
1621
1622 process_occurence_tester(EventCountTesterParams {
1623 new_val: Property::Uint("count".to_string(), i64_max_in_u64 + 2),
1624 old_val: Some(Property::Uint("count".to_string(), 1)),
1625 process_ok: false,
1626 event_made: false,
1627 diff: -1,
1628 });
1629 }
1630
1631 struct IntTesterParams {
1632 new_val: Property,
1633 process_ok: bool,
1634 sample: i64,
1635 }
1636
1637 fn process_int_tester(params: IntTesterParams) {
1638 let data_source = MetricCacheKey {
1639 handle_name: InspectHandleName::name("foo.file"),
1640 selector: "test:root:count".to_string(),
1641 };
1642 let event_res = process_int(¶ms.new_val, &data_source);
1643
1644 if !params.process_ok {
1645 assert!(event_res.is_err());
1646 return;
1647 }
1648
1649 assert!(event_res.is_ok());
1650
1651 let event = event_res.expect("event should be Ok").expect("event should be Some");
1652 match event {
1653 MetricEventPayload::IntegerValue(val) => {
1654 assert_eq!(val, params.sample);
1655 }
1656 _ => panic!("Expecting event counts."),
1657 }
1658 }
1659
1660 #[fuchsia::test]
1661 fn test_normal_process_int() {
1662 process_int_tester(IntTesterParams {
1663 new_val: Property::Int("count".to_string(), 13),
1664 process_ok: true,
1665 sample: 13,
1666 });
1667
1668 process_int_tester(IntTesterParams {
1669 new_val: Property::Int("count".to_string(), -13),
1670 process_ok: true,
1671 sample: -13,
1672 });
1673
1674 process_int_tester(IntTesterParams {
1675 new_val: Property::Int("count".to_string(), 0),
1676 process_ok: true,
1677 sample: 0,
1678 });
1679
1680 process_int_tester(IntTesterParams {
1681 new_val: Property::Uint("count".to_string(), 13),
1682 process_ok: true,
1683 sample: 13,
1684 });
1685
1686 process_int_tester(IntTesterParams {
1687 new_val: Property::String("count".to_string(), "big_oof".to_string()),
1688 process_ok: false,
1689 sample: -1,
1690 });
1691 }
1692
1693 #[fuchsia::test]
1694 fn test_int_edge_cases() {
1695 process_int_tester(IntTesterParams {
1696 new_val: Property::Int("count".to_string(), i64::MAX),
1697 process_ok: true,
1698 sample: i64::MAX,
1699 });
1700
1701 process_int_tester(IntTesterParams {
1702 new_val: Property::Int("count".to_string(), i64::MIN),
1703 process_ok: true,
1704 sample: i64::MIN,
1705 });
1706
1707 let i64_max_in_u64: u64 = i64::MAX.try_into().unwrap();
1708
1709 process_int_tester(IntTesterParams {
1710 new_val: Property::Uint("count".to_string(), i64_max_in_u64),
1711 process_ok: true,
1712 sample: i64::MAX,
1713 });
1714
1715 process_int_tester(IntTesterParams {
1716 new_val: Property::Uint("count".to_string(), i64_max_in_u64 + 1),
1717 process_ok: false,
1718 sample: -1,
1719 });
1720 }
1721
1722 struct StringTesterParams {
1723 sample: Property,
1724 process_ok: bool,
1725 previous_sample: Option<Property>,
1726 }
1727
1728 fn process_string_tester(params: StringTesterParams) {
1729 let metric_cache_key = MetricCacheKey {
1730 handle_name: InspectHandleName::name("foo.file"),
1731 selector: "test:root:string_val".to_string(),
1732 };
1733
1734 let event = process_sample_for_data_type(
1735 ¶ms.sample,
1736 params.previous_sample.as_ref(),
1737 &metric_cache_key,
1738 &MetricType::String,
1739 );
1740
1741 if !params.process_ok {
1742 assert!(event.is_none());
1743 return;
1744 }
1745
1746 match event.unwrap() {
1747 MetricEventPayload::StringValue(val) => {
1748 assert_eq!(val.as_str(), params.sample.string().unwrap());
1749 }
1750 _ => panic!("Expecting event with StringValue."),
1751 }
1752 }
1753
1754 #[fuchsia::test]
1755 fn test_process_string() {
1756 process_string_tester(StringTesterParams {
1757 sample: Property::String("string_val".to_string(), "Hello, world!".to_string()),
1758 process_ok: true,
1759 previous_sample: None,
1760 });
1761
1762 process_string_tester(StringTesterParams {
1765 sample: Property::String("string_val".to_string(), "Hello, world!".to_string()),
1766 process_ok: true,
1767 previous_sample: Some(Property::String("string_val".to_string(), "Uh oh!".to_string())),
1768 });
1769
1770 process_string_tester(StringTesterParams {
1773 sample: Property::Int("string_val".to_string(), 123),
1774 process_ok: false,
1775 previous_sample: None,
1776 });
1777
1778 process_string_tester(StringTesterParams {
1779 sample: Property::Uint("string_val".to_string(), 123),
1780 process_ok: false,
1781 previous_sample: None,
1782 });
1783 }
1784
1785 fn convert_vector_to_int_histogram(hist: Vec<i64>) -> Property {
1786 let size = hist.len();
1787 Property::IntArray(
1788 "Bloop".to_string(),
1789 ArrayContent::LinearHistogram(LinearHistogram {
1790 floor: 1,
1791 step: 1,
1792 counts: hist,
1793 size,
1794 indexes: None,
1795 }),
1796 )
1797 }
1798
1799 fn convert_vector_to_uint_histogram(hist: Vec<u64>) -> Property<String> {
1800 let size = hist.len();
1801 Property::UintArray(
1802 "Bloop".to_string(),
1803 ArrayContent::LinearHistogram(LinearHistogram {
1804 floor: 1,
1805 step: 1,
1806 counts: hist,
1807 size,
1808 indexes: None,
1809 }),
1810 )
1811 }
1812
1813 fn convert_vectors_to_int_histogram(counts: Vec<i64>, indexes: Vec<usize>) -> Property<String> {
1815 let size = 100;
1816 Property::IntArray(
1817 "Bloop".to_string(),
1818 ArrayContent::LinearHistogram(LinearHistogram {
1819 floor: 1,
1820 step: 1,
1821 counts,
1822 size,
1823 indexes: Some(indexes),
1824 }),
1825 )
1826 }
1827
1828 fn convert_vectors_to_uint_histogram(
1829 counts: Vec<u64>,
1830 indexes: Vec<usize>,
1831 ) -> Property<String> {
1832 let size = 100;
1833 Property::UintArray(
1834 "Bloop".to_string(),
1835 ArrayContent::LinearHistogram(LinearHistogram {
1836 floor: 1,
1837 step: 1,
1838 counts,
1839 size,
1840 indexes: Some(indexes),
1841 }),
1842 )
1843 }
1844
1845 struct IntHistogramTesterParams {
1846 new_val: Property,
1847 old_val: Option<Property>,
1848 process_ok: bool,
1849 event_made: bool,
1850 diff: Vec<(u32, u64)>,
1851 }
1852 fn process_int_histogram_tester(params: IntHistogramTesterParams) {
1853 let data_source = MetricCacheKey {
1854 handle_name: InspectHandleName::name("foo.file"),
1855 selector: "test:root:count".to_string(),
1856 };
1857 let event_res =
1858 process_int_histogram(¶ms.new_val, params.old_val.as_ref(), &data_source);
1859
1860 if !params.process_ok {
1861 assert!(event_res.is_err());
1862 return;
1863 }
1864
1865 assert!(event_res.is_ok());
1866
1867 let event_opt = event_res.unwrap();
1868 if !params.event_made {
1869 assert!(event_opt.is_none());
1870 return;
1871 }
1872
1873 assert!(event_opt.is_some());
1874
1875 let event = event_opt.unwrap();
1876 match event.clone() {
1877 MetricEventPayload::Histogram(histogram_buckets) => {
1878 assert_eq!(histogram_buckets.len(), params.diff.len());
1879
1880 let expected_histogram_buckets = params
1881 .diff
1882 .iter()
1883 .map(|(index, count)| HistogramBucket { index: *index, count: *count })
1884 .collect::<Vec<HistogramBucket>>();
1885
1886 assert_eq!(histogram_buckets, expected_histogram_buckets);
1887 }
1888 _ => panic!("Expecting int histogram."),
1889 }
1890 }
1891
1892 #[fuchsia::test]
1895 fn test_normal_process_int_histogram() {
1896 let new_i64_sample = convert_vector_to_int_histogram(vec![1, 1, 1, 1]);
1897 let new_u64_sample = convert_vector_to_uint_histogram(vec![1, 1, 1, 1]);
1898
1899 process_int_histogram_tester(IntHistogramTesterParams {
1900 new_val: new_i64_sample,
1901 old_val: None,
1902 process_ok: true,
1903 event_made: true,
1904 diff: vec![(0, 1), (1, 1), (2, 1), (3, 1)],
1905 });
1906
1907 process_int_histogram_tester(IntHistogramTesterParams {
1908 new_val: new_u64_sample,
1909 old_val: None,
1910 process_ok: true,
1911 event_made: true,
1912 diff: vec![(0, 1), (1, 1), (2, 1), (3, 1)],
1913 });
1914
1915 let new_u64_sample = convert_vector_to_uint_histogram(vec![u64::MAX, u64::MAX, u64::MAX]);
1918 process_int_histogram_tester(IntHistogramTesterParams {
1919 new_val: new_u64_sample,
1920 old_val: None,
1921 process_ok: true,
1922 event_made: true,
1923 diff: vec![(0, u64::MAX), (1, u64::MAX), (2, u64::MAX)],
1924 });
1925
1926 let new_u64_sample = convert_vector_to_uint_histogram(Vec::new());
1928 process_int_histogram_tester(IntHistogramTesterParams {
1929 new_val: new_u64_sample,
1930 old_val: None,
1931 process_ok: true,
1932 event_made: false,
1933 diff: Vec::new(),
1934 });
1935
1936 let new_u64_sample = convert_vector_to_uint_histogram(vec![0, 0, 0, 0]);
1937 process_int_histogram_tester(IntHistogramTesterParams {
1938 new_val: new_u64_sample,
1939 old_val: None,
1940 process_ok: true,
1941 event_made: false,
1942 diff: Vec::new(),
1943 });
1944
1945 let new_u64_sample = convert_vector_to_uint_histogram(vec![2, 1, 2, 1]);
1947 let old_u64_sample = Some(convert_vector_to_uint_histogram(vec![1, 1, 0, 1]));
1948 process_int_histogram_tester(IntHistogramTesterParams {
1949 new_val: new_u64_sample,
1950 old_val: old_u64_sample,
1951 process_ok: true,
1952 event_made: true,
1953 diff: vec![(0, 1), (2, 2)],
1954 });
1955
1956 let new_i64_sample = convert_vector_to_int_histogram(vec![5, 2, 1, 3]);
1957 let old_i64_sample = Some(convert_vector_to_int_histogram(vec![1, 1, 1, 1]));
1958 process_int_histogram_tester(IntHistogramTesterParams {
1959 new_val: new_i64_sample,
1960 old_val: old_i64_sample,
1961 process_ok: true,
1962 event_made: true,
1963 diff: vec![(0, 4), (1, 1), (3, 2)],
1964 });
1965
1966 let new_u64_sample = convert_vector_to_uint_histogram(vec![2, 1, 1, 1]);
1968 let old_i64_sample = Some(convert_vector_to_int_histogram(vec![1, 1, 1, 1]));
1969 process_int_histogram_tester(IntHistogramTesterParams {
1970 new_val: new_u64_sample,
1971 old_val: old_i64_sample,
1972 process_ok: true,
1973 event_made: true,
1974 diff: vec![(0, 2), (1, 1), (2, 1), (3, 1)],
1975 });
1976 }
1977
1978 #[fuchsia::test]
1980 fn test_normal_process_condensed_histograms() {
1981 let new_u64_sample = convert_vectors_to_int_histogram(vec![2, 6], vec![3, 5]);
1982 let old_u64_sample = Some(convert_vectors_to_int_histogram(vec![1], vec![5]));
1983 process_int_histogram_tester(IntHistogramTesterParams {
1984 new_val: new_u64_sample,
1985 old_val: old_u64_sample,
1986 process_ok: true,
1987 event_made: true,
1988 diff: vec![(3, 2), (5, 5)],
1989 });
1990 let new_i64_sample = convert_vectors_to_uint_histogram(vec![2, 4], vec![5, 3]);
1991 let old_i64_sample = None;
1992 process_int_histogram_tester(IntHistogramTesterParams {
1993 new_val: new_i64_sample,
1994 old_val: old_i64_sample,
1995 process_ok: true,
1996 event_made: true,
1997 diff: vec![(3, 4), (5, 2)],
1998 });
1999 }
2000
2001 #[fuchsia::test]
2002 fn test_errorful_process_int_histogram() {
2003 let new_u64_sample = convert_vector_to_uint_histogram(vec![1, 1, 1, 1]);
2005 let old_u64_sample = Some(convert_vector_to_uint_histogram(vec![1, 1, 1, 1, 1]));
2006 process_int_histogram_tester(IntHistogramTesterParams {
2007 new_val: new_u64_sample,
2008 old_val: old_u64_sample,
2009 process_ok: false,
2010 event_made: false,
2011 diff: Vec::new(),
2012 });
2013
2014 let new_i64_sample = convert_vector_to_int_histogram(vec![1, 1, -1, 1]);
2016 process_int_histogram_tester(IntHistogramTesterParams {
2017 new_val: new_i64_sample,
2018 old_val: None,
2019 process_ok: false,
2020 event_made: false,
2021 diff: Vec::new(),
2022 });
2023
2024 let new_i64_sample = convert_vector_to_int_histogram(vec![5, 2, 1, 3]);
2026 let old_i64_sample = Some(convert_vector_to_int_histogram(vec![6, 1, 1, 1]));
2027 process_int_histogram_tester(IntHistogramTesterParams {
2028 new_val: new_i64_sample,
2029 old_val: old_i64_sample,
2030 process_ok: false,
2031 event_made: false,
2032 diff: Vec::new(),
2033 });
2034 }
2035
2036 #[fuchsia::test]
2041 async fn test_inspect_handle_name_distinguishes_data() {
2042 let mut sampler = ProjectSampler {
2043 archive_reader: ArchiveReader::inspect(),
2044 metrics: vec![],
2045 metric_cache: RefCell::new(HashMap::new()),
2046 metric_loggers: HashMap::new(),
2047 project_id: ProjectId(1),
2048 poll_rate_sec: 3600,
2049 project_sampler_stats: Arc::new(ProjectSamplerStats::new()),
2050 all_done: true,
2051 };
2052 let selector: String = "my/component:[...]root/branch:leaf".to_string();
2053 let metric_id = MetricId(1);
2054 let event_codes = vec![];
2055 sampler.push_metric(MetricConfig {
2056 project_id: None,
2057 selectors: vec![selectors::parse_verbose(&selector).unwrap()],
2058 metric_id,
2059 metric_type: MetricType::Occurrence,
2060 event_codes,
2061 upload_once: false,
2062 });
2063 sampler.rebuild_selector_data_structures();
2064
2065 let data1_value4 = vec![InspectDataBuilder::new(
2066 "my/component".try_into().unwrap(),
2067 "component-url",
2068 Timestamp::from_nanos(0),
2069 )
2070 .with_hierarchy(hierarchy! { root: {branch: {leaf: 4i32}}})
2071 .with_name(InspectHandleName::name("name1"))
2072 .build()];
2073 let data2_value3 = vec![InspectDataBuilder::new(
2074 "my/component".try_into().unwrap(),
2075 "component-url",
2076 Timestamp::from_nanos(0),
2077 )
2078 .with_hierarchy(hierarchy! { root: {branch: {leaf: 3i32}}})
2079 .with_name(InspectHandleName::name("name2"))
2080 .build()];
2081 let data1_value6 = vec![InspectDataBuilder::new(
2082 "my/component".try_into().unwrap(),
2083 "component-url",
2084 Timestamp::from_nanos(0),
2085 )
2086 .with_hierarchy(hierarchy! { root: {branch: {leaf: 6i32}}})
2087 .with_name(InspectHandleName::name("name1"))
2088 .build()];
2089 let data2_value8 = vec![InspectDataBuilder::new(
2090 "my/component".try_into().unwrap(),
2091 "component-url",
2092 Timestamp::from_nanos(0),
2093 )
2094 .with_hierarchy(hierarchy! { root: {branch: {leaf: 8i32}}})
2095 .with_name(InspectHandleName::name("name2"))
2096 .build()];
2097
2098 fn expect_one_metric_event_value(
2099 events: Result<Vec<EventToLog>, Error>,
2100 value: u64,
2101 context: &'static str,
2102 ) {
2103 let events = events.expect(context);
2104 assert_eq!(events.len(), 1, "Events len not 1: {}: {}", context, events.len());
2105 let event = &events[0];
2106 let (project_id, MetricEvent { payload, .. }) = event;
2107 assert_eq!(*project_id, ProjectId(1));
2108 if let fidl_fuchsia_metrics::MetricEventPayload::Count(payload) = payload {
2109 assert_eq!(
2110 payload, &value,
2111 "Wrong payload, expected {value} got {payload} at {context}"
2112 );
2113 } else {
2114 panic!("Expected MetricEventPayload::Count at {context}, got {payload:?}");
2115 }
2116 }
2117
2118 expect_one_metric_event_value(sampler.process_snapshot(data1_value4).await, 4, "first");
2119 expect_one_metric_event_value(sampler.process_snapshot(data2_value3).await, 3, "second");
2120 expect_one_metric_event_value(sampler.process_snapshot(data1_value6).await, 2, "third");
2121 expect_one_metric_event_value(sampler.process_snapshot(data2_value8).await, 5, "fourth");
2122 }
2123
2124 #[fuchsia::test]
2126 async fn project_id_can_be_overwritten_by_the_metric_project_id() {
2127 let mut sampler = ProjectSampler {
2128 archive_reader: ArchiveReader::inspect(),
2129 metrics: vec![],
2130 metric_cache: RefCell::new(HashMap::new()),
2131 metric_loggers: HashMap::new(),
2132 project_id: ProjectId(1),
2133 poll_rate_sec: 3600,
2134 project_sampler_stats: Arc::new(ProjectSamplerStats::new()),
2135 all_done: true,
2136 };
2137 let selector: String = "my/component:[name=name1]root/branch:leaf".to_string();
2138 let metric_id = MetricId(1);
2139 let event_codes = vec![];
2140 sampler.push_metric(MetricConfig {
2141 project_id: Some(ProjectId(2)),
2142 selectors: vec![selectors::parse_verbose(&selector).unwrap()],
2143 metric_id,
2144 metric_type: MetricType::Occurrence,
2145 event_codes,
2146 upload_once: false,
2147 });
2148 sampler.rebuild_selector_data_structures();
2149
2150 let value = vec![InspectDataBuilder::new(
2151 "my/component".try_into().unwrap(),
2152 "component-url",
2153 Timestamp::from_nanos(0),
2154 )
2155 .with_hierarchy(hierarchy! { root: {branch: {leaf: 4i32}}})
2156 .with_name(InspectHandleName::name("name1"))
2157 .build()];
2158
2159 let events = sampler.process_snapshot(value).await.expect("processed snapshot");
2160 assert_eq!(events.len(), 1);
2161 let event = &events[0];
2162 let (project_id, MetricEvent { .. }) = event;
2163 assert_eq!(*project_id, ProjectId(2));
2164 }
2165}