1use fuchsia_inspect::{
6 ExponentialHistogramParams, HistogramProperty, LinearHistogramParams, Node, NumericProperty,
7 UintExponentialHistogramProperty, UintLinearHistogramProperty, UintProperty,
8};
9use fuchsia_sync::Mutex;
10use std::collections::BTreeMap;
11use std::ffi::CStr;
12use std::sync::atomic::{AtomicUsize, Ordering};
13use std::sync::{Arc, OnceLock};
14use zx::{self as zx, MonotonicDuration};
15
/// Trace category under which archivist trace events are emitted.
pub(crate) static TRACE_CATEGORY: &CStr = c"archivist";

/// Exponential histogram parameters for microsecond timings: floor 0, first
/// step 1us, doubling steps, 26 buckets.
static TIME_USEC_PARAMS: ExponentialHistogramParams<u64> =
    ExponentialHistogramParams { floor: 0, initial_step: 1, step_multiplier: 2, buckets: 26 };

/// Linear histogram parameters for configured maximum snapshot sizes, in
/// bytes: 100 buckets of 10,000 bytes each, starting at 0.
static MAX_SNAPSHOT_SIZE_BYTES_PARAMS: LinearHistogramParams<u64> =
    LinearHistogramParams { floor: 0, step_size: 10000, buckets: 100 };

/// Linear histogram parameters for per-snapshot schema truncation: 20 buckets
/// of width 5 starting at 0 (the property name suggests a 0-100 percentage).
static SNAPSHOT_SCHEMA_TRUNCATION_PARAMS: LinearHistogramParams<u64> =
    LinearHistogramParams { floor: 0, step_size: 5, buckets: 20 };
31
/// Inspect stats for a diagnostics accessor: accessor-wide counters plus
/// per-source (inspect / logs) connection stats.
pub struct AccessorStats {
    /// Node under which all of these stats are recorded; kept alive so the
    /// properties below remain in the inspect tree.
    _node: Node,

    /// Counters shared by the whole accessor (connections, stream requests).
    pub global_stats: Arc<GlobalAccessorStats>,

    /// Stats for connections serving inspect data (under the "inspect" child).
    pub inspect_stats: Arc<GlobalConnectionStats>,

    /// Stats for connections serving log data (under the "logs" child).
    pub logs_stats: Arc<GlobalConnectionStats>,
}
47
/// Accessor-wide counters, shared across all connections of one accessor.
pub struct GlobalAccessorStats {
    /// Number of accessor connections opened.
    pub connections_opened: UintProperty,
    /// Number of accessor connections closed.
    pub connections_closed: UintProperty,
    /// Number of StreamDiagnostics requests received.
    pub stream_diagnostics_requests: UintProperty,
}
57
58impl AccessorStats {
59 pub fn new(node: Node) -> Self {
60 let connections_opened = node.create_uint("connections_opened", 0);
61 let connections_closed = node.create_uint("connections_closed", 0);
62
63 let stream_diagnostics_requests = node.create_uint("stream_diagnostics_requests", 0);
64
65 let inspect_stats = Arc::new(GlobalConnectionStats::new(node.create_child("inspect")));
66 let logs_stats = Arc::new(GlobalConnectionStats::new(node.create_child("logs")));
67
68 AccessorStats {
69 _node: node,
70 global_stats: Arc::new(GlobalAccessorStats {
71 connections_opened,
72 connections_closed,
73 stream_diagnostics_requests,
74 }),
75 inspect_stats,
76 logs_stats,
77 }
78 }
79
80 pub fn new_inspect_batch_iterator(&self) -> BatchIteratorConnectionStats {
81 self.inspect_stats.new_batch_iterator_connection()
82 }
83
84 pub fn new_logs_batch_iterator(&self) -> BatchIteratorConnectionStats {
85 self.logs_stats.new_batch_iterator_connection()
86 }
87}
88
/// Stats for one diagnostics source (inspect or logs), shared across all of
/// its batch iterator connections.
pub struct GlobalConnectionStats {
    /// Node holding these stats; also the parent of the lazily created
    /// histogram and processing-time children.
    node: Node,
    /// Number of `BatchIteratorConnectionStats` instances ever constructed.
    reader_servers_constructed: UintProperty,
    /// Number of `BatchIteratorConnectionStats` instances dropped.
    reader_servers_destroyed: UintProperty,
    /// Aggregate stats recorded under the "batch_iterator" child.
    batch_iterator: GlobalBatchIteratorStats,
    /// Number of component timeouts recorded via `add_timeout`.
    component_timeouts_count: UintProperty,
    /// Number of truncated schemas (incremented through connection stats).
    schema_truncation_count: UintProperty,
    /// Histogram of per-component processing time in microseconds; created
    /// lazily on the first `record_component_duration` call.
    component_time_usec: OnceLock<UintExponentialHistogramProperty>,
    /// Histogram of configured maximum snapshot sizes, in bytes.
    max_snapshot_sizes_bytes: UintLinearHistogramProperty,
    /// Histogram of the percentage of truncated schemas per snapshot.
    snapshot_schema_truncation_percentage: UintLinearHistogramProperty,
    /// Tracker of the longest per-component processing times; created lazily
    /// on the first `record_component_duration` call.
    processing_time_tracker: OnceLock<Mutex<ProcessingTimeTracker>>,
    /// Parent node for the numbered per-connection stats children.
    batch_iterator_connections: Node,
    /// Id used to name the next child under `batch_iterator_connections`.
    next_connection_id: AtomicUsize,
}
118
119impl GlobalConnectionStats {
120 pub fn new(node: Node) -> Self {
121 let reader_servers_constructed = node.create_uint("reader_servers_constructed", 0);
122 let reader_servers_destroyed = node.create_uint("reader_servers_destroyed", 0);
123
124 let batch_iterator = GlobalBatchIteratorStats::new(&node);
125 let component_timeouts_count = node.create_uint("component_timeouts_count", 0);
126
127 let max_snapshot_sizes_bytes = node.create_uint_linear_histogram(
128 "max_snapshot_sizes_bytes",
129 MAX_SNAPSHOT_SIZE_BYTES_PARAMS.clone(),
130 );
131
132 let snapshot_schema_truncation_percentage = node.create_uint_linear_histogram(
133 "snapshot_schema_truncation_percentage",
134 SNAPSHOT_SCHEMA_TRUNCATION_PARAMS.clone(),
135 );
136
137 let schema_truncation_count = node.create_uint("schema_truncation_count", 0);
138 let batch_iterator_connections = node.create_child("batch_iterator_connections");
139
140 GlobalConnectionStats {
141 node,
142 reader_servers_constructed,
143 reader_servers_destroyed,
144 batch_iterator,
145 batch_iterator_connections,
146 component_timeouts_count,
147 max_snapshot_sizes_bytes,
148 snapshot_schema_truncation_percentage,
149 schema_truncation_count,
150 component_time_usec: OnceLock::new(),
151 processing_time_tracker: OnceLock::new(),
152 next_connection_id: AtomicUsize::new(0),
153 }
154 }
155
156 fn new_batch_iterator_connection(self: &Arc<Self>) -> BatchIteratorConnectionStats {
157 let node = self
158 .batch_iterator_connections
159 .create_child(self.next_connection_id.fetch_add(1, Ordering::Relaxed).to_string());
160 BatchIteratorConnectionStats::new(node, Arc::clone(self))
161 }
162
163 pub fn add_timeout(&self) {
164 self.component_timeouts_count.add(1);
165 }
166
167 pub fn timeout_counter(&self) -> &UintProperty {
168 &self.component_timeouts_count
169 }
170
171 pub fn record_percent_truncated_schemas(&self, percent_truncated_schemas: u64) {
172 self.snapshot_schema_truncation_percentage.insert(percent_truncated_schemas);
173 }
174
175 pub fn record_max_snapshot_size_config(&self, max_snapshot_size_config: u64) {
176 self.max_snapshot_sizes_bytes.insert(max_snapshot_size_config);
177 }
178
179 pub fn record_batch_duration(&self, duration: MonotonicDuration) {
181 let micros = duration.into_micros();
182 if micros >= 0 {
183 self.batch_iterator.get_next.time_usec.insert(micros as u64);
184 }
185 }
186
187 pub fn record_component_duration(&self, moniker: impl AsRef<str>, duration: MonotonicDuration) {
189 let nanos = duration.into_nanos();
190 if nanos >= 0 {
191 let component_time_usec = self.component_time_usec.get_or_init(|| {
194 self.node.create_uint_exponential_histogram(
195 "component_time_usec",
196 TIME_USEC_PARAMS.clone(),
197 )
198 });
199
200 let processing_time_tracker = self.processing_time_tracker.get_or_init(|| {
201 Mutex::new(ProcessingTimeTracker::new(
202 self.node.create_child("longest_processing_times"),
203 ))
204 });
205
206 component_time_usec.insert(nanos as u64 / 1000);
207 processing_time_tracker.lock().track(moniker.as_ref(), nanos as u64);
208 }
209 }
210}
211
/// Aggregate stats for all batch iterators of one source, recorded under a
/// "batch_iterator" child node.
struct GlobalBatchIteratorStats {
    /// Node holding these stats; kept alive for the properties below.
    _node: Node,
    /// Number of batch iterator connections opened.
    connections_opened: UintProperty,
    /// Number of batch iterator connections closed.
    connections_closed: UintProperty,
    /// Stats for GetNext requests, under the "get_next" child.
    get_next: GlobalBatchIteratorGetNextStats,
}
220
221impl GlobalBatchIteratorStats {
222 fn new(parent: &Node) -> Self {
223 let node = parent.create_child("batch_iterator");
224 let connections_opened = node.create_uint("connections_opened", 0);
225 let connections_closed = node.create_uint("connections_closed", 0);
226 let get_next = GlobalBatchIteratorGetNextStats::new(&node);
227 Self { _node: node, connections_opened, connections_closed, get_next }
228 }
229}
230
/// Aggregate stats for GetNext requests across all batch iterators of one
/// source, recorded under a "get_next" child node.
struct GlobalBatchIteratorGetNextStats {
    /// Node holding these stats; kept alive for the properties below.
    _node: Node,
    /// Number of GetNext requests received.
    requests: UintProperty,
    /// Number of GetNext responses sent.
    responses: UintProperty,
    /// Number of results returned across all responses.
    result_count: UintProperty,
    /// Number of error results returned.
    result_errors: UintProperty,
    /// Histogram of GetNext batch durations, in microseconds.
    time_usec: UintExponentialHistogramProperty,
}
244
245impl GlobalBatchIteratorGetNextStats {
246 fn new(parent: &Node) -> Self {
247 let node = parent.create_child("get_next");
248 let requests = node.create_uint("requests", 0);
249 let responses = node.create_uint("responses", 0);
250 let result_count = node.create_uint("result_count", 0);
251 let result_errors = node.create_uint("result_errors", 0);
252 let time_usec =
253 node.create_uint_exponential_histogram("time_usec", TIME_USEC_PARAMS.clone());
254 Self { _node: node, requests, responses, result_count, result_errors, time_usec }
255 }
256}
257
/// Maximum number of per-component entries retained by `ProcessingTimeTracker`.
const PROCESSING_TIME_COMPONENT_COUNT_LIMIT: usize = 20;

/// Tracks the components with the longest processing times, keeping at most
/// `PROCESSING_TIME_COMPONENT_COUNT_LIMIT` entries (one per moniker).
struct ProcessingTimeTracker {
    /// Node under which one child per tracked component is recorded.
    node: Node,
    /// Moniker -> (processing time in ns, inspect node for that entry).
    longest_times_by_component: BTreeMap<String, (u64, Node)>,
    /// Cached smallest tracked time in ns; `u64::MAX` when the map is empty.
    /// Used to cheaply reject samples that cannot enter a full table.
    shortest_time_ns: u64,
}
271
272impl ProcessingTimeTracker {
273 fn new(node: Node) -> Self {
274 Self { node, longest_times_by_component: BTreeMap::new(), shortest_time_ns: u64::MAX }
275 }
276 fn track(&mut self, moniker: &str, time_ns: u64) {
277 let at_capacity =
278 self.longest_times_by_component.len() >= PROCESSING_TIME_COMPONENT_COUNT_LIMIT;
279
280 if at_capacity && time_ns < self.shortest_time_ns {
283 return;
284 }
285
286 let parent_node = &self.node;
287
288 let make_entry = || {
289 let n = parent_node.create_child(moniker.to_string());
290 n.record_int("@time", zx::BootInstant::get().into_nanos());
291 n.record_double("duration_seconds", time_ns as f64 / 1e9);
292 (time_ns, n)
293 };
294
295 self.longest_times_by_component
296 .entry(moniker.to_string())
297 .and_modify(move |v| {
298 if v.0 < time_ns {
299 *v = make_entry();
300 }
301 })
302 .or_insert_with(make_entry);
303
304 while self.longest_times_by_component.len() > PROCESSING_TIME_COMPONENT_COUNT_LIMIT {
307 let mut key = "".to_string();
308 for (k, (val, _)) in &self.longest_times_by_component {
309 if *val == self.shortest_time_ns {
310 key.clone_from(k);
311 break;
312 }
313 }
314 self.longest_times_by_component.remove(&key);
315 self.shortest_time_ns =
316 self.longest_times_by_component.values().map(|v| v.0).min().unwrap_or(u64::MAX);
317 }
318
319 self.shortest_time_ns = std::cmp::min(self.shortest_time_ns, time_ns);
320 }
321}
322
/// Stats for a single batch iterator connection; also forwards counts to the
/// shared `GlobalConnectionStats`.
pub struct BatchIteratorConnectionStats {
    /// Per-connection node holding these stats.
    _node: Node,
    /// Shared stats for the source this connection belongs to.
    global_stats: Arc<GlobalConnectionStats>,
    /// GetNext requests received on this connection.
    get_next_requests: UintProperty,
    /// GetNext responses sent on this connection.
    get_next_responses: UintProperty,
    /// Terminal (final) GetNext responses sent on this connection.
    get_next_terminal_responses: UintProperty,
}
336
337impl BatchIteratorConnectionStats {
338 fn new(node: Node, global_stats: Arc<GlobalConnectionStats>) -> Self {
339 global_stats.reader_servers_constructed.add(1);
341
342 let get_next = node.create_child("get_next");
343 let get_next_requests = get_next.create_uint("requests", 0);
344 let get_next_responses = get_next.create_uint("responses", 0);
345 let get_next_terminal_responses = get_next.create_uint("terminal_responses", 0);
346 node.record(get_next);
347
348 Self {
349 _node: node,
350 global_stats,
351 get_next_requests,
352 get_next_responses,
353 get_next_terminal_responses,
354 }
355 }
356
357 pub fn open_connection(&self) {
358 self.global_stats.batch_iterator.connections_opened.add(1);
359 }
360
361 pub fn close_connection(&self) {
362 self.global_stats.batch_iterator.connections_closed.add(1);
363 }
364
365 pub fn global_stats(&self) -> &Arc<GlobalConnectionStats> {
366 &self.global_stats
367 }
368
369 pub fn add_request(&self) {
370 self.global_stats.batch_iterator.get_next.requests.add(1);
371 self.get_next_requests.add(1);
372 }
373
374 pub fn add_response(&self) {
375 self.global_stats.batch_iterator.get_next.responses.add(1);
376 self.get_next_responses.add(1);
377 }
378
379 pub fn add_terminal(&self) {
380 self.get_next_terminal_responses.add(1);
381 }
382
383 pub fn add_result(&self) {
384 self.global_stats.batch_iterator.get_next.result_count.add(1);
385 }
386
387 pub fn add_result_error(&self) {
388 self.global_stats.batch_iterator.get_next.result_errors.add(1);
389 }
390
391 pub fn add_schema_truncated(&self) {
392 self.global_stats.schema_truncation_count.add(1);
393 }
394}
395
impl Drop for BatchIteratorConnectionStats {
    // Balances `reader_servers_constructed` (incremented in `new`) so the
    // global counters reflect live connections.
    fn drop(&mut self) {
        self.global_stats.reader_servers_destroyed.add(1);
    }
}
401
#[cfg(test)]
mod test {
    use super::*;
    use diagnostics_assertions::{AnyProperty, assert_data_tree};
    use fuchsia_inspect::health::Reporter;
    use fuchsia_inspect::{Inspector, component};

    // Verifies the component health node transitions OK -> UNHEALTHY -> OK
    // and that each state is reflected in the inspect tree.
    #[fuchsia::test]
    async fn health() {
        component::health().set_ok();
        assert_data_tree!(component::inspector(),
        root: {
            "fuchsia.inspect.Health": {
                status: "OK",
                start_timestamp_nanos: AnyProperty,
            }
        });

        component::health().set_unhealthy("Bad state");
        assert_data_tree!(component::inspector(),
        root: contains {
            "fuchsia.inspect.Health": {
                status: "UNHEALTHY",
                message: "Bad state",
                start_timestamp_nanos: AnyProperty,
            }
        });

        component::health().set_ok();
        assert_data_tree!(component::inspector(),
        root: contains {
            "fuchsia.inspect.Health": {
                status: "OK",
                start_timestamp_nanos: AnyProperty,
            }
        });
    }

    // Exercises ProcessingTimeTracker: an entry keeps its largest time,
    // and only the PROCESSING_TIME_COMPONENT_COUNT_LIMIT (20) slowest
    // components survive once more than that have been tracked.
    #[fuchsia::test]
    async fn processing_time_tracker() {
        let inspector = Inspector::default();
        let mut tracker = ProcessingTimeTracker::new(inspector.root().create_child("test"));

        tracker.track("a", 1e9 as u64);
        assert_data_tree!(inspector,
        root: {
            test: {
                a: {
                    "@time": AnyProperty,
                    duration_seconds: 1f64
                }
            }
        });

        // A smaller time for the same moniker must not overwrite the entry.
        tracker.track("a", 5e8 as u64);
        assert_data_tree!(inspector,
        root: {
            test: {
                a: {
                    "@time": AnyProperty,
                    duration_seconds: 1f64
                }
            }
        });

        // A larger time for the same moniker replaces the entry.
        tracker.track("a", 5500e6 as u64);
        assert_data_tree!(inspector,
        root: {
            test: {
                a: {
                    "@time": AnyProperty,
                    duration_seconds: 5.5f64
                }
            }
        });

        // Track 60 components with times 0s..59s; only the 20 slowest
        // (b40..b59) should remain, evicting "a" (5.5s) as well.
        for time in 0..60 {
            tracker.track(&format!("b{time}"), time * 1e9 as u64);
        }

        assert_data_tree!(inspector,
        root: {
            test: {
                b40: { "@time": AnyProperty, duration_seconds: 40f64 },
                b41: { "@time": AnyProperty, duration_seconds: 41f64 },
                b42: { "@time": AnyProperty, duration_seconds: 42f64 },
                b43: { "@time": AnyProperty, duration_seconds: 43f64 },
                b44: { "@time": AnyProperty, duration_seconds: 44f64 },
                b45: { "@time": AnyProperty, duration_seconds: 45f64 },
                b46: { "@time": AnyProperty, duration_seconds: 46f64 },
                b47: { "@time": AnyProperty, duration_seconds: 47f64 },
                b48: { "@time": AnyProperty, duration_seconds: 48f64 },
                b49: { "@time": AnyProperty, duration_seconds: 49f64 },
                b50: { "@time": AnyProperty, duration_seconds: 50f64 },
                b51: { "@time": AnyProperty, duration_seconds: 51f64 },
                b52: { "@time": AnyProperty, duration_seconds: 52f64 },
                b53: { "@time": AnyProperty, duration_seconds: 53f64 },
                b54: { "@time": AnyProperty, duration_seconds: 54f64 },
                b55: { "@time": AnyProperty, duration_seconds: 55f64 },
                b56: { "@time": AnyProperty, duration_seconds: 56f64 },
                b57: { "@time": AnyProperty, duration_seconds: 57f64 },
                b58: { "@time": AnyProperty, duration_seconds: 58f64 },
                b59: { "@time": AnyProperty, duration_seconds: 59f64 },
            }
        });
    }
}