archivist_lib/
archivist.rs

// Copyright 2020 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
4
5use crate::accessor::{ArchiveAccessorServer, BatchRetrievalTimeout};
6use crate::component_lifecycle;
7use crate::error::Error;
8use crate::events::router::{ConsumerConfig, EventRouter, ProducerConfig};
9use crate::events::sources::EventSource;
10use crate::events::types::*;
11use crate::identity::ComponentIdentity;
12use crate::inspect::container::InspectHandle;
13use crate::inspect::repository::InspectRepository;
14use crate::inspect::servers::*;
15use crate::logs::debuglog::KernelDebugLog;
16use crate::logs::repository::{ComponentInitialInterest, LogsRepository};
17use crate::logs::serial::{SerialConfig, SerialSink};
18use crate::logs::servers::*;
19use crate::pipeline::PipelineManager;
20use archivist_config::Config;
21use fidl_fuchsia_process_lifecycle::LifecycleRequestStream;
22use fuchsia_component::server::{ServiceFs, ServiceObj};
23use fuchsia_inspect::component;
24use fuchsia_inspect::health::Reporter;
25use futures::prelude::*;
26use log::{debug, error, info, warn};
27use moniker::ExtendedMoniker;
28use std::path::PathBuf;
29use std::str::FromStr;
30use std::sync::Arc;
31use {fidl_fuchsia_component_sandbox as fsandbox, fidl_fuchsia_io as fio, fuchsia_async as fasync};
32
33/// Responsible for initializing an `Archivist` instance. Supports multiple configurations by
34/// either calling or not calling methods on the builder like `serve_test_controller_protocol`.
35pub struct Archivist {
36    /// Handles event routing between archivist parts.
37    event_router: EventRouter,
38
39    /// The diagnostics pipelines that have been installed.
40    pipeline_manager: PipelineManager,
41
42    /// The repository holding Inspect data.
43    _inspect_repository: Arc<InspectRepository>,
44
45    /// The repository holding active log connections.
46    logs_repository: Arc<LogsRepository>,
47
48    /// The server handling fuchsia.diagnostics.ArchiveAccessor
49    accessor_server: Arc<ArchiveAccessorServer>,
50
51    /// The server handling fuchsia.logger.Log
52    log_server: Arc<LogServer>,
53
54    /// The server handling fuchsia.diagnostics.LogStream
55    log_stream_server: Arc<LogStreamServer>,
56
57    /// The server handling fuchsia.inspect.InspectSink
58    _inspect_sink_server: Arc<InspectSinkServer>,
59
60    /// Top level scope.
61    general_scope: fasync::Scope,
62
63    /// Tasks receiving external events from component manager.
64    incoming_events_scope: fasync::Scope,
65
66    /// All tasks for FIDL servers that ingest data into the Archivist must run in this scope.
67    servers_scope: fasync::Scope,
68}
69
70impl Archivist {
71    /// Creates new instance, sets up inspect and adds 'archive' directory to output folder.
72    /// Also installs `fuchsia.diagnostics.Archive` service.
73    /// Call `install_log_services`
74    pub async fn new(config: Config) -> Self {
75        let general_scope = fasync::Scope::new_with_name("general");
76        let servers_scope = fasync::Scope::new_with_name("servers");
77
78        // Initialize the pipelines that the archivist will expose.
79        let pipeline_manager = PipelineManager::new(
80            PathBuf::from(&config.pipelines_path),
81            component::inspector().root().create_child("pipelines"),
82            component::inspector().root().create_child("archive_accessor_stats"),
83            general_scope.new_child_with_name("pipeline_manager"),
84        )
85        .await;
86
87        // Initialize the core event router
88        let mut event_router =
89            EventRouter::new(component::inspector().root().create_child("events"));
90        let incoming_events_scope = general_scope.new_child_with_name("incoming_events");
91        Self::initialize_external_event_sources(&mut event_router, &incoming_events_scope).await;
92
93        let initial_interests =
94            config.component_initial_interests.into_iter().filter_map(|interest| {
95                ComponentInitialInterest::from_str(&interest)
96                    .map_err(|err| {
97                        warn!(err:?, invalid:% = interest; "Failed to load initial interest");
98                    })
99                    .ok()
100            });
101        let logs_repo = LogsRepository::new(
102            config.logs_max_cached_original_bytes,
103            initial_interests,
104            component::inspector().root(),
105            general_scope.new_child_with_name("logs_repository"),
106        );
107        if !config.allow_serial_logs.is_empty() {
108            let write_logs_to_serial =
109                SerialConfig::new(config.allow_serial_logs, config.deny_serial_log_tags)
110                    .write_logs(Arc::clone(&logs_repo), SerialSink);
111            general_scope.spawn(write_logs_to_serial);
112        }
113        let inspect_repo = Arc::new(InspectRepository::new(
114            pipeline_manager.weak_pipelines(),
115            general_scope.new_child_with_name("inspect_repository"),
116        ));
117
118        let inspect_sink_server = Arc::new(InspectSinkServer::new(
119            Arc::clone(&inspect_repo),
120            servers_scope.new_child_with_name("InspectSink"),
121        ));
122
123        // Initialize our FIDL servers. This doesn't start serving yet.
124        let accessor_server = Arc::new(ArchiveAccessorServer::new(
125            Arc::clone(&inspect_repo),
126            Arc::clone(&logs_repo),
127            config.maximum_concurrent_snapshots_per_reader,
128            BatchRetrievalTimeout::from_seconds(config.per_component_batch_timeout_seconds),
129            servers_scope.new_child_with_name("ArchiveAccessor"),
130        ));
131
132        let log_server = Arc::new(LogServer::new(
133            Arc::clone(&logs_repo),
134            servers_scope.new_child_with_name("Log"),
135        ));
136        let log_stream_server = Arc::new(LogStreamServer::new(
137            Arc::clone(&logs_repo),
138            servers_scope.new_child_with_name("LogStream"),
139        ));
140
141        // Initialize the external event providers containing incoming diagnostics directories and
142        // log sink connections.
143        event_router.add_consumer(ConsumerConfig {
144            consumer: &logs_repo,
145            events: vec![EventType::LogSinkRequested],
146        });
147        event_router.add_consumer(ConsumerConfig {
148            consumer: &inspect_sink_server,
149            events: vec![EventType::InspectSinkRequested],
150        });
151
152        // Drain klog and publish it to syslog.
153        if config.enable_klog {
154            match KernelDebugLog::new().await {
155                Ok(klog) => logs_repo.drain_debuglog(klog),
156                Err(err) => warn!(
157                    err:?;
158                    "Failed to start the kernel debug log reader. Klog won't be in syslog"
159                ),
160            };
161        }
162
163        // Start related services that should start once the Archivist has started.
164        for name in &config.bind_services {
165            info!("Connecting to service {}", name);
166            let (_local, remote) = zx::Channel::create();
167            if let Err(e) = fdio::service_connect(&format!("/svc/{name}"), remote) {
168                error!("Couldn't connect to service {}: {:?}", name, e);
169            }
170        }
171
172        // TODO(https://fxbug.dev/324494668): remove this when Netstack2 is gone.
173        if let Ok(dir) =
174            fuchsia_fs::directory::open_in_namespace("/netstack-diagnostics", fio::PERM_READABLE)
175        {
176            inspect_repo.add_inspect_handle(
177                Arc::new(ComponentIdentity::new(
178                    ExtendedMoniker::parse_str("core/network/netstack").unwrap(),
179                    "fuchsia-pkg://fuchsia.com/netstack#meta/netstack2.cm",
180                )),
181                InspectHandle::directory(dir),
182            );
183        }
184
185        Self {
186            accessor_server,
187            log_server,
188            log_stream_server,
189            event_router,
190            _inspect_sink_server: inspect_sink_server,
191            pipeline_manager,
192            _inspect_repository: inspect_repo,
193            logs_repository: logs_repo,
194            general_scope,
195            servers_scope,
196            incoming_events_scope,
197        }
198    }
199
200    pub async fn initialize_external_event_sources(
201        event_router: &mut EventRouter,
202        scope: &fasync::Scope,
203    ) {
204        match EventSource::new("/events/log_sink_requested_event_stream").await {
205            Err(err) => warn!(err:?; "Failed to create event source for log sink requests"),
206            Ok(mut event_source) => {
207                event_router.add_producer(ProducerConfig {
208                    producer: &mut event_source,
209                    events: vec![EventType::LogSinkRequested],
210                });
211                scope.spawn(async move {
212                    // This should never exit.
213                    let _ = event_source.spawn().await;
214                });
215            }
216        }
217
218        match EventSource::new("/events/inspect_sink_requested_event_stream").await {
219            Err(err) => {
220                warn!(err:?; "Failed to create event source for InspectSink requests");
221            }
222            Ok(mut event_source) => {
223                event_router.add_producer(ProducerConfig {
224                    producer: &mut event_source,
225                    events: vec![EventType::InspectSinkRequested],
226                });
227                scope.spawn(async move {
228                    // This should never exit.
229                    let _ = event_source.spawn().await;
230                });
231            }
232        }
233    }
234
235    /// Run archivist to completion.
236    /// # Arguments:
237    /// * `outgoing_channel`- channel to serve outgoing directory on.
238    pub async fn run(
239        mut self,
240        mut fs: ServiceFs<ServiceObj<'static, ()>>,
241        is_embedded: bool,
242        store: fsandbox::CapabilityStoreProxy,
243        request_stream: LifecycleRequestStream,
244    ) -> Result<(), Error> {
245        debug!("Running Archivist.");
246
247        // Start servicing all outgoing services.
248        self.serve_protocols(&mut fs, store).await;
249        let svc_task = self.general_scope.spawn(fs.collect::<()>());
250
251        let _inspect_server_task = inspect_runtime::publish(
252            component::inspector(),
253            inspect_runtime::PublishOptions::default(),
254        );
255
256        let Self {
257            _inspect_repository,
258            mut pipeline_manager,
259            logs_repository,
260            accessor_server: _accessor_server,
261            log_server: _log_server,
262            log_stream_server: _log_stream_server,
263            _inspect_sink_server,
264            general_scope,
265            incoming_events_scope,
266            servers_scope,
267            event_router,
268        } = self;
269
270        // Start ingesting events.
271        let (terminate_handle, drain_events_fut) = event_router
272            .start()
273            // panic: can only panic if we didn't register event producers and consumers correctly.
274            .expect("Failed to start event router");
275        general_scope.spawn(drain_events_fut);
276
277        let servers_scope_handle = servers_scope.to_handle();
278        general_scope.spawn(component_lifecycle::on_stop_request(request_stream, || async move {
279            terminate_handle.terminate().await;
280            debug!("Stopped ingesting new CapabilityRequested events");
281            incoming_events_scope.cancel().await;
282            debug!("Cancel all tasks currently executing in our event router");
283            servers_scope_handle.close();
284            logs_repository.stop_accepting_new_log_sinks();
285            debug!("Close any new connections to FIDL servers");
286            svc_task.cancel().await;
287            pipeline_manager.cancel().await;
288            debug!("Stop allowing new connections through the incoming namespace.");
289            logs_repository.wait_for_termination().await;
290            debug!("All LogSink connections have finished");
291            servers_scope.join().await;
292            debug!("All servers stopped.");
293        }));
294        if is_embedded {
295            debug!("Entering core loop.");
296        } else {
297            info!("archivist: Entering core loop.");
298        }
299
300        component::health().set_ok();
301        general_scope.await;
302
303        Ok(())
304    }
305
306    async fn serve_protocols(
307        &mut self,
308        fs: &mut ServiceFs<ServiceObj<'static, ()>>,
309        mut store: fsandbox::CapabilityStoreProxy,
310    ) {
311        component::serve_inspect_stats();
312        let mut svc_dir = fs.dir("svc");
313
314        let id_gen = sandbox::CapabilityIdGenerator::new();
315
316        // Serve fuchsia.diagnostics.ArchiveAccessors backed by a pipeline.
317        let accessors_dict_id = self
318            .pipeline_manager
319            .serve_pipelines(Arc::clone(&self.accessor_server), &id_gen, &mut store)
320            .await;
321
322        // Serve fuchsia.logger.Log
323        let log_server = Arc::clone(&self.log_server);
324        svc_dir.add_fidl_service(move |stream| {
325            debug!("fuchsia.logger.Log connection");
326            log_server.spawn(stream);
327        });
328
329        // Server fuchsia.logger.LogStream
330        let log_stream_server = Arc::clone(&self.log_stream_server);
331        svc_dir.add_fidl_service(move |stream| {
332            debug!("fuchsia.logger.LogStream connection");
333            log_stream_server.spawn(stream);
334        });
335
336        // Server fuchsia.diagnostics.LogSettings
337        let log_settings_server = LogSettingsServer::new(
338            Arc::clone(&self.logs_repository),
339            // Don't create this in the servers scope. We don't care about this protocol for
340            // shutdown purposes.
341            self.general_scope.new_child_with_name("LogSettings"),
342        );
343        svc_dir.add_fidl_service(move |stream| {
344            debug!("fuchsia.diagnostics.LogSettings connection");
345            log_settings_server.spawn(stream);
346        });
347
348        // Serve fuchsia.component.sandbox.Router
349        let router_scope =
350            self.servers_scope.new_child_with_name("fuchsia.component.sandbox.Router");
351        svc_dir.add_fidl_service(move |mut stream: fsandbox::DictionaryRouterRequestStream| {
352            let id_gen = Clone::clone(&id_gen);
353            let store = Clone::clone(&store);
354            router_scope.spawn(async move {
355                while let Ok(Some(request)) = stream.try_next().await {
356                    match request {
357                        fsandbox::DictionaryRouterRequest::Route { payload: _, responder } => {
358                            debug!("Got route request for the dynamic accessors dictionary");
359                            let dup_dict_id = id_gen.next();
360                            store
361                                .duplicate(*accessors_dict_id, dup_dict_id)
362                                .await
363                                .unwrap()
364                                .unwrap();
365                            let capability = store.export(dup_dict_id).await.unwrap().unwrap();
366                            let fsandbox::Capability::Dictionary(dict) = capability else {
367                                let _ = responder.send(Err(fsandbox::RouterError::Internal));
368                                continue;
369                            };
370                            let _ = responder.send(Ok(
371                                fsandbox::DictionaryRouterRouteResponse::Dictionary(dict),
372                            ));
373                        }
374                        fsandbox::DictionaryRouterRequest::_UnknownMethod {
375                            method_type,
376                            ordinal,
377                            ..
378                        } => {
379                            warn!(method_type:?, ordinal; "Got unknown interaction on Router");
380                        }
381                    }
382                }
383            });
384        });
385    }
386}