1use crate::constants::FORMATTED_CONTENT_CHUNK_SIZE_TARGET;
6use crate::diagnostics::{BatchIteratorConnectionStats, TRACE_CATEGORY};
7use crate::error::AccessorError;
8use crate::formatter::{
9 new_batcher, FXTPacketSerializer, FormattedStream, JsonPacketSerializer, SerializedVmo,
10};
11use crate::inspect::repository::InspectRepository;
12use crate::inspect::ReaderServer;
13use crate::logs::container::CursorItem;
14use crate::logs::repository::LogsRepository;
15use crate::pipeline::Pipeline;
16use diagnostics_data::{Data, DiagnosticsData, ExtendedMoniker, Metadata};
17use fidl::endpoints::{ControlHandle, RequestStream};
18use fidl_fuchsia_diagnostics::{
19 ArchiveAccessorRequest, ArchiveAccessorRequestStream, BatchIteratorControlHandle,
20 BatchIteratorRequest, BatchIteratorRequestStream, ClientSelectorConfiguration, DataType,
21 Format, FormattedContent, PerformanceConfiguration, Selector, SelectorArgument, StreamMode,
22 StreamParameters, StringSelector, TreeSelector, TreeSelectorUnknown,
23};
24use fidl_fuchsia_mem::Buffer;
25use fuchsia_inspect::NumericProperty;
26use fuchsia_sync::Mutex;
27use futures::future::{select, Either};
28use futures::prelude::*;
29use futures::stream::Peekable;
30use futures::{pin_mut, StreamExt};
31use log::warn;
32use selectors::FastError;
33use serde::Serialize;
34use std::collections::HashMap;
35use std::pin::Pin;
36use std::sync::Arc;
37use thiserror::Error;
38use {fidl_fuchsia_diagnostics_host as fhost, fuchsia_async as fasync, fuchsia_trace as ftrace};
39
/// A batch-retrieval timeout expressed in whole seconds.
///
/// Non-positive values encode "no timeout": `seconds()` reports them as
/// `i64::MAX`.
#[derive(Debug, Copy, Clone)]
pub struct BatchRetrievalTimeout(i64);

impl BatchRetrievalTimeout {
    /// Wraps a raw seconds value; values <= 0 mean "wait forever".
    pub fn from_seconds(s: i64) -> Self {
        Self(s)
    }

    /// An effectively-infinite timeout, used by tests.
    #[cfg(test)]
    pub fn max() -> Self {
        Self::from_seconds(-1)
    }

    /// Returns the timeout in seconds, mapping non-positive values to
    /// `i64::MAX`.
    pub fn seconds(&self) -> i64 {
        match self.0 {
            s if s > 0 => s,
            _ => i64::MAX,
        }
    }
}
61
/// Serves `fuchsia.diagnostics.ArchiveAccessor` connections, translating
/// StreamDiagnostics requests into Inspect or Logs batch iterators backed by
/// the repositories below.
pub struct ArchiveAccessorServer {
    // Source of Inspect data for DataType::Inspect requests.
    inspect_repository: Arc<InspectRepository>,
    // Source of log records for DataType::Logs requests.
    logs_repository: Arc<LogsRepository>,
    // Cap on concurrent snapshot work per reader; forwarded into
    // PerformanceConfig for each request.
    maximum_concurrent_snapshots_per_reader: u64,
    // All connection- and request-handling tasks are spawned on this scope.
    scope: fasync::Scope,
    // Fallback batch timeout used when the client does not specify one.
    default_batch_timeout_seconds: BatchRetrievalTimeout,
}
72
73fn validate_and_parse_selectors(
74 selector_args: Vec<SelectorArgument>,
75) -> Result<Vec<Selector>, AccessorError> {
76 let mut selectors = vec![];
77 let mut errors = vec![];
78
79 if selector_args.is_empty() {
80 return Err(AccessorError::EmptySelectors);
81 }
82
83 for selector_arg in selector_args {
84 match selectors::take_from_argument::<FastError>(selector_arg) {
85 Ok(s) => selectors.push(s),
86 Err(e) => errors.push(e),
87 }
88 }
89
90 if !errors.is_empty() {
91 warn!(errors:?; "Found errors in selector arguments");
92 }
93
94 Ok(selectors)
95}
96
97fn validate_and_parse_log_selectors(
98 selector_args: Vec<SelectorArgument>,
99) -> Result<Vec<Selector>, AccessorError> {
100 let selectors = validate_and_parse_selectors(selector_args)?;
102 for selector in &selectors {
103 let tree_selector = selector.tree_selector.as_ref().unwrap();
105 match tree_selector {
106 TreeSelector::PropertySelector(_) => {
107 return Err(AccessorError::InvalidLogSelector);
108 }
109 TreeSelector::SubtreeSelector(subtree_selector) => {
110 if subtree_selector.node_path.len() != 1 {
111 return Err(AccessorError::InvalidLogSelector);
112 }
113 match &subtree_selector.node_path[0] {
114 StringSelector::ExactMatch(val) if val == "root" => {}
115 StringSelector::StringPattern(val) if val == "root" => {}
116 _ => {
117 return Err(AccessorError::InvalidLogSelector);
118 }
119 }
120 }
121 TreeSelectorUnknown!() => {}
122 }
123 }
124 Ok(selectors)
125}
126
impl ArchiveAccessorServer {
    /// Creates a new accessor server backed by the given Inspect and logs
    /// repositories. All per-connection work runs on tasks spawned on
    /// `scope`.
    pub fn new(
        inspect_repository: Arc<InspectRepository>,
        logs_repository: Arc<LogsRepository>,
        maximum_concurrent_snapshots_per_reader: u64,
        default_batch_timeout_seconds: BatchRetrievalTimeout,
        scope: fasync::Scope,
    ) -> Self {
        ArchiveAccessorServer {
            inspect_repository,
            logs_repository,
            maximum_concurrent_snapshots_per_reader,
            scope,
            default_batch_timeout_seconds,
        }
    }

    /// Serves one StreamDiagnostics request to completion: validates the
    /// stream parameters, builds the appropriate `BatchIterator` for the
    /// requested data type/format, and runs it until the data or the client
    /// is exhausted.
    ///
    /// Returns an `AccessorError` for invalid/unsupported parameter
    /// combinations; the caller closes the iterator channel with that error.
    async fn spawn<R: ArchiveAccessorWriter + Send>(
        pipeline: Arc<Pipeline>,
        inspect_repo: Arc<InspectRepository>,
        log_repo: Arc<LogsRepository>,
        requests: R,
        params: StreamParameters,
        maximum_concurrent_snapshots_per_reader: u64,
        default_batch_timeout_seconds: BatchRetrievalTimeout,
    ) -> Result<(), AccessorError> {
        let format = params.format.ok_or(AccessorError::MissingFormat)?;
        // Only these three wire formats are served at all; Text is rejected
        // up front, and Cbor is rejected again below for logs specifically.
        if !matches!(format, Format::Json | Format::Cbor | Format::Fxt) {
            return Err(AccessorError::UnsupportedFormat);
        }
        let mode = params.stream_mode.ok_or(AccessorError::MissingMode)?;

        let performance_config: PerformanceConfig = PerformanceConfig::new(
            &params,
            maximum_concurrent_snapshots_per_reader,
            default_batch_timeout_seconds,
        )?;

        let trace_id = ftrace::Id::random();
        match params.data_type.ok_or(AccessorError::MissingDataType)? {
            DataType::Inspect => {
                let _trace_guard = ftrace::async_enter!(
                    trace_id,
                    TRACE_CATEGORY,
                    c"ArchiveAccessorServer::spawn",
                    "data_type" => "Inspect",
                    "trace_id" => u64::from(trace_id)
                );
                // Inspect only supports one-shot snapshots.
                if !matches!(mode, StreamMode::Snapshot) {
                    return Err(AccessorError::UnsupportedMode);
                }
                let stats = Arc::new(pipeline.accessor_stats().new_inspect_batch_iterator());

                let selectors =
                    params.client_selector_configuration.ok_or(AccessorError::MissingSelectors)?;

                // `None` means "select everything".
                let selectors = match selectors {
                    ClientSelectorConfiguration::Selectors(selectors) => {
                        Some(validate_and_parse_selectors(selectors)?)
                    }
                    ClientSelectorConfiguration::SelectAll(_) => None,
                    _ => return Err(AccessorError::InvalidSelectors("unrecognized selectors")),
                };

                let static_hierarchy_allowlist = pipeline.static_hierarchy_allowlist();
                let unpopulated_container_vec =
                    inspect_repo.fetch_inspect_data(&selectors, static_hierarchy_allowlist);

                // Split the aggregate byte budget evenly across the matched
                // components; None when there is no budget or no component.
                let per_component_budget_opt = if unpopulated_container_vec.is_empty() {
                    None
                } else {
                    performance_config
                        .aggregated_content_limit_bytes
                        .map(|limit| (limit as usize) / unpopulated_container_vec.len())
                };

                if let Some(max_snapshot_size) = performance_config.aggregated_content_limit_bytes {
                    stats.global_stats().record_max_snapshot_size_config(max_snapshot_size);
                }
                BatchIterator::new(
                    ReaderServer::stream(
                        unpopulated_container_vec,
                        performance_config,
                        selectors,
                        Arc::clone(&stats),
                        trace_id,
                    ),
                    requests,
                    mode,
                    stats,
                    per_component_budget_opt,
                    trace_id,
                    format,
                )?
                .run()
                .await
            }
            DataType::Logs => {
                // CBOR passed the global format check above but is not
                // supported for logs.
                if format == Format::Cbor {
                    return Err(AccessorError::UnsupportedFormat);
                }
                let _trace_guard = ftrace::async_enter!(
                    trace_id,
                    TRACE_CATEGORY,
                    c"ArchiveAccessorServer::spawn",
                    "data_type" => "Logs",
                    "trace_id" => u64::from(trace_id)
                );
                let stats = Arc::new(pipeline.accessor_stats().new_logs_batch_iterator());
                // Logs restrict selectors to the `component:root` grammar.
                let selectors = match params.client_selector_configuration {
                    Some(ClientSelectorConfiguration::Selectors(selectors)) => {
                        Some(validate_and_parse_log_selectors(selectors)?)
                    }
                    Some(ClientSelectorConfiguration::SelectAll(_)) => None,
                    _ => return Err(AccessorError::InvalidSelectors("unrecognized selectors")),
                };
                match format {
                    Format::Fxt => {
                        // FXT serves raw log records without re-encoding.
                        let logs = log_repo.logs_cursor_raw(mode, selectors, trace_id);
                        BatchIterator::new_serving_fxt(
                            logs,
                            requests,
                            mode,
                            stats,
                            trace_id,
                            performance_config,
                        )?
                        .run()
                        .await?;
                        Ok(())
                    }
                    Format::Json => {
                        let logs = log_repo
                            .logs_cursor(mode, selectors, trace_id)
                            .map(move |inner: _| (*inner).clone());
                        BatchIterator::new_serving_arrays(logs, requests, mode, stats, trace_id)?
                            .run()
                            .await?;
                        Ok(())
                    }
                    // Unreachable in practice: rejected by the checks above.
                    Format::Text => Err(AccessorError::UnsupportedFormat),
                    Format::Cbor => unreachable!("CBOR is not supported for logs"),
                }
            }
        }
    }

    /// Accepts StreamDiagnostics requests from `stream` and spawns one task
    /// per request on this server's scope. Errors from a request close that
    /// request's BatchIterator channel with the corresponding epitaph.
    pub fn spawn_server<RequestStream>(&self, pipeline: Arc<Pipeline>, mut stream: RequestStream)
    where
        RequestStream: ArchiveAccessorTranslator + Send + 'static,
        <RequestStream as ArchiveAccessorTranslator>::InnerDataRequestChannel:
            ArchiveAccessorWriter + Send,
    {
        // Clone everything the connection task needs so it is 'static.
        let log_repo = Arc::clone(&self.logs_repository);
        let inspect_repo = Arc::clone(&self.inspect_repository);
        let maximum_concurrent_snapshots_per_reader = self.maximum_concurrent_snapshots_per_reader;
        let default_batch_timeout_seconds = self.default_batch_timeout_seconds;
        let scope = self.scope.to_handle();
        self.scope.spawn(async move {
            let stats = pipeline.accessor_stats();
            stats.global_stats.connections_opened.add(1);
            while let Some(request) = stream.next().await {
                // Grab the control handle up front so errors can be sent as
                // an epitaph even after `request.iterator` is moved below.
                let control_handle = request.iterator.get_control_handle();
                stats.global_stats.stream_diagnostics_requests.add(1);
                let pipeline = Arc::clone(&pipeline);

                let inspect_repo_for_task = Arc::clone(&inspect_repo);
                let log_repo_for_task = Arc::clone(&log_repo);
                scope.spawn(async move {
                    if let Err(e) = Self::spawn(
                        pipeline,
                        inspect_repo_for_task,
                        log_repo_for_task,
                        request.iterator,
                        request.parameters,
                        maximum_concurrent_snapshots_per_reader,
                        default_batch_timeout_seconds,
                    )
                    .await
                    {
                        if let Some(control) = control_handle {
                            e.close(control);
                        }
                    }
                });
            }
            pipeline.accessor_stats().global_stats.connections_closed.add(1);
        });
    }
}
331
/// Abstraction over the transport used to deliver formatted diagnostics
/// batches to a client: a FIDL BatchIterator channel or a raw socket.
pub trait ArchiveAccessorWriter {
    /// Sends one batch of formatted chunks to the client.
    fn write(
        &mut self,
        results: Vec<FormattedContent>,
    ) -> impl Future<Output = Result<(), IteratorError>> + Send;

    /// Resolves when the client is ready to receive the next batch.
    /// Default: immediately ready (socket transports have no backpressure
    /// handshake).
    fn wait_for_buffer(&mut self) -> impl Future<Output = anyhow::Result<()>> + Send {
        futures::future::ready(Ok(()))
    }

    /// Returns the BatchIterator control handle when this transport has one;
    /// the default (socket) transport has none.
    fn get_control_handle(&self) -> Option<BatchIteratorControlHandle> {
        None
    }

    /// Answers a pending WaitForReady handshake, when the transport supports
    /// one. Default: nothing to do.
    fn maybe_respond_ready(&mut self) -> impl Future<Output = Result<(), AccessorError>> + Send {
        futures::future::ready(Ok(()))
    }

    /// Resolves when the peer closes its end of the transport.
    fn wait_for_close(&mut self) -> impl Future<Output = ()> + Send;
}
358
359fn get_buffer_from_formatted_content(
360 content: fidl_fuchsia_diagnostics::FormattedContent,
361) -> Result<Buffer, AccessorError> {
362 match content {
363 FormattedContent::Json(json) => Ok(json),
364 FormattedContent::Text(text) => Ok(text),
365 _ => Err(AccessorError::UnsupportedFormat),
366 }
367}
368
369impl ArchiveAccessorWriter for fuchsia_async::Socket {
370 async fn write(&mut self, data: Vec<FormattedContent>) -> Result<(), IteratorError> {
371 if data.is_empty() {
372 return Err(IteratorError::PeerClosed);
373 }
374 let mut buf = vec![0];
375 for value in data {
376 let data = get_buffer_from_formatted_content(value)?;
377 buf.resize(data.size as usize, 0);
378 data.vmo.read(&mut buf, 0)?;
379 let res = self.write_all(&buf).await;
380 if res.is_err() {
381 break;
383 }
384 }
385 Ok(())
386 }
387
388 async fn wait_for_close(&mut self) {
389 let _ = self.on_closed().await;
390 }
391}
392
/// Errors surfaced while feeding batches to a client transport.
#[derive(Error, Debug)]
pub enum IteratorError {
    /// The client closed its end (or sent an unknown method).
    #[error("Peer closed")]
    PeerClosed,
    /// A FIDL-level transport error.
    #[error(transparent)]
    Ipc(#[from] fidl::Error),
    /// An accessor-level error (e.g. unsupported chunk format).
    #[error(transparent)]
    AccessorError(#[from] AccessorError),
    /// Reading chunk bytes out of the backing VMO failed.
    #[error("Error reading from VMO: {}", source)]
    VmoReadError {
        #[from]
        source: zx::Status,
    },
}
410
impl ArchiveAccessorWriter for Peekable<BatchIteratorRequestStream> {
    /// Waits for the client's next GetNext call and answers it with `data`.
    /// WaitForReady calls received in the meantime are acked and the loop
    /// continues; an unknown method or a closed channel ends the stream.
    async fn write(&mut self, data: Vec<FormattedContent>) -> Result<(), IteratorError> {
        loop {
            match self.next().await {
                Some(Ok(BatchIteratorRequest::GetNext { responder })) => {
                    responder.send(Ok(data))?;
                    return Ok(());
                }
                Some(Ok(BatchIteratorRequest::WaitForReady { responder })) => {
                    responder.send()?;
                }
                Some(Ok(BatchIteratorRequest::_UnknownMethod { method_type, ordinal, .. })) => {
                    warn!(method_type:?, ordinal; "Got unknown interaction on BatchIterator");
                    return Err(IteratorError::PeerClosed);
                }
                Some(Err(err)) => return Err(err.into()),
                None => {
                    return Err(IteratorError::PeerClosed);
                }
            }
        }
    }

    /// If the first queued request is WaitForReady, consume it and ack it;
    /// otherwise leave the queue untouched.
    async fn maybe_respond_ready(&mut self) -> Result<(), AccessorError> {
        let mut this = Pin::new(self);
        if matches!(this.as_mut().peek().await, Some(Ok(BatchIteratorRequest::WaitForReady { .. })))
        {
            // Safe to destructure: peek just confirmed the variant.
            let Some(Ok(BatchIteratorRequest::WaitForReady { responder })) = this.next().await
            else {
                unreachable!("We already checked the next request was WaitForReady");
            };
            responder.send()?;
        }
        Ok(())
    }

    /// Ready once the client has a request queued (without consuming it);
    /// errors when the channel yielded an error or closed.
    async fn wait_for_buffer(&mut self) -> anyhow::Result<()> {
        let this = Pin::new(self);
        match this.peek().await {
            Some(Ok(_)) => Ok(()),
            _ => Err(IteratorError::PeerClosed.into()),
        }
    }

    fn get_control_handle(&self) -> Option<BatchIteratorControlHandle> {
        Some(self.get_ref().control_handle())
    }

    async fn wait_for_close(&mut self) {
        let _ = self.get_ref().control_handle().on_closed().await;
    }
}
463
/// One StreamDiagnostics request paired with the channel on which its
/// results should be served.
pub struct ArchiveIteratorRequest<R> {
    // The stream parameters the client supplied (data type, format, mode,
    // selectors, performance configuration, ...).
    parameters: StreamParameters,
    // The transport over which results are written back to the client.
    iterator: R,
}
468
/// Adapts a concrete ArchiveAccessor request stream (FIDL channel-based or
/// host socket-based) into a uniform stream of `ArchiveIteratorRequest`s.
pub trait ArchiveAccessorTranslator {
    /// The transport handed to each request's batch iterator.
    type InnerDataRequestChannel;
    /// Yields the next StreamDiagnostics request, or `None` when the
    /// connection is done.
    fn next(
        &mut self,
    ) -> impl Future<Output = Option<ArchiveIteratorRequest<Self::InnerDataRequestChannel>>> + Send;
}
477
478impl ArchiveAccessorTranslator for fhost::ArchiveAccessorRequestStream {
479 type InnerDataRequestChannel = fuchsia_async::Socket;
480
481 async fn next(&mut self) -> Option<ArchiveIteratorRequest<Self::InnerDataRequestChannel>> {
482 match StreamExt::next(self).await {
483 Some(Ok(fhost::ArchiveAccessorRequest::StreamDiagnostics {
484 parameters,
485 responder,
486 stream,
487 })) => {
488 let _ = responder.send();
493 Some(ArchiveIteratorRequest {
494 iterator: fuchsia_async::Socket::from_socket(stream),
495 parameters,
496 })
497 }
498 _ => None,
499 }
500 }
501}
502
503impl ArchiveAccessorTranslator for ArchiveAccessorRequestStream {
504 type InnerDataRequestChannel = Peekable<BatchIteratorRequestStream>;
505
506 async fn next(&mut self) -> Option<ArchiveIteratorRequest<Self::InnerDataRequestChannel>> {
507 loop {
508 match StreamExt::next(self).await {
509 Some(Ok(ArchiveAccessorRequest::StreamDiagnostics {
510 control_handle: _,
511 result_stream,
512 stream_parameters,
513 })) => {
514 return Some(ArchiveIteratorRequest {
515 iterator: result_stream.into_stream().peekable(),
516 parameters: stream_parameters,
517 })
518 }
519 Some(Ok(ArchiveAccessorRequest::WaitForReady { responder })) => {
520 let _ = responder.send();
521 }
522 _ => return None,
523 }
524 }
525 }
526}
/// Tracks how many schemas had their payload dropped to fit the byte budget,
/// out of all schemas served on one connection.
struct SchemaTruncationCounter {
    // Schemas whose payload was dropped for budget reasons.
    truncated_schemas: u64,
    // All schemas processed on the connection.
    total_schemas: u64,
}

impl SchemaTruncationCounter {
    /// Creates a new shared, zeroed counter.
    pub fn new() -> Arc<Mutex<Self>> {
        Arc::new(Mutex::new(Self { truncated_schemas: 0, total_schemas: 0 }))
    }
}
537
/// Drives a stream of pre-formatted diagnostics chunks, delivering them in
/// batches over a client transport `R` until the data or the client is done.
pub(crate) struct BatchIterator<R> {
    // Transport used to receive GetNext requests / deliver batches.
    requests: R,
    // Per-connection stats; open/close and per-batch metrics are recorded.
    stats: Arc<BatchIteratorConnectionStats>,
    // The already-serialized data to serve, in batch-sized groups.
    data: FormattedStream,
    // Present only for Inspect streams with a byte budget; used to report
    // the percentage of truncated schemas at end of stream.
    truncation_counter: Option<Arc<Mutex<SchemaTruncationCounter>>>,
    // Trace id of the owning connection, linked into per-batch trace spans.
    parent_trace_id: ftrace::Id,
}
545
546fn maybe_update_budget(
549 budget_map: &mut HashMap<ExtendedMoniker, usize>,
550 moniker: &ExtendedMoniker,
551 bytes: usize,
552 byte_limit: usize,
553) -> bool {
554 if let Some(remaining_budget) = budget_map.get_mut(moniker) {
555 if *remaining_budget + bytes > byte_limit {
556 false
557 } else {
558 *remaining_budget += bytes;
559 true
560 }
561 } else if bytes > byte_limit {
562 budget_map.insert(moniker.clone(), 0);
563 false
564 } else {
565 budget_map.insert(moniker.clone(), bytes);
566 true
567 }
568}
569
impl<R> BatchIterator<R>
where
    R: ArchiveAccessorWriter + Send,
{
    /// Builds an iterator that serializes each `Data<D>` item into `format`,
    /// enforcing an optional per-component byte budget: items that would
    /// exceed it are re-serialized with their payload dropped and counted as
    /// truncated.
    pub fn new<Items, D>(
        data: Items,
        requests: R,
        mode: StreamMode,
        stats: Arc<BatchIteratorConnectionStats>,
        per_component_byte_limit_opt: Option<usize>,
        parent_trace_id: ftrace::Id,
        format: Format,
    ) -> Result<Self, AccessorError>
    where
        Items: Stream<Item = Data<D>> + Send + 'static,
        D: DiagnosticsData + 'static,
    {
        let result_stats_for_fut = Arc::clone(&stats);

        // Bytes consumed per component moniker, shared across items.
        let budget_tracker_shared = Arc::new(Mutex::new(HashMap::new()));

        let truncation_counter = SchemaTruncationCounter::new();
        let stream_owned_counter_for_fut = Arc::clone(&truncation_counter);

        let data = data.then(move |mut d| {
            let stream_owned_counter = Arc::clone(&stream_owned_counter_for_fut);
            let result_stats = Arc::clone(&result_stats_for_fut);
            let budget_tracker = Arc::clone(&budget_tracker_shared);
            async move {
                // Each item gets its own trace span, linked to the parent
                // connection's trace id.
                let trace_id = ftrace::Id::random();
                let _trace_guard = ftrace::async_enter!(
                    trace_id,
                    TRACE_CATEGORY,
                    c"BatchIterator::new.serialize",
                    "parent_trace_id" => u64::from(parent_trace_id),
                    "trace_id" => u64::from(trace_id),
                    "moniker" => d.moniker.to_string().as_ref()
                );
                let mut unlocked_counter = stream_owned_counter.lock();
                let mut tracker_guard = budget_tracker.lock();
                unlocked_counter.total_schemas += 1;
                if d.metadata.has_errors() {
                    result_stats.add_result_error();
                }

                match SerializedVmo::serialize(&d, D::DATA_TYPE, format) {
                    Err(e) => {
                        result_stats.add_result_error();
                        Err(e)
                    }
                    Ok(contents) => {
                        result_stats.add_result();
                        match per_component_byte_limit_opt {
                            Some(x) => {
                                if maybe_update_budget(
                                    &mut tracker_guard,
                                    &d.moniker,
                                    contents.size as usize,
                                    x,
                                ) {
                                    Ok(contents)
                                } else {
                                    // Over budget: drop the payload and send
                                    // a metadata-only schema instead.
                                    result_stats.add_schema_truncated();
                                    unlocked_counter.truncated_schemas += 1;
                                    d.drop_payload();
                                    SerializedVmo::serialize(&d, D::DATA_TYPE, format)
                                }
                            }
                            None => Ok(contents),
                        }
                    }
                }
            }
        });

        Self::new_inner(
            new_batcher(data, Arc::clone(&stats), mode),
            requests,
            stats,
            Some(truncation_counter),
            parent_trace_id,
        )
    }

    /// Builds an iterator serving raw log records packed into FXT chunks,
    /// sized by the configured aggregate content limit (or the default
    /// chunk-size target).
    pub fn new_serving_fxt<S>(
        data: S,
        requests: R,
        mode: StreamMode,
        stats: Arc<BatchIteratorConnectionStats>,
        parent_trace_id: ftrace::Id,
        performance_config: PerformanceConfig,
    ) -> Result<Self, AccessorError>
    where
        S: Stream<Item = CursorItem> + Send + Unpin + 'static,
    {
        let data = FXTPacketSerializer::new(
            Arc::clone(&stats),
            performance_config
                .aggregated_content_limit_bytes
                .unwrap_or(FORMATTED_CONTENT_CHUNK_SIZE_TARGET),
            data,
        );
        Self::new_inner(
            new_batcher(data, Arc::clone(&stats), mode),
            requests,
            stats,
            None,
            parent_trace_id,
        )
    }

    /// Builds an iterator serving serializable items as JSON-array chunks.
    pub fn new_serving_arrays<D, S>(
        data: S,
        requests: R,
        mode: StreamMode,
        stats: Arc<BatchIteratorConnectionStats>,
        parent_trace_id: ftrace::Id,
    ) -> Result<Self, AccessorError>
    where
        D: Serialize + Send + 'static,
        S: Stream<Item = D> + Send + Unpin + 'static,
    {
        let data = JsonPacketSerializer::new(
            Arc::clone(&stats),
            FORMATTED_CONTENT_CHUNK_SIZE_TARGET,
            data,
        );
        Self::new_inner(
            new_batcher(data, Arc::clone(&stats), mode),
            requests,
            stats,
            None,
            parent_trace_id,
        )
    }

    /// Common constructor: records the connection as open (closed again in
    /// `Drop`).
    fn new_inner(
        data: FormattedStream,
        requests: R,
        stats: Arc<BatchIteratorConnectionStats>,
        truncation_counter: Option<Arc<Mutex<SchemaTruncationCounter>>>,
        parent_trace_id: ftrace::Id,
    ) -> Result<Self, AccessorError> {
        stats.open_connection();
        Ok(Self { data, requests, stats, truncation_counter, parent_trace_id })
    }

    /// Serves batches until the client disconnects or the data stream
    /// terminates. An empty batch is sent as the terminal response; for
    /// budgeted Inspect streams it also records the truncation percentage.
    pub async fn run(mut self) -> Result<(), AccessorError> {
        self.requests.maybe_respond_ready().await?;
        while self.requests.wait_for_buffer().await.is_ok() {
            self.stats.add_request();
            let start_time = zx::MonotonicInstant::get();
            let trace_id = ftrace::Id::random();
            let _trace_guard = ftrace::async_enter!(
                trace_id,
                TRACE_CATEGORY,
                c"BatchIterator::run.get_send_batch",
                "parent_trace_id" => u64::from(self.parent_trace_id),
                "trace_id" => u64::from(trace_id)
            );
            // Race the next batch against the client hanging up so a
            // subscription doesn't keep this task alive forever.
            let batch = {
                let wait_for_close = self.requests.wait_for_close();
                let next_data = self.data.next();
                pin_mut!(wait_for_close);
                match select(next_data, wait_for_close).await {
                    Either::Left((batch_option, _)) => batch_option.unwrap_or_default(),
                    Either::Right(_) => break,
                }
            };

            // Turn a batch of Results into a batch of chunks, failing on the
            // first serialization error.
            let batch = batch.into_iter().collect::<Result<Vec<_>, _>>()?;

            self.stats.add_response();
            if batch.is_empty() {
                // End of stream: report the truncation ratio, if tracked.
                if let Some(truncation_count) = &self.truncation_counter {
                    let unlocked_count = truncation_count.lock();
                    if unlocked_count.total_schemas > 0 {
                        self.stats.global_stats().record_percent_truncated_schemas(
                            ((unlocked_count.truncated_schemas as f32
                                / unlocked_count.total_schemas as f32)
                                * 100.0)
                                .round() as u64,
                        );
                    }
                }
                self.stats.add_terminal();
            }
            self.stats
                .global_stats()
                .record_batch_duration(zx::MonotonicInstant::get() - start_time);
            if self.requests.write(batch).await.is_err() {
                break;
            }
        }
        Ok(())
    }
}
779
impl<R> Drop for BatchIterator<R> {
    fn drop(&mut self) {
        // Balance the open_connection() recorded in new_inner, even when the
        // iterator is dropped without running to completion.
        self.stats.close_connection();
    }
}
785
/// Resolved per-request performance settings derived from
/// `StreamParameters`.
pub struct PerformanceConfig {
    // Batch retrieval timeout in seconds (i64::MAX when unlimited).
    pub batch_timeout_sec: i64,
    // Optional cap on total serialized bytes across all components.
    pub aggregated_content_limit_bytes: Option<u64>,
    // Cap on concurrent snapshot work for this reader.
    pub maximum_concurrent_snapshots_per_reader: u64,
}
791
impl PerformanceConfig {
    /// Resolves the effective performance settings for a request.
    ///
    /// The batch timeout may be supplied either as the legacy top-level
    /// `batch_retrieval_timeout_seconds` field or inside
    /// `performance_configuration`, but not both; supplying both yields
    /// `AccessorError::DuplicateBatchTimeout`. When neither is supplied,
    /// `default_batch_timeout_seconds` applies.
    fn new(
        params: &StreamParameters,
        maximum_concurrent_snapshots_per_reader: u64,
        default_batch_timeout_seconds: BatchRetrievalTimeout,
    ) -> Result<PerformanceConfig, AccessorError> {
        let batch_timeout = match params {
            // Timeout given (or absent) at the top level only.
            StreamParameters {
                batch_retrieval_timeout_seconds,
                performance_configuration: None,
                ..
            }
            | StreamParameters {
                batch_retrieval_timeout_seconds,
                performance_configuration:
                    Some(PerformanceConfiguration { batch_retrieval_timeout_seconds: None, .. }),
                ..
            } => batch_retrieval_timeout_seconds,
            // Timeout given inside the performance configuration only.
            StreamParameters {
                batch_retrieval_timeout_seconds: None,
                performance_configuration:
                    Some(PerformanceConfiguration { batch_retrieval_timeout_seconds, .. }),
                ..
            } => batch_retrieval_timeout_seconds,
            // Given in both places: ambiguous, reject.
            _ => return Err(AccessorError::DuplicateBatchTimeout),
        }
        .map(BatchRetrievalTimeout::from_seconds)
        .unwrap_or(default_batch_timeout_seconds);

        let aggregated_content_limit_bytes = match params {
            StreamParameters {
                performance_configuration:
                    Some(PerformanceConfiguration { max_aggregate_content_size_bytes, .. }),
                ..
            } => *max_aggregate_content_size_bytes,
            _ => None,
        };

        Ok(PerformanceConfig {
            batch_timeout_sec: batch_timeout.seconds(),
            aggregated_content_limit_bytes,
            maximum_concurrent_snapshots_per_reader,
        })
    }
}
842
#[cfg(test)]
mod tests {
    use super::*;
    use crate::diagnostics::AccessorStats;
    use assert_matches::assert_matches;
    use fidl_fuchsia_diagnostics::{ArchiveAccessorMarker, BatchIteratorMarker};
    use fuchsia_inspect::{Inspector, Node};
    use zx::AsHandleRef;

    // Logs reject property selectors (`foo:root/bar:baz`) with INVALID_ARGS,
    // but accept plain `foo:root` selectors.
    #[fuchsia::test]
    async fn logs_only_accept_basic_component_selectors() {
        let scope = fasync::Scope::new();
        let (accessor, stream) =
            fidl::endpoints::create_proxy_and_stream::<ArchiveAccessorMarker>();
        let pipeline = Arc::new(Pipeline::for_test(None));
        let inspector = Inspector::default();
        let log_repo =
            LogsRepository::new(1_000_000, std::iter::empty(), inspector.root(), scope.new_child());
        let inspect_repo =
            Arc::new(InspectRepository::new(vec![Arc::downgrade(&pipeline)], scope.new_child()));
        let server = ArchiveAccessorServer::new(
            inspect_repo,
            log_repo,
            4,
            BatchRetrievalTimeout::max(),
            scope.new_child(),
        );
        server.spawn_server(pipeline, stream);

        // A property selector is outside the log selector grammar: the
        // iterator channel must be closed with INVALID_ARGS.
        let (batch_iterator, server_end) = fidl::endpoints::create_proxy::<BatchIteratorMarker>();
        assert!(accessor
            .r#stream_diagnostics(
                &StreamParameters {
                    data_type: Some(DataType::Logs),
                    stream_mode: Some(StreamMode::SnapshotThenSubscribe),
                    format: Some(Format::Json),
                    client_selector_configuration: Some(ClientSelectorConfiguration::Selectors(
                        vec![SelectorArgument::RawSelector("foo:root/bar:baz".to_string()),]
                    )),
                    ..Default::default()
                },
                server_end
            )
            .is_ok());
        assert_matches!(
            batch_iterator.get_next().await,
            Err(fidl::Error::ClientChannelClosed { status: zx_status::Status::INVALID_ARGS, .. })
        );

        // `foo:root` is the accepted shape for log selectors.
        let (batch_iterator, server_end) = fidl::endpoints::create_proxy::<BatchIteratorMarker>();
        assert!(accessor
            .r#stream_diagnostics(
                &StreamParameters {
                    data_type: Some(DataType::Logs),
                    stream_mode: Some(StreamMode::Snapshot),
                    format: Some(Format::Json),
                    client_selector_configuration: Some(ClientSelectorConfiguration::Selectors(
                        vec![SelectorArgument::RawSelector("foo:root".to_string()),]
                    )),
                    ..Default::default()
                },
                server_end
            )
            .is_ok());

        assert!(batch_iterator.get_next().await.is_ok());
    }

    // Unparseable selectors are logged and skipped; the stream still serves.
    #[fuchsia::test]
    async fn accessor_skips_invalid_selectors() {
        let scope = fasync::Scope::new();
        let (accessor, stream) =
            fidl::endpoints::create_proxy_and_stream::<ArchiveAccessorMarker>();
        let pipeline = Arc::new(Pipeline::for_test(None));
        let inspector = Inspector::default();
        let log_repo =
            LogsRepository::new(1_000_000, std::iter::empty(), inspector.root(), scope.new_child());
        let inspect_repo =
            Arc::new(InspectRepository::new(vec![Arc::downgrade(&pipeline)], scope.new_child()));
        let server = Arc::new(ArchiveAccessorServer::new(
            inspect_repo,
            log_repo,
            4,
            BatchRetrievalTimeout::max(),
            scope.new_child(),
        ));
        server.spawn_server(pipeline, stream);

        let (batch_iterator, server_end) = fidl::endpoints::create_proxy::<BatchIteratorMarker>();

        assert!(accessor
            .r#stream_diagnostics(
                &StreamParameters {
                    data_type: Some(DataType::Inspect),
                    stream_mode: Some(StreamMode::Snapshot),
                    format: Some(Format::Json),
                    client_selector_configuration: Some(ClientSelectorConfiguration::Selectors(
                        vec![
                            SelectorArgument::RawSelector("invalid".to_string()),
                            SelectorArgument::RawSelector("valid:root".to_string()),
                        ]
                    )),
                    ..Default::default()
                },
                server_end
            )
            .is_ok());

        assert!(batch_iterator.get_next().await.is_ok());
    }

    // wait_for_buffer peeks without consuming, so back-to-back calls both
    // succeed while one GetNext is queued.
    #[fuchsia::test]
    async fn buffered_iterator_handles_two_consecutive_buffer_waits() {
        let (client, server) = fidl::endpoints::create_proxy_and_stream::<BatchIteratorMarker>();
        let _fut = client.get_next();
        let mut server = server.peekable();
        assert_matches!(server.wait_for_buffer().await, Ok(()));
        assert_matches!(server.wait_for_buffer().await, Ok(()));
    }

    // Writing to a closed BatchIterator channel reports PeerClosed.
    #[fuchsia::test]
    async fn buffered_iterator_handles_peer_closed() {
        let (client, server) = fidl::endpoints::create_proxy_and_stream::<BatchIteratorMarker>();
        let mut server = server.peekable();
        drop(client);
        assert_matches!(
            server
                .write(vec![FormattedContent::Text(Buffer {
                    size: 1,
                    vmo: zx::Vmo::create(1).unwrap(),
                })])
                .await,
            Err(IteratorError::PeerClosed)
        );
    }

    // Text chunks pass through get_buffer_from_formatted_content unchanged.
    #[fuchsia::test]
    fn socket_writer_handles_text() {
        let vmo = zx::Vmo::create(1).unwrap();
        vmo.write(&[5u8], 0).unwrap();
        let koid = vmo.get_koid().unwrap();
        let text = FormattedContent::Text(Buffer { size: 1, vmo });
        let result = get_buffer_from_formatted_content(text).unwrap();
        assert_eq!(result.size, 1);
        assert_eq!(result.vmo.get_koid().unwrap(), koid);
        let mut buffer = [0];
        result.vmo.read(&mut buffer, 0).unwrap();
        assert_eq!(buffer[0], 5);
    }

    // CBOR chunks carry a bare VMO, not a Buffer, and are rejected.
    #[fuchsia::test]
    fn socket_writer_does_not_handle_cbor() {
        let vmo = zx::Vmo::create(1).unwrap();
        vmo.write(&[5u8], 0).unwrap();
        let text = FormattedContent::Cbor(vmo);
        let result = get_buffer_from_formatted_content(text);
        assert_matches!(result, Err(AccessorError::UnsupportedFormat));
    }

    // Socket delivery is best-effort: writing to a closed socket still
    // returns Ok(()).
    #[fuchsia::test]
    async fn socket_writer_handles_closed_socket() {
        let (local, remote) = zx::Socket::create_stream();
        drop(local);
        let mut remote = fuchsia_async::Socket::from_socket(remote);
        {
            let result = ArchiveAccessorWriter::write(
                &mut remote,
                vec![FormattedContent::Text(Buffer { size: 1, vmo: zx::Vmo::create(1).unwrap() })],
            )
            .await;
            assert_matches!(result, Ok(()));
        }
        remote.wait_for_close().await;
    }

    // A subscribed BatchIterator with no data must exit cleanly when the
    // client drops its end, instead of hanging on the pending data stream.
    #[fuchsia::test]
    fn batch_iterator_terminates_on_client_disconnect() {
        let mut executor = fasync::TestExecutor::new();
        let (batch_iterator_proxy, stream) =
            fidl::endpoints::create_proxy_and_stream::<BatchIteratorMarker>();
        // A pending stream never yields data, so only the close path can end
        // the run() future.
        let batch_iterator = BatchIterator::new(
            futures::stream::pending::<diagnostics_data::Data<diagnostics_data::Logs>>(),
            stream.peekable(),
            StreamMode::Subscribe,
            Arc::new(AccessorStats::new(Node::default()).new_inspect_batch_iterator()),
            None,
            ftrace::Id::random(),
            Format::Json,
        )
        .expect("create batch iterator");

        let mut batch_iterator_fut = batch_iterator.run().boxed();
        assert!(executor.run_until_stalled(&mut batch_iterator_fut).is_pending());

        let mut iterator_request_fut = batch_iterator_proxy.get_next();
        assert!(executor.run_until_stalled(&mut iterator_request_fut).is_pending());
        assert!(executor.run_until_stalled(&mut batch_iterator_fut).is_pending());
        assert!(executor.run_until_stalled(&mut iterator_request_fut).is_pending());

        drop(iterator_request_fut);
        drop(batch_iterator_proxy);
        assert_matches!(executor.run_singlethreaded(&mut batch_iterator_fut), Ok(()));
    }

    // The WaitForReady handshake must be answered before any batches flow.
    #[fuchsia::test]
    async fn batch_iterator_on_ready_is_called() {
        let scope = fasync::Scope::new();
        let (accessor, stream) =
            fidl::endpoints::create_proxy_and_stream::<ArchiveAccessorMarker>();
        let pipeline = Arc::new(Pipeline::for_test(None));
        let inspector = Inspector::default();
        let log_repo =
            LogsRepository::new(1_000_000, std::iter::empty(), inspector.root(), scope.new_child());
        let inspect_repo =
            Arc::new(InspectRepository::new(vec![Arc::downgrade(&pipeline)], scope.new_child()));
        let server = ArchiveAccessorServer::new(
            inspect_repo,
            log_repo,
            4,
            BatchRetrievalTimeout::max(),
            scope.new_child(),
        );
        server.spawn_server(pipeline, stream);

        let (batch_iterator, server_end) = fidl::endpoints::create_proxy::<BatchIteratorMarker>();
        assert!(accessor
            .r#stream_diagnostics(
                &StreamParameters {
                    data_type: Some(DataType::Logs),
                    stream_mode: Some(StreamMode::Subscribe),
                    format: Some(Format::Json),
                    client_selector_configuration: Some(ClientSelectorConfiguration::SelectAll(
                        true
                    )),
                    ..Default::default()
                },
                server_end
            )
            .is_ok());

        assert!(batch_iterator.wait_for_ready().await.is_ok());
    }
}
1096}