// archivist_lib/archivist.rs

// Copyright 2020 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

use crate::accessor::{ArchiveAccessorServer, BatchRetrievalTimeout};
use crate::component_lifecycle;
use crate::error::Error;
use crate::events::router::{ConsumerConfig, EventRouter, ProducerConfig};
use crate::events::sources::EventSource;
use crate::events::types::*;
use crate::identity::ComponentIdentity;
use crate::inspect::container::InspectHandle;
use crate::inspect::repository::InspectRepository;
use crate::inspect::servers::*;
use crate::logs::debuglog::KernelDebugLog;
use crate::logs::repository::{ComponentInitialInterest, LogsRepository};
use crate::logs::serial::{SerialSink, launch_serial};
use crate::logs::servers::*;
use crate::pipeline::PipelineManager;
use archivist_config::Config;
use fidl_fuchsia_process_lifecycle::LifecycleRequestStream;
use fuchsia_component::server::{ServiceFs, ServiceObj};
use fuchsia_inspect::component;
use fuchsia_inspect::health::Reporter;
use futures::channel::mpsc::unbounded;
use futures::prelude::*;
use log::{debug, error, info, warn};
use moniker::ExtendedMoniker;
use std::path::PathBuf;
use std::str::FromStr;
use std::sync::Arc;
use {fidl_fuchsia_component_sandbox as fsandbox, fidl_fuchsia_io as fio, fuchsia_async as fasync};

/// Root object wiring together all of the Archivist's parts: the event router, the Inspect and
/// log repositories, the FIDL servers that expose them, and the async scopes their tasks run on.
/// Constructed by [`Archivist::new`] and driven to completion by [`Archivist::run`].
pub struct Archivist {
    /// Handles event routing between archivist parts.
    event_router: EventRouter,

    /// The diagnostics pipelines that have been installed.
    pipeline_manager: PipelineManager,

    /// The repository holding Inspect data. Held only to keep it alive; accessed through the
    /// servers that were handed an `Arc` clone at construction time.
    _inspect_repository: Arc<InspectRepository>,

    /// The repository holding active log connections.
    logs_repository: Arc<LogsRepository>,

    /// The server handling fuchsia.diagnostics.ArchiveAccessor
    accessor_server: Arc<ArchiveAccessorServer>,

    /// The server handling fuchsia.logger.Log
    log_server: Arc<LogServer>,

    /// The server handling fuchsia.diagnostics.system.LogFlush
    flush_server: Arc<LogFlushServer>,

    /// The server handling fuchsia.diagnostics.LogStream
    log_stream_server: Arc<LogStreamServer>,

    /// The server handling fuchsia.inspect.InspectSink. Held only to keep it alive.
    _inspect_sink_server: Arc<InspectSinkServer>,

    /// Top level scope.
    general_scope: fasync::Scope,

    /// Tasks receiving external events from component manager.
    incoming_events_scope: fasync::Scope,

    /// All tasks for FIDL servers that ingest data into the Archivist must run in this scope.
    servers_scope: fasync::Scope,

    /// Freeze server (backs fuchsia.diagnostics.system.SerialLogControl).
    freeze_server: LogFreezeServer,

    /// The server handling fuchsia.diagnostics.Sample
    sample_server: Arc<SampleServer>,
}
80impl Archivist {
81    /// Creates new instance, sets up inspect and adds 'archive' directory to output folder.
82    /// Also installs `fuchsia.diagnostics.Archive` service.
83    /// Call `install_log_services`
84    pub async fn new(ring_buffer: ring_buffer::Reader, config: Config) -> Self {
85        let general_scope = fasync::Scope::new_with_name("general");
86        let servers_scope = fasync::Scope::new_with_name("servers");
87
88        // Initialize the pipelines that the archivist will expose.
89        let pipeline_manager = PipelineManager::new(
90            PathBuf::from(&config.pipelines_path),
91            component::inspector().root().create_child("pipelines"),
92            component::inspector().root().create_child("archive_accessor_stats"),
93            general_scope.new_child_with_name("pipeline_manager"),
94        )
95        .await;
96
97        // Initialize the core event router
98        let mut event_router =
99            EventRouter::new(component::inspector().root().create_child("events"));
100        let incoming_events_scope = general_scope.new_child_with_name("incoming_events");
101        Self::initialize_external_event_sources(&mut event_router, &incoming_events_scope).await;
102
103        let initial_interests =
104            config.component_initial_interests.into_iter().filter_map(|interest| {
105                ComponentInitialInterest::from_str(&interest)
106                    .map_err(|err| {
107                        warn!(err:?, invalid:% = interest; "Failed to load initial interest");
108                    })
109                    .ok()
110            });
111        let logs_repo = LogsRepository::new(
112            ring_buffer,
113            initial_interests,
114            component::inspector().root(),
115            general_scope.new_child_with_name("logs_repository"),
116        );
117        let (freeze_server, freeze_receiver) = LogFreezeServer::new();
118        let (flush_sender, flush_receiver) = unbounded();
119        if !config.allow_serial_logs.is_empty() {
120            let logs_repo_clone = Arc::clone(&logs_repo);
121            general_scope.spawn(async move {
122                launch_serial(
123                    config.allow_serial_logs,
124                    config.deny_serial_log_tags,
125                    logs_repo_clone,
126                    SerialSink,
127                    freeze_receiver,
128                    flush_receiver,
129                )
130                .await;
131            });
132        }
133        let inspect_repo = Arc::new(InspectRepository::new(
134            pipeline_manager.weak_pipelines(),
135            general_scope.new_child_with_name("inspect_repository"),
136        ));
137
138        let inspect_sink_server = Arc::new(InspectSinkServer::new(
139            Arc::clone(&inspect_repo),
140            servers_scope.new_child_with_name("InspectSink"),
141        ));
142
143        let sample_server = Arc::new(SampleServer::new(
144            Arc::clone(&inspect_repo),
145            zx::MonotonicDuration::from_seconds(
146                config.fuchsia_diagnostics_sample_min_period_seconds,
147            ),
148            servers_scope.new_child_with_name("SampleServer"),
149        ));
150
151        // Initialize our FIDL servers. This doesn't start serving yet.
152        let accessor_server = Arc::new(ArchiveAccessorServer::new(
153            Arc::clone(&inspect_repo),
154            Arc::clone(&logs_repo),
155            config.maximum_concurrent_snapshots_per_reader,
156            BatchRetrievalTimeout::from_seconds(config.per_component_batch_timeout_seconds),
157            servers_scope.new_child_with_name("ArchiveAccessor"),
158        ));
159
160        let flush_server = Arc::new(LogFlushServer::new(
161            servers_scope.new_child_with_name("LogFlush"),
162            flush_sender,
163        ));
164
165        let log_server = Arc::new(LogServer::new(
166            Arc::clone(&logs_repo),
167            servers_scope.new_child_with_name("Log"),
168        ));
169        let log_stream_server = Arc::new(LogStreamServer::new(
170            Arc::clone(&logs_repo),
171            servers_scope.new_child_with_name("LogStream"),
172        ));
173
174        // Initialize the external event providers containing incoming diagnostics directories and
175        // log sink connections.
176        event_router.add_consumer(ConsumerConfig {
177            consumer: &logs_repo,
178            events: vec![EventType::LogSinkRequested],
179        });
180        event_router.add_consumer(ConsumerConfig {
181            consumer: &inspect_sink_server,
182            events: vec![EventType::InspectSinkRequested],
183        });
184
185        // Drain klog and publish it to syslog.
186        if config.enable_klog {
187            match KernelDebugLog::new().await {
188                Ok(klog) => logs_repo.drain_debuglog(klog),
189                Err(err) => warn!(
190                    err:?;
191                    "Failed to start the kernel debug log reader. Klog won't be in syslog"
192                ),
193            };
194        }
195
196        // Start related services that should start once the Archivist has started.
197        for name in &config.bind_services {
198            info!("Connecting to service {}", name);
199            let (_local, remote) = zx::Channel::create();
200            if let Err(e) = fdio::service_connect(&format!("/svc/{name}"), remote) {
201                error!("Couldn't connect to service {}: {:?}", name, e);
202            }
203        }
204
205        // TODO(https://fxbug.dev/324494668): remove this when Netstack2 is gone.
206        if let Ok(dir) =
207            fuchsia_fs::directory::open_in_namespace("/netstack-diagnostics", fio::PERM_READABLE)
208        {
209            inspect_repo.add_inspect_handle(
210                Arc::new(ComponentIdentity::new(
211                    ExtendedMoniker::parse_str("core/network/netstack").unwrap(),
212                    "fuchsia-pkg://fuchsia.com/netstack#meta/netstack2.cm",
213                )),
214                InspectHandle::directory(dir),
215            );
216        }
217
218        Self {
219            accessor_server,
220            log_server,
221            flush_server,
222            log_stream_server,
223            event_router,
224            _inspect_sink_server: inspect_sink_server,
225            pipeline_manager,
226            _inspect_repository: inspect_repo,
227            logs_repository: logs_repo,
228            general_scope,
229            servers_scope,
230            freeze_server,
231            incoming_events_scope,
232            sample_server,
233        }
234    }
235
236    pub async fn initialize_external_event_sources(
237        event_router: &mut EventRouter,
238        scope: &fasync::Scope,
239    ) {
240        match EventSource::new("/events/log_sink_requested_event_stream").await {
241            Err(err) => warn!(err:?; "Failed to create event source for log sink requests"),
242            Ok(mut event_source) => {
243                event_router.add_producer(ProducerConfig {
244                    producer: &mut event_source,
245                    events: vec![EventType::LogSinkRequested],
246                });
247                scope.spawn(async move {
248                    // This should never exit.
249                    let _ = event_source.spawn().await;
250                });
251            }
252        }
253
254        match EventSource::new("/events/inspect_sink_requested_event_stream").await {
255            Err(err) => {
256                warn!(err:?; "Failed to create event source for InspectSink requests");
257            }
258            Ok(mut event_source) => {
259                event_router.add_producer(ProducerConfig {
260                    producer: &mut event_source,
261                    events: vec![EventType::InspectSinkRequested],
262                });
263                scope.spawn(async move {
264                    // This should never exit.
265                    let _ = event_source.spawn().await;
266                });
267            }
268        }
269    }
270
271    /// Run archivist to completion.
272    /// # Arguments:
273    /// * `outgoing_channel`- channel to serve outgoing directory on.
274    pub async fn run(
275        mut self,
276        mut fs: ServiceFs<ServiceObj<'static, ()>>,
277        is_embedded: bool,
278        store: fsandbox::CapabilityStoreProxy,
279        request_stream: LifecycleRequestStream,
280    ) -> Result<(), Error> {
281        debug!("Running Archivist.");
282
283        // Start servicing all outgoing services.
284        self.serve_protocols(&mut fs, store).await;
285        let svc_task = self.general_scope.spawn(fs.collect::<()>());
286
287        let _inspect_server_task = inspect_runtime::publish(
288            component::inspector(),
289            inspect_runtime::PublishOptions::default(),
290        );
291
292        let Self {
293            _inspect_repository,
294            mut pipeline_manager,
295            logs_repository,
296            accessor_server: _accessor_server,
297            log_server: _log_server,
298            log_stream_server: _log_stream_server,
299            _inspect_sink_server,
300            general_scope,
301            incoming_events_scope,
302            freeze_server: _freeze_server,
303            servers_scope,
304            event_router,
305            flush_server: _flush_server,
306            sample_server: _sample_server,
307        } = self;
308
309        // Start ingesting events.
310        let (terminate_handle, drain_events_fut) = event_router
311            .start()
312            // panic: can only panic if we didn't register event producers and consumers correctly.
313            .expect("Failed to start event router");
314        general_scope.spawn(drain_events_fut);
315
316        let servers_scope_handle = servers_scope.to_handle();
317        general_scope.spawn(component_lifecycle::on_stop_request(request_stream, || async move {
318            terminate_handle.terminate().await;
319            debug!("Stopped ingesting new CapabilityRequested events");
320            incoming_events_scope.cancel().await;
321            debug!("Cancel all tasks currently executing in our event router");
322            servers_scope_handle.close();
323            logs_repository.stop_accepting_new_log_sinks();
324            debug!("Close any new connections to FIDL servers");
325            svc_task.abort().await;
326            pipeline_manager.cancel().await;
327            debug!("Stop allowing new connections through the incoming namespace.");
328            logs_repository.wait_for_termination().await;
329            debug!("All LogSink connections have finished");
330            servers_scope.join().await;
331            debug!("All servers stopped.");
332        }));
333        if is_embedded {
334            debug!("Entering core loop.");
335        } else {
336            info!("archivist: Entering core loop.");
337        }
338
339        component::health().set_ok();
340        general_scope.await;
341
342        Ok(())
343    }
344
345    async fn serve_protocols(
346        &mut self,
347        fs: &mut ServiceFs<ServiceObj<'static, ()>>,
348        mut store: fsandbox::CapabilityStoreProxy,
349    ) {
350        component::serve_inspect_stats();
351        let mut svc_dir = fs.dir("svc");
352
353        let id_gen = sandbox::CapabilityIdGenerator::new();
354
355        // Serve fuchsia.diagnostics.ArchiveAccessors backed by a pipeline.
356        let accessors_dict_id = self
357            .pipeline_manager
358            .serve_pipelines(Arc::clone(&self.accessor_server), &id_gen, &mut store)
359            .await;
360
361        // Serve fuchsia.logger.Log
362        let log_server = Arc::clone(&self.log_server);
363        svc_dir.add_fidl_service(move |stream| {
364            debug!("fuchsia.logger.Log connection");
365            log_server.spawn(stream);
366        });
367
368        // Serve fuchsia.diagnostics.system.LogFlush
369        let flush_server = Arc::clone(&self.flush_server);
370        svc_dir.add_fidl_service(move |stream| {
371            debug!("fuchsia.diagnostics.system.LogFlush connection");
372            flush_server.spawn(stream);
373        });
374
375        // Install sample server to the accessors dictionary and start serving.
376        // Note that the await is not awaiting on the request stream or anything like that;
377        // the server spawns onto its owned Scope.
378        self.sample_server.install_and_serve(*accessors_dict_id, &id_gen, &mut store).await;
379
380        let server = self.freeze_server.clone();
381        let scope = self.general_scope.clone();
382
383        svc_dir.add_fidl_service(move |stream| {
384            debug!("fuchsia.diagnostics.system.SerialLogControl connection");
385            let server = server.clone();
386            scope.spawn(async move {
387                let _ = server.handle_requests(stream).await;
388            });
389        });
390
391        // Server fuchsia.logger.LogStream
392        let log_stream_server = Arc::clone(&self.log_stream_server);
393        svc_dir.add_fidl_service(move |stream| {
394            debug!("fuchsia.logger.LogStream connection");
395            log_stream_server.spawn(stream);
396        });
397
398        // Server fuchsia.diagnostics.LogSettings
399        let log_settings_server = LogSettingsServer::new(
400            Arc::clone(&self.logs_repository),
401            // Don't create this in the servers scope. We don't care about this protocol for
402            // shutdown purposes.
403            self.general_scope.new_child_with_name("LogSettings"),
404        );
405        svc_dir.add_fidl_service(move |stream| {
406            debug!("fuchsia.diagnostics.LogSettings connection");
407            log_settings_server.spawn(stream);
408        });
409
410        // Serve fuchsia.component.sandbox.Router
411        let router_scope =
412            self.servers_scope.new_child_with_name("fuchsia.component.sandbox.Router");
413        svc_dir.add_fidl_service(move |mut stream: fsandbox::DictionaryRouterRequestStream| {
414            let id_gen = Clone::clone(&id_gen);
415            let store = Clone::clone(&store);
416            router_scope.spawn(async move {
417                while let Ok(Some(request)) = stream.try_next().await {
418                    match request {
419                        fsandbox::DictionaryRouterRequest::Route { payload: _, responder } => {
420                            debug!("Got route request for the dynamic accessors dictionary");
421                            let dup_dict_id = id_gen.next();
422                            store
423                                .duplicate(*accessors_dict_id, dup_dict_id)
424                                .await
425                                .unwrap()
426                                .unwrap();
427                            let capability = store.export(dup_dict_id).await.unwrap().unwrap();
428                            let fsandbox::Capability::Dictionary(dict) = capability else {
429                                let _ = responder.send(Err(fsandbox::RouterError::Internal));
430                                continue;
431                            };
432                            let _ = responder.send(Ok(
433                                fsandbox::DictionaryRouterRouteResponse::Dictionary(dict),
434                            ));
435                        }
436                        fsandbox::DictionaryRouterRequest::_UnknownMethod {
437                            method_type,
438                            ordinal,
439                            ..
440                        } => {
441                            warn!(method_type:?, ordinal; "Got unknown interaction on Router");
442                        }
443                    }
444                }
445            });
446        });
447    }
448}