archivist_lib/archivist.rs

// Copyright 2020 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

use crate::accessor::{ArchiveAccessorServer, BatchRetrievalTimeout};
use crate::component_lifecycle;
use crate::error::Error;
use crate::events::router::{ConsumerConfig, EventRouter, ProducerConfig};
use crate::events::sources::EventSource;
use crate::events::types::*;
use crate::identity::ComponentIdentity;
use crate::inspect::container::InspectHandle;
use crate::inspect::repository::InspectRepository;
use crate::inspect::servers::*;
use crate::logs::debuglog::KernelDebugLog;
use crate::logs::repository::{ComponentInitialInterest, LogsRepository};
use crate::logs::serial::{launch_serial, SerialSink};
use crate::logs::servers::*;
use crate::pipeline::PipelineManager;
use archivist_config::Config;
use fidl_fuchsia_process_lifecycle::LifecycleRequestStream;
use fuchsia_component::server::{ServiceFs, ServiceObj};
use fuchsia_inspect::component;
use fuchsia_inspect::health::Reporter;
use futures::prelude::*;
use log::{debug, error, info, warn};
use moniker::ExtendedMoniker;
use std::path::PathBuf;
use std::str::FromStr;
use std::sync::Arc;
use {fidl_fuchsia_component_sandbox as fsandbox, fidl_fuchsia_io as fio, fuchsia_async as fasync};

/// The Archivist: owns the event router, the Inspect and log repositories, and the FIDL
/// servers that ingest diagnostics data into them and serve it back out.
pub struct Archivist {
    /// Handles event routing between archivist parts.
    event_router: EventRouter,

    /// The diagnostics pipelines that have been installed.
    pipeline_manager: PipelineManager,

    /// The repository holding Inspect data.
    _inspect_repository: Arc<InspectRepository>,

    /// The repository holding active log connections.
    logs_repository: Arc<LogsRepository>,

    /// The server handling fuchsia.diagnostics.ArchiveAccessor
    accessor_server: Arc<ArchiveAccessorServer>,

    /// The server handling fuchsia.logger.Log
    log_server: Arc<LogServer>,

    /// The server handling fuchsia.diagnostics.system.LogFlush
    flush_server: Arc<LogFlushServer>,

    /// The server handling fuchsia.diagnostics.LogStream
    log_stream_server: Arc<LogStreamServer>,

    /// The server handling fuchsia.inspect.InspectSink
    _inspect_sink_server: Arc<InspectSinkServer>,

    /// Top level scope.
    general_scope: fasync::Scope,

    /// Tasks receiving external events from component manager.
    incoming_events_scope: fasync::Scope,

    /// All tasks for FIDL servers that ingest data into the Archivist must run in this scope.
    servers_scope: fasync::Scope,

    /// The server handling freeze requests for fuchsia.diagnostics.system.SerialLogControl.
    freeze_server: LogFreezeServer,
}

impl Archivist {
    /// Creates a new `Archivist`: initializes the diagnostics pipelines, the event router,
    /// the log and Inspect repositories, and the FIDL servers. Nothing is served until
    /// [`Archivist::run`] is called.
    pub async fn new(config: Config) -> Self {
        let general_scope = fasync::Scope::new_with_name("general");
        let servers_scope = fasync::Scope::new_with_name("servers");

        // Initialize the pipelines that the archivist will expose.
        let pipeline_manager = PipelineManager::new(
            PathBuf::from(&config.pipelines_path),
            component::inspector().root().create_child("pipelines"),
            component::inspector().root().create_child("archive_accessor_stats"),
            general_scope.new_child_with_name("pipeline_manager"),
        )
        .await;

        // Initialize the core event router
        let mut event_router =
            EventRouter::new(component::inspector().root().create_child("events"));
        let incoming_events_scope = general_scope.new_child_with_name("incoming_events");
        Self::initialize_external_event_sources(&mut event_router, &incoming_events_scope).await;

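        // Parse the configured initial log interests; invalid entries are logged and skipped.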
        let initial_interests =
            config.component_initial_interests.into_iter().filter_map(|interest| {
                ComponentInitialInterest::from_str(&interest)
                    .map_err(|err| {
                        warn!(err:?, invalid:% = interest; "Failed to load initial interest");
                    })
                    .ok()
            });
        let logs_repo = LogsRepository::new(
            config.logs_max_cached_original_bytes,
            initial_interests,
            component::inspector().root(),
            general_scope.new_child_with_name("logs_repository"),
        );
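        // The freeze server backs fuchsia.diagnostics.system.SerialLogControl (served below);
        // its receiving end lets the serial-logging task pause serial output on request.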
        let (freeze_server, freeze_receiver) = LogFreezeServer::new();
        if !config.allow_serial_logs.is_empty() {
            let logs_repo_clone = Arc::clone(&logs_repo);
            general_scope.spawn(async move {
                launch_serial(
                    config.allow_serial_logs,
                    config.deny_serial_log_tags,
                    logs_repo_clone,
                    SerialSink,
                    freeze_receiver,
                )
                .await;
            });
        }
        let inspect_repo = Arc::new(InspectRepository::new(
            pipeline_manager.weak_pipelines(),
            general_scope.new_child_with_name("inspect_repository"),
        ));

        let inspect_sink_server = Arc::new(InspectSinkServer::new(
            Arc::clone(&inspect_repo),
            servers_scope.new_child_with_name("InspectSink"),
        ));

        // Initialize our FIDL servers. This doesn't start serving yet.
        let accessor_server = Arc::new(ArchiveAccessorServer::new(
            Arc::clone(&inspect_repo),
            Arc::clone(&logs_repo),
            config.maximum_concurrent_snapshots_per_reader,
            BatchRetrievalTimeout::from_seconds(config.per_component_batch_timeout_seconds),
            servers_scope.new_child_with_name("ArchiveAccessor"),
        ));

        let flush_server =
            Arc::new(LogFlushServer::new(servers_scope.new_child_with_name("LogFlush")));

        let log_server = Arc::new(LogServer::new(
            Arc::clone(&logs_repo),
            servers_scope.new_child_with_name("Log"),
        ));
        let log_stream_server = Arc::new(LogStreamServer::new(
            Arc::clone(&logs_repo),
            servers_scope.new_child_with_name("LogStream"),
        ));

        // Register the consumers of the external events: the logs repository handles
        // incoming LogSink connections and the InspectSink server handles incoming
        // Inspect data.
        event_router.add_consumer(ConsumerConfig {
            consumer: &logs_repo,
            events: vec![EventType::LogSinkRequested],
        });
        event_router.add_consumer(ConsumerConfig {
            consumer: &inspect_sink_server,
            events: vec![EventType::InspectSinkRequested],
        });

        // Drain klog and publish it to syslog.
        if config.enable_klog {
            match KernelDebugLog::new().await {
                Ok(klog) => logs_repo.drain_debuglog(klog),
                Err(err) => warn!(
                    err:?;
                    "Failed to start the kernel debug log reader. Klog won't be in syslog"
                ),
            };
        }

        // Connect to each service configured to start alongside the Archivist.
        for name in &config.bind_services {
            info!("Connecting to service {}", name);
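            // The local end of the channel is dropped right away; opening the connection
            // is all that's needed to start the service.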
            let (_local, remote) = zx::Channel::create();
            if let Err(e) = fdio::service_connect(&format!("/svc/{name}"), remote) {
                error!("Couldn't connect to service {}: {:?}", name, e);
            }
        }

        // TODO(https://fxbug.dev/324494668): remove this when Netstack2 is gone.
        if let Ok(dir) =
            fuchsia_fs::directory::open_in_namespace("/netstack-diagnostics", fio::PERM_READABLE)
        {
            inspect_repo.add_inspect_handle(
                Arc::new(ComponentIdentity::new(
                    ExtendedMoniker::parse_str("core/network/netstack").unwrap(),
                    "fuchsia-pkg://fuchsia.com/netstack#meta/netstack2.cm",
                )),
                InspectHandle::directory(dir),
            );
        }

        Self {
            accessor_server,
            log_server,
            flush_server,
            log_stream_server,
            event_router,
            _inspect_sink_server: inspect_sink_server,
            pipeline_manager,
            _inspect_repository: inspect_repo,
            logs_repository: logs_repo,
            general_scope,
            servers_scope,
            freeze_server,
            incoming_events_scope,
        }
    }

    pub async fn initialize_external_event_sources(
        event_router: &mut EventRouter,
        scope: &fasync::Scope,
    ) {
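        // Subscribe to the event streams routed to us by component manager; each stream
        // delivers CapabilityRequested events for one of the protocols the Archivist ingests.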
        match EventSource::new("/events/log_sink_requested_event_stream").await {
            Err(err) => warn!(err:?; "Failed to create event source for log sink requests"),
            Ok(mut event_source) => {
                event_router.add_producer(ProducerConfig {
                    producer: &mut event_source,
                    events: vec![EventType::LogSinkRequested],
                });
                scope.spawn(async move {
                    // This should never exit.
                    let _ = event_source.spawn().await;
                });
            }
        }

        match EventSource::new("/events/inspect_sink_requested_event_stream").await {
            Err(err) => {
                warn!(err:?; "Failed to create event source for InspectSink requests");
            }
            Ok(mut event_source) => {
                event_router.add_producer(ProducerConfig {
                    producer: &mut event_source,
                    events: vec![EventType::InspectSinkRequested],
                });
                scope.spawn(async move {
                    // This should never exit.
                    let _ = event_source.spawn().await;
                });
            }
        }
    }

    /// Runs the Archivist to completion.
    ///
    /// # Arguments:
    /// * `fs` - the `ServiceFs` on which the outgoing directory is served.
    /// * `is_embedded` - when true, startup messages are logged at debug rather than info level.
    /// * `store` - the capability store through which accessor dictionaries are exported.
    /// * `request_stream` - lifecycle requests used to trigger a graceful shutdown.
    pub async fn run(
        mut self,
        mut fs: ServiceFs<ServiceObj<'static, ()>>,
        is_embedded: bool,
        store: fsandbox::CapabilityStoreProxy,
        request_stream: LifecycleRequestStream,
    ) -> Result<(), Error> {
        debug!("Running Archivist.");

        // Start serving all outgoing protocols.
        self.serve_protocols(&mut fs, store).await;
        let svc_task = self.general_scope.spawn(fs.collect::<()>());

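        // Publish the Archivist's own Inspect tree so it can be diagnosed like any other component.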
        let _inspect_server_task = inspect_runtime::publish(
            component::inspector(),
            inspect_runtime::PublishOptions::default(),
        );

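        // Take `self` apart so each piece can be moved independently into the tasks below.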
        let Self {
            _inspect_repository,
            mut pipeline_manager,
            logs_repository,
            accessor_server: _accessor_server,
            log_server: _log_server,
            log_stream_server: _log_stream_server,
            _inspect_sink_server,
            general_scope,
            incoming_events_scope,
            freeze_server: _freeze_server,
            servers_scope,
            event_router,
            flush_server: _flush_server,
        } = self;

        // Start ingesting events.
        let (terminate_handle, drain_events_fut) = event_router
            .start()
            // panic: can only panic if we didn't register event producers and consumers correctly.
            .expect("Failed to start event router");
        general_scope.spawn(drain_events_fut);

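        // On a stop request, shut down in dependency order: stop ingesting new events,
        // cancel in-flight event handling, stop accepting new FIDL connections, and only
        // then wait for existing log connections and servers to drain.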
        let servers_scope_handle = servers_scope.to_handle();
        general_scope.spawn(component_lifecycle::on_stop_request(request_stream, || async move {
            terminate_handle.terminate().await;
            debug!("Stopped ingesting new CapabilityRequested events");
            incoming_events_scope.cancel().await;
            debug!("Cancelled all tasks currently executing in our event router");
            servers_scope_handle.close();
            logs_repository.stop_accepting_new_log_sinks();
            debug!("Stopped accepting new connections to FIDL servers");
            svc_task.abort().await;
            pipeline_manager.cancel().await;
            debug!("Stopped allowing new connections through the incoming namespace.");
            logs_repository.wait_for_termination().await;
            debug!("All LogSink connections have finished");
            servers_scope.join().await;
            debug!("All servers stopped.");
        }));
        if is_embedded {
            debug!("Entering core loop.");
        } else {
            info!("archivist: Entering core loop.");
        }

        component::health().set_ok();
        general_scope.await;

        Ok(())
    }

    async fn serve_protocols(
        &mut self,
        fs: &mut ServiceFs<ServiceObj<'static, ()>>,
        mut store: fsandbox::CapabilityStoreProxy,
    ) {
        component::serve_inspect_stats();
        let mut svc_dir = fs.dir("svc");

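        // The id generator mints fresh ids for capabilities placed in the sandbox
        // CapabilityStore (used below for the accessors dictionary).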
        let id_gen = sandbox::CapabilityIdGenerator::new();

        // Serve the fuchsia.diagnostics.ArchiveAccessor instances, each backed by a pipeline.
        let accessors_dict_id = self
            .pipeline_manager
            .serve_pipelines(Arc::clone(&self.accessor_server), &id_gen, &mut store)
            .await;

        // Serve fuchsia.logger.Log
        let log_server = Arc::clone(&self.log_server);
        svc_dir.add_fidl_service(move |stream| {
            debug!("fuchsia.logger.Log connection");
            log_server.spawn(stream);
        });

        // Serve fuchsia.diagnostics.system.LogFlush
        let flush_server = Arc::clone(&self.flush_server);
        svc_dir.add_fidl_service(move |stream| {
            debug!("fuchsia.diagnostics.system.LogFlush connection");
            flush_server.spawn(stream);
        });

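        // Serve fuchsia.diagnostics.system.SerialLogControl, which lets clients freeze
        // serial logging.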
        let server = self.freeze_server.clone();
        let scope = self.general_scope.clone();
        svc_dir.add_fidl_service(move |stream| {
            debug!("fuchsia.diagnostics.system.SerialLogControl connection");
            let server = server.clone();
            scope.spawn(async move {
                let _ = server.handle_requests(stream).await;
            });
        });

        // Serve fuchsia.logger.LogStream
        let log_stream_server = Arc::clone(&self.log_stream_server);
        svc_dir.add_fidl_service(move |stream| {
            debug!("fuchsia.logger.LogStream connection");
            log_stream_server.spawn(stream);
        });

        // Serve fuchsia.diagnostics.LogSettings
        let log_settings_server = LogSettingsServer::new(
            Arc::clone(&self.logs_repository),
            // Don't create this in the servers scope. We don't care about this protocol for
            // shutdown purposes.
            self.general_scope.new_child_with_name("LogSettings"),
        );
        svc_dir.add_fidl_service(move |stream| {
            debug!("fuchsia.diagnostics.LogSettings connection");
            log_settings_server.spawn(stream);
        });

        // Serve fuchsia.component.sandbox.Router
        let router_scope =
            self.servers_scope.new_child_with_name("fuchsia.component.sandbox.Router");
        svc_dir.add_fidl_service(move |mut stream: fsandbox::DictionaryRouterRequestStream| {
            let id_gen = Clone::clone(&id_gen);
            let store = Clone::clone(&store);
            router_scope.spawn(async move {
                while let Ok(Some(request)) = stream.try_next().await {
                    match request {
                        fsandbox::DictionaryRouterRequest::Route { payload: _, responder } => {
                            debug!("Got route request for the dynamic accessors dictionary");
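                            // Hand each client its own duplicate of the accessors
                            // dictionary; the original stays in the store.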
                            let dup_dict_id = id_gen.next();
                            store
                                .duplicate(*accessors_dict_id, dup_dict_id)
                                .await
                                .unwrap()
                                .unwrap();
                            let capability = store.export(dup_dict_id).await.unwrap().unwrap();
                            let fsandbox::Capability::Dictionary(dict) = capability else {
                                let _ = responder.send(Err(fsandbox::RouterError::Internal));
                                continue;
                            };
                            let _ = responder.send(Ok(
                                fsandbox::DictionaryRouterRouteResponse::Dictionary(dict),
                            ));
                        }
                        fsandbox::DictionaryRouterRequest::_UnknownMethod {
                            method_type,
                            ordinal,
                            ..
                        } => {
                            warn!(method_type:?, ordinal; "Got unknown interaction on Router");
                        }
                    }
                }
            });
        });
    }
}