archivist_lib/
accessor.rs

1// Copyright 2019 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::constants::FORMATTED_CONTENT_CHUNK_SIZE_TARGET;
6use crate::diagnostics::{BatchIteratorConnectionStats, TRACE_CATEGORY};
7use crate::error::AccessorError;
8use crate::formatter::{
9    new_batcher, FXTPacketSerializer, FormattedStream, JsonPacketSerializer, SerializedVmo,
10};
11use crate::inspect::repository::InspectRepository;
12use crate::inspect::ReaderServer;
13use crate::logs::container::CursorItem;
14use crate::logs::repository::LogsRepository;
15use crate::pipeline::Pipeline;
16use diagnostics_data::{Data, DiagnosticsData, ExtendedMoniker, Metadata};
17use fidl::endpoints::{ControlHandle, RequestStream};
18use fidl_fuchsia_diagnostics::{
19    ArchiveAccessorRequest, ArchiveAccessorRequestStream, BatchIteratorControlHandle,
20    BatchIteratorRequest, BatchIteratorRequestStream, ClientSelectorConfiguration, DataType,
21    Format, FormattedContent, PerformanceConfiguration, Selector, SelectorArgument, StreamMode,
22    StreamParameters, StringSelector, TreeSelector, TreeSelectorUnknown,
23};
24use fidl_fuchsia_mem::Buffer;
25use fuchsia_inspect::NumericProperty;
26use fuchsia_sync::Mutex;
27use futures::future::{select, Either};
28use futures::prelude::*;
29use futures::stream::Peekable;
30use futures::{pin_mut, StreamExt};
31use log::warn;
32use selectors::FastError;
33use serde::Serialize;
34use std::collections::HashMap;
35use std::pin::Pin;
36use std::sync::Arc;
37use thiserror::Error;
38use {fidl_fuchsia_diagnostics_host as fhost, fuchsia_async as fasync, fuchsia_trace as ftrace};
39
/// Timeout applied when retrieving a single batch of diagnostics results.
///
/// A non-positive value means "no timeout": `seconds()` reports `i64::MAX`.
#[derive(Debug, Copy, Clone)]
pub struct BatchRetrievalTimeout(i64);

impl BatchRetrievalTimeout {
    /// Builds a timeout from a number of seconds; values <= 0 disable it.
    pub fn from_seconds(s: i64) -> Self {
        Self(s)
    }

    /// A timeout that will effectively never fire.
    #[cfg(test)]
    pub fn max() -> Self {
        Self::from_seconds(-1)
    }

    /// The timeout in seconds; disabled timeouts are mapped to `i64::MAX`.
    pub fn seconds(&self) -> i64 {
        match self.0 {
            s if s > 0 => s,
            _ => i64::MAX,
        }
    }
}
61
/// ArchiveAccessorServer represents an incoming connection from a client to an Archivist
/// instance, through which the client may make Reader requests to the various data
/// sources the Archivist offers.
pub struct ArchiveAccessorServer {
    // Source of Inspect data snapshots served to readers.
    inspect_repository: Arc<InspectRepository>,
    // Source of log streams served to readers.
    logs_repository: Arc<LogsRepository>,
    // Upper bound on concurrently populated Inspect snapshots per reader.
    maximum_concurrent_snapshots_per_reader: u64,
    // Scope on which per-connection serving tasks are spawned.
    scope: fasync::Scope,
    // Batch timeout used when the stream parameters don't specify one.
    default_batch_timeout_seconds: BatchRetrievalTimeout,
}
72
73fn validate_and_parse_selectors(
74    selector_args: Vec<SelectorArgument>,
75) -> Result<Vec<Selector>, AccessorError> {
76    let mut selectors = vec![];
77    let mut errors = vec![];
78
79    if selector_args.is_empty() {
80        return Err(AccessorError::EmptySelectors);
81    }
82
83    for selector_arg in selector_args {
84        match selectors::take_from_argument::<FastError>(selector_arg) {
85            Ok(s) => selectors.push(s),
86            Err(e) => errors.push(e),
87        }
88    }
89
90    if !errors.is_empty() {
91        warn!(errors:?; "Found errors in selector arguments");
92    }
93
94    Ok(selectors)
95}
96
97fn validate_and_parse_log_selectors(
98    selector_args: Vec<SelectorArgument>,
99) -> Result<Vec<Selector>, AccessorError> {
100    // Only accept selectors of the type: `component:root` for logs for now.
101    let selectors = validate_and_parse_selectors(selector_args)?;
102    for selector in &selectors {
103        // Unwrap safe: Previous validation discards any selector without a node.
104        let tree_selector = selector.tree_selector.as_ref().unwrap();
105        match tree_selector {
106            TreeSelector::PropertySelector(_) => {
107                return Err(AccessorError::InvalidLogSelector);
108            }
109            TreeSelector::SubtreeSelector(subtree_selector) => {
110                if subtree_selector.node_path.len() != 1 {
111                    return Err(AccessorError::InvalidLogSelector);
112                }
113                match &subtree_selector.node_path[0] {
114                    StringSelector::ExactMatch(val) if val == "root" => {}
115                    StringSelector::StringPattern(val) if val == "root" => {}
116                    _ => {
117                        return Err(AccessorError::InvalidLogSelector);
118                    }
119                }
120            }
121            TreeSelectorUnknown!() => {}
122        }
123    }
124    Ok(selectors)
125}
126
impl ArchiveAccessorServer {
    /// Create a new accessor for interacting with the archivist's data. The pipeline
    /// parameter determines which static configurations scope/restrict the visibility of
    /// data accessed by readers spawned by this accessor.
    pub fn new(
        inspect_repository: Arc<InspectRepository>,
        logs_repository: Arc<LogsRepository>,
        maximum_concurrent_snapshots_per_reader: u64,
        default_batch_timeout_seconds: BatchRetrievalTimeout,
        scope: fasync::Scope,
    ) -> Self {
        ArchiveAccessorServer {
            inspect_repository,
            logs_repository,
            maximum_concurrent_snapshots_per_reader,
            scope,
            default_batch_timeout_seconds,
        }
    }

    /// Serves a single StreamDiagnostics request to completion: validates
    /// `params`, builds the matching data stream (Inspect snapshot or log
    /// cursor), and pumps it through a `BatchIterator` into `requests`.
    ///
    /// Returns an error when required parameters are missing or the
    /// format/mode/data-type combination is unsupported.
    async fn spawn<R: ArchiveAccessorWriter + Send>(
        pipeline: Arc<Pipeline>,
        inspect_repo: Arc<InspectRepository>,
        log_repo: Arc<LogsRepository>,
        requests: R,
        params: StreamParameters,
        maximum_concurrent_snapshots_per_reader: u64,
        default_batch_timeout_seconds: BatchRetrievalTimeout,
    ) -> Result<(), AccessorError> {
        let format = params.format.ok_or(AccessorError::MissingFormat)?;
        if !matches!(format, Format::Json | Format::Cbor | Format::Fxt) {
            return Err(AccessorError::UnsupportedFormat);
        }
        let mode = params.stream_mode.ok_or(AccessorError::MissingMode)?;

        let performance_config: PerformanceConfig = PerformanceConfig::new(
            &params,
            maximum_concurrent_snapshots_per_reader,
            default_batch_timeout_seconds,
        )?;

        let trace_id = ftrace::Id::random();
        match params.data_type.ok_or(AccessorError::MissingDataType)? {
            DataType::Inspect => {
                let _trace_guard = ftrace::async_enter!(
                    trace_id,
                    TRACE_CATEGORY,
                    c"ArchiveAccessorServer::spawn",
                    "data_type" => "Inspect",
                    "trace_id" => u64::from(trace_id)
                );
                // Inspect data is only served as one-shot snapshots.
                if !matches!(mode, StreamMode::Snapshot) {
                    return Err(AccessorError::UnsupportedMode);
                }
                let stats = Arc::new(pipeline.accessor_stats().new_inspect_batch_iterator());

                let selectors =
                    params.client_selector_configuration.ok_or(AccessorError::MissingSelectors)?;

                // `None` means "select everything"; `Some(...)` restricts the snapshot.
                let selectors = match selectors {
                    ClientSelectorConfiguration::Selectors(selectors) => {
                        Some(validate_and_parse_selectors(selectors)?)
                    }
                    ClientSelectorConfiguration::SelectAll(_) => None,
                    _ => return Err(AccessorError::InvalidSelectors("unrecognized selectors")),
                };

                let static_hierarchy_allowlist = pipeline.static_hierarchy_allowlist();
                let unpopulated_container_vec =
                    inspect_repo.fetch_inspect_data(&selectors, static_hierarchy_allowlist);

                // Split the aggregate byte budget (if any) evenly across the
                // components whose data will be snapshotted.
                let per_component_budget_opt = if unpopulated_container_vec.is_empty() {
                    None
                } else {
                    performance_config
                        .aggregated_content_limit_bytes
                        .map(|limit| (limit as usize) / unpopulated_container_vec.len())
                };

                if let Some(max_snapshot_size) = performance_config.aggregated_content_limit_bytes {
                    stats.global_stats().record_max_snapshot_size_config(max_snapshot_size);
                }
                BatchIterator::new(
                    ReaderServer::stream(
                        unpopulated_container_vec,
                        performance_config,
                        selectors,
                        Arc::clone(&stats),
                        trace_id,
                    ),
                    requests,
                    mode,
                    stats,
                    per_component_budget_opt,
                    trace_id,
                    format,
                )?
                .run()
                .await
            }
            DataType::Logs => {
                if format == Format::Cbor {
                    // CBOR is not supported for logs
                    return Err(AccessorError::UnsupportedFormat);
                }
                let _trace_guard = ftrace::async_enter!(
                    trace_id,
                    TRACE_CATEGORY,
                    c"ArchiveAccessorServer::spawn",
                    "data_type" => "Logs",
                    // An async duration cannot have multiple concurrent child async durations
                    // so we include the nonce as metadata to manually determine relationship.
                    "trace_id" => u64::from(trace_id)
                );
                let stats = Arc::new(pipeline.accessor_stats().new_logs_batch_iterator());
                let selectors = match params.client_selector_configuration {
                    Some(ClientSelectorConfiguration::Selectors(selectors)) => {
                        Some(validate_and_parse_log_selectors(selectors)?)
                    }
                    Some(ClientSelectorConfiguration::SelectAll(_)) => None,
                    _ => return Err(AccessorError::InvalidSelectors("unrecognized selectors")),
                };
                match format {
                    Format::Fxt => {
                        let logs = log_repo.logs_cursor_raw(mode, selectors, trace_id);
                        BatchIterator::new_serving_fxt(
                            logs,
                            requests,
                            mode,
                            stats,
                            trace_id,
                            performance_config,
                        )?
                        .run()
                        .await?;
                        Ok(())
                    }
                    Format::Json => {
                        let logs = log_repo
                            .logs_cursor(mode, selectors, trace_id)
                            .map(move |inner: _| (*inner).clone());
                        BatchIterator::new_serving_arrays(logs, requests, mode, stats, trace_id)?
                            .run()
                            .await?;
                        Ok(())
                    }
                    // TODO(https://fxbug.dev/401548725): Remove this from the FIDL definition.
                    Format::Text => Err(AccessorError::UnsupportedFormat),
                    Format::Cbor => unreachable!("CBOR is not supported for logs"),
                }
            }
        }
    }

    /// Spawn an instance `fidl_fuchsia_diagnostics/Archive` that allows clients to open
    /// reader session to diagnostics data.
    pub fn spawn_server<RequestStream>(&self, pipeline: Arc<Pipeline>, mut stream: RequestStream)
    where
        RequestStream: ArchiveAccessorTranslator + Send + 'static,
        <RequestStream as ArchiveAccessorTranslator>::InnerDataRequestChannel:
            ArchiveAccessorWriter + Send,
    {
        // Self isn't guaranteed to live into the exception handling of the async block. We need to clone self
        // to have a version that can be referenced in the exception handling.
        let log_repo = Arc::clone(&self.logs_repository);
        let inspect_repo = Arc::clone(&self.inspect_repository);
        let maximum_concurrent_snapshots_per_reader = self.maximum_concurrent_snapshots_per_reader;
        let default_batch_timeout_seconds = self.default_batch_timeout_seconds;
        let scope = self.scope.to_handle();
        self.scope.spawn(async move {
            let stats = pipeline.accessor_stats();
            stats.global_stats.connections_opened.add(1);
            while let Some(request) = stream.next().await {
                let control_handle = request.iterator.get_control_handle();
                stats.global_stats.stream_diagnostics_requests.add(1);
                let pipeline = Arc::clone(&pipeline);

                // Store the batch iterator task so that we can ensure that the client finishes
                // draining items through it when a Controller#Stop call happens. For example,
                // this allows tests to fetch all isolated logs before finishing.
                let inspect_repo_for_task = Arc::clone(&inspect_repo);
                let log_repo_for_task = Arc::clone(&log_repo);
                scope.spawn(async move {
                    if let Err(e) = Self::spawn(
                        pipeline,
                        inspect_repo_for_task,
                        log_repo_for_task,
                        request.iterator,
                        request.parameters,
                        maximum_concurrent_snapshots_per_reader,
                        default_batch_timeout_seconds,
                    )
                    .await
                    {
                        // Turn the error into an epitaph on the iterator channel,
                        // when a channel-based transport is in use.
                        if let Some(control) = control_handle {
                            e.close(control);
                        }
                    }
                });
            }
            pipeline.accessor_stats().global_stats.connections_closed.add(1);
        });
    }
}
331
/// Sink for formatted diagnostics results produced by a `BatchIterator`.
///
/// Implemented below for FIDL `BatchIterator` request streams (channel
/// transport) and for `fuchsia_async::Socket` (host transport).
pub trait ArchiveAccessorWriter {
    /// Writes diagnostics data to the remote side.
    fn write(
        &mut self,
        results: Vec<FormattedContent>,
    ) -> impl Future<Output = Result<(), IteratorError>> + Send;

    /// Waits for a buffer to be available for writing into. For sockets, this is a no-op.
    fn wait_for_buffer(&mut self) -> impl Future<Output = anyhow::Result<()>> + Send {
        futures::future::ready(Ok(()))
    }

    /// Takes the control handle from the FIDL stream (or returns None
    /// if the handle has already been taken, or if this is a socket).
    fn get_control_handle(&self) -> Option<BatchIteratorControlHandle> {
        None
    }

    /// Sends an on ready event.
    fn maybe_respond_ready(&mut self) -> impl Future<Output = Result<(), AccessorError>> + Send {
        futures::future::ready(Ok(()))
    }

    /// Waits for ZX_ERR_PEER_CLOSED
    fn wait_for_close(&mut self) -> impl Future<Output = ()> + Send;
}
358
359fn get_buffer_from_formatted_content(
360    content: fidl_fuchsia_diagnostics::FormattedContent,
361) -> Result<Buffer, AccessorError> {
362    match content {
363        FormattedContent::Json(json) => Ok(json),
364        FormattedContent::Text(text) => Ok(text),
365        _ => Err(AccessorError::UnsupportedFormat),
366    }
367}
368
369impl ArchiveAccessorWriter for fuchsia_async::Socket {
370    async fn write(&mut self, data: Vec<FormattedContent>) -> Result<(), IteratorError> {
371        if data.is_empty() {
372            return Err(IteratorError::PeerClosed);
373        }
374        let mut buf = vec![0];
375        for value in data {
376            let data = get_buffer_from_formatted_content(value)?;
377            buf.resize(data.size as usize, 0);
378            data.vmo.read(&mut buf, 0)?;
379            let res = self.write_all(&buf).await;
380            if res.is_err() {
381                // connection probably closed.
382                break;
383            }
384        }
385        Ok(())
386    }
387
388    async fn wait_for_close(&mut self) {
389        let _ = self.on_closed().await;
390    }
391}
392
/// Errors that can occur while pumping diagnostics batches to a client.
#[derive(Error, Debug)]
pub enum IteratorError {
    /// The client closed its end, stopped sending requests, or sent an
    /// unknown method.
    #[error("Peer closed")]
    PeerClosed,
    /// FIDL transport-level error.
    #[error(transparent)]
    Ipc(#[from] fidl::Error),
    /// Error originating from accessor serving/validation.
    #[error(transparent)]
    AccessorError(#[from] AccessorError),
    // This error should be unreachable. We should never
    // fail to read from a VMO that we created, but the type system
    // requires us to handle this.
    #[error("Error reading from VMO: {}", source)]
    VmoReadError {
        #[from]
        source: zx::Status,
    },
}
410
impl ArchiveAccessorWriter for Peekable<BatchIteratorRequestStream> {
    // Waits for the client's next GetNext request and answers it with `data`.
    // WaitForReady requests received in the meantime are acknowledged inline;
    // unknown methods and stream termination are treated as peer-closed.
    async fn write(&mut self, data: Vec<FormattedContent>) -> Result<(), IteratorError> {
        loop {
            match self.next().await {
                Some(Ok(BatchIteratorRequest::GetNext { responder })) => {
                    responder.send(Ok(data))?;
                    return Ok(());
                }
                Some(Ok(BatchIteratorRequest::WaitForReady { responder })) => {
                    responder.send()?;
                }
                Some(Ok(BatchIteratorRequest::_UnknownMethod { method_type, ordinal, .. })) => {
                    warn!(method_type:?, ordinal; "Got unknown interaction on BatchIterator");
                    return Err(IteratorError::PeerClosed);
                }
                Some(Err(err)) => return Err(err.into()),
                None => {
                    return Err(IteratorError::PeerClosed);
                }
            }
        }
    }

    // Answers a queued WaitForReady request, if that is what's pending next.
    // Peeking first ensures a GetNext request is left in the stream for write().
    async fn maybe_respond_ready(&mut self) -> Result<(), AccessorError> {
        let mut this = Pin::new(self);
        if matches!(this.as_mut().peek().await, Some(Ok(BatchIteratorRequest::WaitForReady { .. })))
        {
            let Some(Ok(BatchIteratorRequest::WaitForReady { responder })) = this.next().await
            else {
                unreachable!("We already checked the next request was WaitForReady");
            };
            responder.send()?;
        }
        Ok(())
    }

    // Waits until the client has a request queued (i.e. is ready to receive),
    // erroring if the stream terminated or yielded an error instead.
    async fn wait_for_buffer(&mut self) -> anyhow::Result<()> {
        let this = Pin::new(self);
        match this.peek().await {
            Some(Ok(_)) => Ok(()),
            _ => Err(IteratorError::PeerClosed.into()),
        }
    }

    fn get_control_handle(&self) -> Option<BatchIteratorControlHandle> {
        Some(self.get_ref().control_handle())
    }

    async fn wait_for_close(&mut self) {
        let _ = self.get_ref().control_handle().on_closed().await;
    }
}
463
/// A single StreamDiagnostics request: the parameters the client sent plus
/// the transport over which results will be written back.
pub struct ArchiveIteratorRequest<R> {
    // Stream parameters for this session.
    parameters: StreamParameters,
    // Channel or socket over which formatted results are sent.
    iterator: R,
}
468
/// Translation trait used to support both remote and
/// local ArchiveAccessor implementations.
pub trait ArchiveAccessorTranslator {
    /// Transport for batch results used by this accessor flavor
    /// (FIDL BatchIterator stream or socket).
    type InnerDataRequestChannel;
    /// Returns the next streaming request, or `None` when the connection ends.
    fn next(
        &mut self,
    ) -> impl Future<Output = Option<ArchiveIteratorRequest<Self::InnerDataRequestChannel>>> + Send;
}
477
478impl ArchiveAccessorTranslator for fhost::ArchiveAccessorRequestStream {
479    type InnerDataRequestChannel = fuchsia_async::Socket;
480
481    async fn next(&mut self) -> Option<ArchiveIteratorRequest<Self::InnerDataRequestChannel>> {
482        match StreamExt::next(self).await {
483            Some(Ok(fhost::ArchiveAccessorRequest::StreamDiagnostics {
484                parameters,
485                responder,
486                stream,
487            })) => {
488                // It's fine for the client to send us a socket
489                // and discard the channel without waiting for a response.
490                // Future communication takes place over the socket so
491                // the client may opt to use this as an optimization.
492                let _ = responder.send();
493                Some(ArchiveIteratorRequest {
494                    iterator: fuchsia_async::Socket::from_socket(stream),
495                    parameters,
496                })
497            }
498            _ => None,
499        }
500    }
501}
502
503impl ArchiveAccessorTranslator for ArchiveAccessorRequestStream {
504    type InnerDataRequestChannel = Peekable<BatchIteratorRequestStream>;
505
506    async fn next(&mut self) -> Option<ArchiveIteratorRequest<Self::InnerDataRequestChannel>> {
507        loop {
508            match StreamExt::next(self).await {
509                Some(Ok(ArchiveAccessorRequest::StreamDiagnostics {
510                    control_handle: _,
511                    result_stream,
512                    stream_parameters,
513                })) => {
514                    return Some(ArchiveIteratorRequest {
515                        iterator: result_stream.into_stream().peekable(),
516                        parameters: stream_parameters,
517                    })
518                }
519                Some(Ok(ArchiveAccessorRequest::WaitForReady { responder })) => {
520                    let _ = responder.send();
521                }
522                _ => return None,
523            }
524        }
525    }
526}
/// Counters tracking how many schemas had their payload truncated out of the
/// total number of schemas streamed on one connection.
struct SchemaTruncationCounter {
    // Number of schemas whose payload was dropped.
    truncated_schemas: u64,
    // Total number of schemas processed.
    total_schemas: u64,
}

impl SchemaTruncationCounter {
    /// Creates a shared, zeroed counter.
    pub fn new() -> Arc<Mutex<Self>> {
        Arc::new(Mutex::new(Self { truncated_schemas: 0, total_schemas: 0 }))
    }
}
537
/// Drives one batch-iterator connection: pulls formatted batches from `data`
/// and writes them to `requests` until either side terminates.
pub(crate) struct BatchIterator<R> {
    // Transport the formatted batches are written to.
    requests: R,
    // Per-connection stats; opened in new_inner() and closed on Drop.
    stats: Arc<BatchIteratorConnectionStats>,
    // Stream of pre-batched, serialized results.
    data: FormattedStream,
    // Set by `new` (Inspect path); the `new_serving_*` constructors leave it None.
    truncation_counter: Option<Arc<Mutex<SchemaTruncationCounter>>>,
    // Trace id of the enclosing spawn, recorded on child trace durations.
    parent_trace_id: ftrace::Id,
}
545
546// Checks if a given schema is within a components budget, and if it is, updates the budget,
547// then returns true. Otherwise, if the schema is not within budget, returns false.
548fn maybe_update_budget(
549    budget_map: &mut HashMap<ExtendedMoniker, usize>,
550    moniker: &ExtendedMoniker,
551    bytes: usize,
552    byte_limit: usize,
553) -> bool {
554    if let Some(remaining_budget) = budget_map.get_mut(moniker) {
555        if *remaining_budget + bytes > byte_limit {
556            false
557        } else {
558            *remaining_budget += bytes;
559            true
560        }
561    } else if bytes > byte_limit {
562        budget_map.insert(moniker.clone(), 0);
563        false
564    } else {
565        budget_map.insert(moniker.clone(), bytes);
566        true
567    }
568}
569
impl<R> BatchIterator<R>
where
    R: ArchiveAccessorWriter + Send,
{
    /// Builds an iterator over structured `Data<D>` items, serializing each
    /// one into the requested `format` and enforcing an optional
    /// per-component byte budget (payloads over budget are dropped and
    /// counted as truncated).
    pub fn new<Items, D>(
        data: Items,
        requests: R,
        mode: StreamMode,
        stats: Arc<BatchIteratorConnectionStats>,
        per_component_byte_limit_opt: Option<usize>,
        parent_trace_id: ftrace::Id,
        format: Format,
    ) -> Result<Self, AccessorError>
    where
        Items: Stream<Item = Data<D>> + Send + 'static,
        D: DiagnosticsData + 'static,
    {
        let result_stats_for_fut = Arc::clone(&stats);

        // Per-component consumed-bytes map shared across all serialized items.
        let budget_tracker_shared = Arc::new(Mutex::new(HashMap::new()));

        let truncation_counter = SchemaTruncationCounter::new();
        let stream_owned_counter_for_fut = Arc::clone(&truncation_counter);

        // Serialize each item as it flows through the stream.
        let data = data.then(move |mut d| {
            let stream_owned_counter = Arc::clone(&stream_owned_counter_for_fut);
            let result_stats = Arc::clone(&result_stats_for_fut);
            let budget_tracker = Arc::clone(&budget_tracker_shared);
            async move {
                let trace_id = ftrace::Id::random();
                let _trace_guard = ftrace::async_enter!(
                    trace_id,
                    TRACE_CATEGORY,
                    c"BatchIterator::new.serialize",
                    // An async duration cannot have multiple concurrent child async durations
                    // so we include the nonce as metadata to manually determine relationship.
                    "parent_trace_id" => u64::from(parent_trace_id),
                    "trace_id" => u64::from(trace_id),
                    "moniker" => d.moniker.to_string().as_ref()
                );
                let mut unlocked_counter = stream_owned_counter.lock();
                let mut tracker_guard = budget_tracker.lock();
                unlocked_counter.total_schemas += 1;
                if d.metadata.has_errors() {
                    result_stats.add_result_error();
                }

                match SerializedVmo::serialize(&d, D::DATA_TYPE, format) {
                    Err(e) => {
                        result_stats.add_result_error();
                        Err(e)
                    }
                    Ok(contents) => {
                        result_stats.add_result();
                        match per_component_byte_limit_opt {
                            Some(x) => {
                                if maybe_update_budget(
                                    &mut tracker_guard,
                                    &d.moniker,
                                    contents.size as usize,
                                    x,
                                ) {
                                    Ok(contents)
                                } else {
                                    // Over budget: drop the payload and re-serialize
                                    // the (smaller) schema shell instead.
                                    result_stats.add_schema_truncated();
                                    unlocked_counter.truncated_schemas += 1;
                                    d.drop_payload();
                                    // TODO(66085): If a payload is truncated, cache the
                                    // new schema so that we can reuse if other schemas from
                                    // the same component get dropped.
                                    SerializedVmo::serialize(&d, D::DATA_TYPE, format)
                                }
                            }
                            None => Ok(contents),
                        }
                    }
                }
            }
        });

        Self::new_inner(
            new_batcher(data, Arc::clone(&stats), mode),
            requests,
            stats,
            Some(truncation_counter),
            parent_trace_id,
        )
    }

    /// Builds an iterator over raw log cursor items serialized as FXT
    /// packets, sized by the configured content limit (or the default chunk
    /// size target when no limit was configured).
    pub fn new_serving_fxt<S>(
        data: S,
        requests: R,
        mode: StreamMode,
        stats: Arc<BatchIteratorConnectionStats>,
        parent_trace_id: ftrace::Id,
        performance_config: PerformanceConfig,
    ) -> Result<Self, AccessorError>
    where
        S: Stream<Item = CursorItem> + Send + Unpin + 'static,
    {
        let data = FXTPacketSerializer::new(
            Arc::clone(&stats),
            performance_config
                .aggregated_content_limit_bytes
                .unwrap_or(FORMATTED_CONTENT_CHUNK_SIZE_TARGET),
            data,
        );
        Self::new_inner(
            new_batcher(data, Arc::clone(&stats), mode),
            requests,
            stats,
            None,
            parent_trace_id,
        )
    }

    /// Builds an iterator over serializable items packed into JSON arrays.
    pub fn new_serving_arrays<D, S>(
        data: S,
        requests: R,
        mode: StreamMode,
        stats: Arc<BatchIteratorConnectionStats>,
        parent_trace_id: ftrace::Id,
    ) -> Result<Self, AccessorError>
    where
        D: Serialize + Send + 'static,
        S: Stream<Item = D> + Send + Unpin + 'static,
    {
        let data = JsonPacketSerializer::new(
            Arc::clone(&stats),
            FORMATTED_CONTENT_CHUNK_SIZE_TARGET,
            data,
        );
        Self::new_inner(
            new_batcher(data, Arc::clone(&stats), mode),
            requests,
            stats,
            None,
            parent_trace_id,
        )
    }

    /// Common constructor tail: records the connection as open (balanced by
    /// `Drop`) and assembles the iterator.
    fn new_inner(
        data: FormattedStream,
        requests: R,
        stats: Arc<BatchIteratorConnectionStats>,
        truncation_counter: Option<Arc<Mutex<SchemaTruncationCounter>>>,
        parent_trace_id: ftrace::Id,
    ) -> Result<Self, AccessorError> {
        stats.open_connection();
        Ok(Self { data, requests, stats, truncation_counter, parent_trace_id })
    }

    /// Pumps batches to the client until the data stream ends, the client
    /// closes the connection, or a serialization error occurs.
    pub async fn run(mut self) -> Result<(), AccessorError> {
        self.requests.maybe_respond_ready().await?;
        while self.requests.wait_for_buffer().await.is_ok() {
            self.stats.add_request();
            let start_time = zx::MonotonicInstant::get();
            let trace_id = ftrace::Id::random();
            let _trace_guard = ftrace::async_enter!(
                trace_id,
                TRACE_CATEGORY,
                c"BatchIterator::run.get_send_batch",
                // An async duration cannot have multiple concurrent child async durations
                // so we include the nonce as metadata to manually determine relationship.
                "parent_trace_id" => u64::from(self.parent_trace_id),
                "trace_id" => u64::from(trace_id)
            );
            let batch = {
                let wait_for_close = self.requests.wait_for_close();
                let next_data = self.data.next();
                pin_mut!(wait_for_close);
                match select(next_data, wait_for_close).await {
                    // if we get None back, treat that as a terminal batch with an empty vec
                    Either::Left((batch_option, _)) => batch_option.unwrap_or_default(),
                    // if the client closes the channel, stop waiting and terminate.
                    Either::Right(_) => break,
                }
            };

            // turn errors into epitaphs -- we drop intermediate items if there was an error midway
            let batch = batch.into_iter().collect::<Result<Vec<_>, _>>()?;

            // increment counters
            self.stats.add_response();
            if batch.is_empty() {
                // Terminal batch: record the final truncation percentage, if tracked.
                if let Some(truncation_count) = &self.truncation_counter {
                    let unlocked_count = truncation_count.lock();
                    if unlocked_count.total_schemas > 0 {
                        self.stats.global_stats().record_percent_truncated_schemas(
                            ((unlocked_count.truncated_schemas as f32
                                / unlocked_count.total_schemas as f32)
                                * 100.0)
                                .round() as u64,
                        );
                    }
                }
                self.stats.add_terminal();
            }
            self.stats
                .global_stats()
                .record_batch_duration(zx::MonotonicInstant::get() - start_time);
            if self.requests.write(batch).await.is_err() {
                // Peer closed, end the stream.
                break;
            }
        }
        Ok(())
    }
}
779
impl<R> Drop for BatchIterator<R> {
    fn drop(&mut self) {
        // Keep the connection stats in sync: record that this iterator's
        // connection has closed, no matter how `run` exited (clean terminal
        // batch, error epitaph, or cancellation).
        self.stats.close_connection();
    }
}
785
/// Performance-related settings for a single reader connection, resolved from
/// the client-provided `StreamParameters` (see `PerformanceConfig::new`).
pub struct PerformanceConfig {
    /// Timeout, in seconds, for retrieving a single batch of results.
    /// Defaults to the server-wide `BatchRetrievalTimeout` when the client
    /// didn't set one.
    pub batch_timeout_sec: i64,
    /// Optional cap on the aggregated content size in bytes; `None` when the
    /// client did not set `max_aggregate_content_size_bytes` in its
    /// `PerformanceConfiguration`.
    pub aggregated_content_limit_bytes: Option<u64>,
    /// Maximum number of snapshots a single reader may have in flight
    /// concurrently (supplied by the server, not by the client).
    pub maximum_concurrent_snapshots_per_reader: u64,
}
791
792impl PerformanceConfig {
793    fn new(
794        params: &StreamParameters,
795        maximum_concurrent_snapshots_per_reader: u64,
796        default_batch_timeout_seconds: BatchRetrievalTimeout,
797    ) -> Result<PerformanceConfig, AccessorError> {
798        let batch_timeout = match params {
799            // If only nested batch retrieval timeout is definitely not set,
800            // use the optional outer field.
801            StreamParameters {
802                batch_retrieval_timeout_seconds,
803                performance_configuration: None,
804                ..
805            }
806            | StreamParameters {
807                batch_retrieval_timeout_seconds,
808                performance_configuration:
809                    Some(PerformanceConfiguration { batch_retrieval_timeout_seconds: None, .. }),
810                ..
811            } => batch_retrieval_timeout_seconds,
812            // If the outer field is definitely not set, and the inner field might be,
813            // use the inner field.
814            StreamParameters {
815                batch_retrieval_timeout_seconds: None,
816                performance_configuration:
817                    Some(PerformanceConfiguration { batch_retrieval_timeout_seconds, .. }),
818                ..
819            } => batch_retrieval_timeout_seconds,
820            // Both the inner and outer fields are set, which is an error.
821            _ => return Err(AccessorError::DuplicateBatchTimeout),
822        }
823        .map(BatchRetrievalTimeout::from_seconds)
824        .unwrap_or(default_batch_timeout_seconds);
825
826        let aggregated_content_limit_bytes = match params {
827            StreamParameters {
828                performance_configuration:
829                    Some(PerformanceConfiguration { max_aggregate_content_size_bytes, .. }),
830                ..
831            } => *max_aggregate_content_size_bytes,
832            _ => None,
833        };
834
835        Ok(PerformanceConfig {
836            batch_timeout_sec: batch_timeout.seconds(),
837            aggregated_content_limit_bytes,
838            maximum_concurrent_snapshots_per_reader,
839        })
840    }
841}
842
#[cfg(test)]
mod tests {
    use super::*;
    use crate::diagnostics::AccessorStats;
    use assert_matches::assert_matches;
    use fidl_fuchsia_diagnostics::{ArchiveAccessorMarker, BatchIteratorMarker};
    use fuchsia_inspect::{Inspector, Node};
    use zx::AsHandleRef;

    // Log streams only accept "basic" component selectors (`component:root`);
    // a selector naming a node path and property must close the iterator
    // channel with INVALID_ARGS, while a basic one must be served.
    #[fuchsia::test]
    async fn logs_only_accept_basic_component_selectors() {
        let scope = fasync::Scope::new();
        let (accessor, stream) =
            fidl::endpoints::create_proxy_and_stream::<ArchiveAccessorMarker>();
        let pipeline = Arc::new(Pipeline::for_test(None));
        let inspector = Inspector::default();
        let log_repo =
            LogsRepository::new(1_000_000, std::iter::empty(), inspector.root(), scope.new_child());
        let inspect_repo =
            Arc::new(InspectRepository::new(vec![Arc::downgrade(&pipeline)], scope.new_child()));
        let server = ArchiveAccessorServer::new(
            inspect_repo,
            log_repo,
            4,
            BatchRetrievalTimeout::max(),
            scope.new_child(),
        );
        server.spawn_server(pipeline, stream);

        // A selector of the form `component:node/path:property` is rejected.
        let (batch_iterator, server_end) = fidl::endpoints::create_proxy::<BatchIteratorMarker>();
        assert!(accessor
            .r#stream_diagnostics(
                &StreamParameters {
                    data_type: Some(DataType::Logs),
                    stream_mode: Some(StreamMode::SnapshotThenSubscribe),
                    format: Some(Format::Json),
                    client_selector_configuration: Some(ClientSelectorConfiguration::Selectors(
                        vec![SelectorArgument::RawSelector("foo:root/bar:baz".to_string()),]
                    )),
                    ..Default::default()
                },
                server_end
            )
            .is_ok());
        // The rejection surfaces as an epitaph on the iterator channel, not as
        // an error from StreamDiagnostics itself (asserted Ok above).
        assert_matches!(
            batch_iterator.get_next().await,
            Err(fidl::Error::ClientChannelClosed { status: zx_status::Status::INVALID_ARGS, .. })
        );

        // A selector of the form `component:root` is accepted.
        let (batch_iterator, server_end) = fidl::endpoints::create_proxy::<BatchIteratorMarker>();
        assert!(accessor
            .r#stream_diagnostics(
                &StreamParameters {
                    data_type: Some(DataType::Logs),
                    stream_mode: Some(StreamMode::Snapshot),
                    format: Some(Format::Json),
                    client_selector_configuration: Some(ClientSelectorConfiguration::Selectors(
                        vec![SelectorArgument::RawSelector("foo:root".to_string()),]
                    )),
                    ..Default::default()
                },
                server_end
            )
            .is_ok());

        assert!(batch_iterator.get_next().await.is_ok());
    }

    // An unparseable selector in an Inspect request is skipped rather than
    // failing the connection: the iterator keeps serving responses as long as
    // at least the request itself is well-formed.
    #[fuchsia::test]
    async fn accessor_skips_invalid_selectors() {
        let scope = fasync::Scope::new();
        let (accessor, stream) =
            fidl::endpoints::create_proxy_and_stream::<ArchiveAccessorMarker>();
        let pipeline = Arc::new(Pipeline::for_test(None));
        let inspector = Inspector::default();
        let log_repo =
            LogsRepository::new(1_000_000, std::iter::empty(), inspector.root(), scope.new_child());
        let inspect_repo =
            Arc::new(InspectRepository::new(vec![Arc::downgrade(&pipeline)], scope.new_child()));
        let server = Arc::new(ArchiveAccessorServer::new(
            inspect_repo,
            log_repo,
            4,
            BatchRetrievalTimeout::max(),
            scope.new_child(),
        ));
        server.spawn_server(pipeline, stream);

        // Send one unparseable selector ("invalid") alongside a valid one
        // ("valid:root"); the request should still be accepted.
        let (batch_iterator, server_end) = fidl::endpoints::create_proxy::<BatchIteratorMarker>();

        assert!(accessor
            .r#stream_diagnostics(
                &StreamParameters {
                    data_type: Some(DataType::Inspect),
                    stream_mode: Some(StreamMode::Snapshot),
                    format: Some(Format::Json),
                    client_selector_configuration: Some(ClientSelectorConfiguration::Selectors(
                        vec![
                            SelectorArgument::RawSelector("invalid".to_string()),
                            SelectorArgument::RawSelector("valid:root".to_string()),
                        ]
                    )),
                    ..Default::default()
                },
                server_end
            )
            .is_ok());

        // The batch iterator proxy should remain valid and providing responses regardless of the
        // invalid selectors that were given.
        assert!(batch_iterator.get_next().await.is_ok());
    }

    // `wait_for_buffer` must tolerate being called twice in a row for the same
    // pending GetNext request (e.g. when the data stream yields nothing
    // between polls); both calls succeed.
    #[fuchsia::test]
    async fn buffered_iterator_handles_two_consecutive_buffer_waits() {
        let (client, server) = fidl::endpoints::create_proxy_and_stream::<BatchIteratorMarker>();
        let _fut = client.get_next();
        let mut server = server.peekable();
        assert_matches!(server.wait_for_buffer().await, Ok(()));
        assert_matches!(server.wait_for_buffer().await, Ok(()));
    }

    // Writing a batch after the client end has been dropped reports
    // `IteratorError::PeerClosed` instead of panicking or hanging.
    #[fuchsia::test]
    async fn buffered_iterator_handles_peer_closed() {
        let (client, server) = fidl::endpoints::create_proxy_and_stream::<BatchIteratorMarker>();
        let mut server = server.peekable();
        drop(client);
        assert_matches!(
            server
                .write(vec![FormattedContent::Text(Buffer {
                    size: 1,
                    vmo: zx::Vmo::create(1).unwrap(),
                })])
                .await,
            Err(IteratorError::PeerClosed)
        );
    }

    // Extracting the buffer from a `FormattedContent::Text` returns the same
    // underlying VMO (same koid, same size, same contents), not a copy.
    #[fuchsia::test]
    fn socket_writer_handles_text() {
        let vmo = zx::Vmo::create(1).unwrap();
        vmo.write(&[5u8], 0).unwrap();
        let koid = vmo.get_koid().unwrap();
        let text = FormattedContent::Text(Buffer { size: 1, vmo });
        let result = get_buffer_from_formatted_content(text).unwrap();
        assert_eq!(result.size, 1);
        assert_eq!(result.vmo.get_koid().unwrap(), koid);
        let mut buffer = [0];
        result.vmo.read(&mut buffer, 0).unwrap();
        assert_eq!(buffer[0], 5);
    }

    // CBOR content is not supported by the socket writer path; extraction
    // fails with `AccessorError::UnsupportedFormat`.
    #[fuchsia::test]
    fn socket_writer_does_not_handle_cbor() {
        let vmo = zx::Vmo::create(1).unwrap();
        vmo.write(&[5u8], 0).unwrap();
        let text = FormattedContent::Cbor(vmo);
        let result = get_buffer_from_formatted_content(text);
        assert_matches!(result, Err(AccessorError::UnsupportedFormat));
    }

    // Writing to a socket whose other end was dropped completes with `Ok(())`
    // (best-effort delivery) rather than surfacing an error to the caller.
    #[fuchsia::test]
    async fn socket_writer_handles_closed_socket() {
        let (local, remote) = zx::Socket::create_stream();
        drop(local);
        let mut remote = fuchsia_async::Socket::from_socket(remote);
        {
            let result = ArchiveAccessorWriter::write(
                &mut remote,
                vec![FormattedContent::Text(Buffer { size: 1, vmo: zx::Vmo::create(1).unwrap() })],
            )
            .await;
            assert_matches!(result, Ok(()));
        }
        remote.wait_for_close().await;
    }

    // Even when the underlying data stream never yields (pending forever), the
    // BatchIterator server future must complete once the client disconnects.
    #[fuchsia::test]
    fn batch_iterator_terminates_on_client_disconnect() {
        let mut executor = fasync::TestExecutor::new();
        let (batch_iterator_proxy, stream) =
            fidl::endpoints::create_proxy_and_stream::<BatchIteratorMarker>();
        // Create a batch iterator that uses a hung stream to serve logs.
        let batch_iterator = BatchIterator::new(
            futures::stream::pending::<diagnostics_data::Data<diagnostics_data::Logs>>(),
            stream.peekable(),
            StreamMode::Subscribe,
            Arc::new(AccessorStats::new(Node::default()).new_inspect_batch_iterator()),
            None,
            ftrace::Id::random(),
            Format::Json,
        )
        .expect("create batch iterator");

        let mut batch_iterator_fut = batch_iterator.run().boxed();
        assert!(executor.run_until_stalled(&mut batch_iterator_fut).is_pending());

        // After sending a request, the request should be unfulfilled.
        let mut iterator_request_fut = batch_iterator_proxy.get_next();
        assert!(executor.run_until_stalled(&mut iterator_request_fut).is_pending());
        assert!(executor.run_until_stalled(&mut batch_iterator_fut).is_pending());
        assert!(executor.run_until_stalled(&mut iterator_request_fut).is_pending());

        // After closing the client end of the channel, the server should terminate and release
        // resources.
        drop(iterator_request_fut);
        drop(batch_iterator_proxy);
        assert_matches!(executor.run_singlethreaded(&mut batch_iterator_fut), Ok(()));
    }

    // For a log subscription, the iterator answers WaitForReady once it is
    // ready to serve, so clients can synchronize before reading.
    #[fuchsia::test]
    async fn batch_iterator_on_ready_is_called() {
        let scope = fasync::Scope::new();
        let (accessor, stream) =
            fidl::endpoints::create_proxy_and_stream::<ArchiveAccessorMarker>();
        let pipeline = Arc::new(Pipeline::for_test(None));
        let inspector = Inspector::default();
        let log_repo =
            LogsRepository::new(1_000_000, std::iter::empty(), inspector.root(), scope.new_child());
        let inspect_repo =
            Arc::new(InspectRepository::new(vec![Arc::downgrade(&pipeline)], scope.new_child()));
        let server = ArchiveAccessorServer::new(
            inspect_repo,
            log_repo,
            4,
            BatchRetrievalTimeout::max(),
            scope.new_child(),
        );
        server.spawn_server(pipeline, stream);

        // Subscribe to all logs; no component selectors are involved here.
        let (batch_iterator, server_end) = fidl::endpoints::create_proxy::<BatchIteratorMarker>();
        assert!(accessor
            .r#stream_diagnostics(
                &StreamParameters {
                    data_type: Some(DataType::Logs),
                    stream_mode: Some(StreamMode::Subscribe),
                    format: Some(Format::Json),
                    client_selector_configuration: Some(ClientSelectorConfiguration::SelectAll(
                        true
                    )),
                    ..Default::default()
                },
                server_end
            )
            .is_ok());

        // We receive a response for WaitForReady
        assert!(batch_iterator.wait_for_ready().await.is_ok());
    }
}