archivist_lib/
archivist.rs

1// Copyright 2020 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::accessor::{ArchiveAccessorServer, BatchRetrievalTimeout};
6use crate::component_lifecycle;
7use crate::error::Error;
8use crate::events::router::{ConsumerConfig, EventRouter, ProducerConfig};
9use crate::events::sources::EventSource;
10use crate::events::types::*;
11use crate::identity::ComponentIdentity;
12use crate::inspect::container::InspectHandle;
13use crate::inspect::repository::InspectRepository;
14use crate::inspect::servers::*;
15use crate::logs::debuglog::KernelDebugLog;
16use crate::logs::repository::{ComponentInitialInterest, LogsRepository};
17use crate::logs::serial::{launch_serial, SerialSink};
18use crate::logs::servers::*;
19use crate::pipeline::PipelineManager;
20use archivist_config::Config;
21use fidl_fuchsia_diagnostics_system::SerialLogControlRequestStream;
22use fidl_fuchsia_process_lifecycle::LifecycleRequestStream;
23use fuchsia_component::server::{ServiceFs, ServiceObj};
24use fuchsia_inspect::component;
25use fuchsia_inspect::health::Reporter;
26use futures::channel::mpsc::{unbounded, UnboundedSender};
27use futures::prelude::*;
28use log::{debug, error, info, warn};
29use moniker::ExtendedMoniker;
30use std::path::PathBuf;
31use std::str::FromStr;
32use std::sync::Arc;
33use {fidl_fuchsia_component_sandbox as fsandbox, fidl_fuchsia_io as fio, fuchsia_async as fasync};
34
35/// Responsible for initializing an `Archivist` instance. Supports multiple configurations by
36/// either calling or not calling methods on the builder like `serve_test_controller_protocol`.
/// The Archivist: owns the diagnostics data repositories, the FIDL servers that ingest and
/// serve diagnostics data, and the async scopes their tasks run in. Constructed via
/// [`Archivist::new`] and driven to completion by [`Archivist::run`].
pub struct Archivist {
    /// Handles event routing between archivist parts.
    event_router: EventRouter,

    /// The diagnostics pipelines that have been installed.
    pipeline_manager: PipelineManager,

    /// The repository holding Inspect data.
    /// Leading underscore: retained only to keep the repository alive; not read after `new`.
    _inspect_repository: Arc<InspectRepository>,

    /// The repository holding active log connections.
    logs_repository: Arc<LogsRepository>,

    /// The server handling fuchsia.diagnostics.ArchiveAccessor
    accessor_server: Arc<ArchiveAccessorServer>,

    /// The server handling fuchsia.logger.Log
    log_server: Arc<LogServer>,

    /// The server handling fuchsia.diagnostics.LogStream
    log_stream_server: Arc<LogStreamServer>,

    /// The server handling fuchsia.inspect.InspectSink
    /// Leading underscore: retained only to keep the server alive; not read after `new`.
    _inspect_sink_server: Arc<InspectSinkServer>,

    /// Top level scope. Parent of the other scopes; awaited in `run` until shutdown.
    general_scope: fasync::Scope,

    /// Tasks receiving external events from component manager.
    /// Cancelled early during shutdown to stop ingesting new events.
    incoming_events_scope: fasync::Scope,

    /// All tasks for FIDL servers that ingest data into the Archivist must run in this scope.
    /// Joined (not cancelled) at shutdown so in-flight connections can drain.
    servers_scope: fasync::Scope,

    /// Freeze sender, for sending the log freeze channel over when we get FDIO requests
    freeze_sender: UnboundedSender<SerialLogControlRequestStream>,
}
74
impl Archivist {
    /// Creates a new `Archivist`: sets up the diagnostics pipelines, the event router, the
    /// Inspect and logs repositories, and the FIDL servers that ingest diagnostics data.
    /// Also starts klog draining and serial logging when enabled by `config`.
    /// Nothing is served to clients until [`Archivist::run`] is called.
    pub async fn new(config: Config) -> Self {
        let general_scope = fasync::Scope::new_with_name("general");
        let servers_scope = fasync::Scope::new_with_name("servers");

        // Initialize the pipelines that the archivist will expose.
        let pipeline_manager = PipelineManager::new(
            PathBuf::from(&config.pipelines_path),
            component::inspector().root().create_child("pipelines"),
            component::inspector().root().create_child("archive_accessor_stats"),
            general_scope.new_child_with_name("pipeline_manager"),
        )
        .await;

        // Initialize the core event router
        let mut event_router =
            EventRouter::new(component::inspector().root().create_child("events"));
        let incoming_events_scope = general_scope.new_child_with_name("incoming_events");
        Self::initialize_external_event_sources(&mut event_router, &incoming_events_scope).await;

        // Unparseable initial-interest entries are logged and skipped rather than failing
        // startup.
        let initial_interests =
            config.component_initial_interests.into_iter().filter_map(|interest| {
                ComponentInitialInterest::from_str(&interest)
                    .map_err(|err| {
                        warn!(err:?, invalid:% = interest; "Failed to load initial interest");
                    })
                    .ok()
            });
        let logs_repo = LogsRepository::new(
            config.logs_max_cached_original_bytes,
            initial_interests,
            component::inspector().root(),
            general_scope.new_child_with_name("logs_repository"),
        );
        // The freeze channel forwards SerialLogControl request streams to the serial-log task.
        // The receiver is only consumed when serial logs are enabled below; otherwise streams
        // sent into it are dropped (closing the client channel).
        let (freeze_sender, freeze_receiver) = unbounded::<SerialLogControlRequestStream>();
        if !config.allow_serial_logs.is_empty() {
            let logs_repo_clone = Arc::clone(&logs_repo);
            general_scope.spawn(async move {
                launch_serial(
                    config.allow_serial_logs,
                    config.deny_serial_log_tags,
                    logs_repo_clone,
                    SerialSink,
                    freeze_receiver,
                )
                .await;
            });
        }
        let inspect_repo = Arc::new(InspectRepository::new(
            pipeline_manager.weak_pipelines(),
            general_scope.new_child_with_name("inspect_repository"),
        ));

        let inspect_sink_server = Arc::new(InspectSinkServer::new(
            Arc::clone(&inspect_repo),
            servers_scope.new_child_with_name("InspectSink"),
        ));

        // Initialize our FIDL servers. This doesn't start serving yet.
        let accessor_server = Arc::new(ArchiveAccessorServer::new(
            Arc::clone(&inspect_repo),
            Arc::clone(&logs_repo),
            config.maximum_concurrent_snapshots_per_reader,
            BatchRetrievalTimeout::from_seconds(config.per_component_batch_timeout_seconds),
            servers_scope.new_child_with_name("ArchiveAccessor"),
        ));

        let log_server = Arc::new(LogServer::new(
            Arc::clone(&logs_repo),
            servers_scope.new_child_with_name("Log"),
        ));
        let log_stream_server = Arc::new(LogStreamServer::new(
            Arc::clone(&logs_repo),
            servers_scope.new_child_with_name("LogStream"),
        ));

        // Initialize the external event providers containing incoming diagnostics directories and
        // log sink connections.
        event_router.add_consumer(ConsumerConfig {
            consumer: &logs_repo,
            events: vec![EventType::LogSinkRequested],
        });
        event_router.add_consumer(ConsumerConfig {
            consumer: &inspect_sink_server,
            events: vec![EventType::InspectSinkRequested],
        });

        // Drain klog and publish it to syslog.
        if config.enable_klog {
            match KernelDebugLog::new().await {
                Ok(klog) => logs_repo.drain_debuglog(klog),
                Err(err) => warn!(
                    err:?;
                    "Failed to start the kernel debug log reader. Klog won't be in syslog"
                ),
            };
        }

        // Start related services that should start once the Archivist has started.
        for name in &config.bind_services {
            info!("Connecting to service {}", name);
            // The local end is dropped immediately: the connect call alone is what triggers
            // starting the target service; connection failures are logged but non-fatal.
            let (_local, remote) = zx::Channel::create();
            if let Err(e) = fdio::service_connect(&format!("/svc/{name}"), remote) {
                error!("Couldn't connect to service {}: {:?}", name, e);
            }
        }

        // TODO(https://fxbug.dev/324494668): remove this when Netstack2 is gone.
        if let Ok(dir) =
            fuchsia_fs::directory::open_in_namespace("/netstack-diagnostics", fio::PERM_READABLE)
        {
            inspect_repo.add_inspect_handle(
                Arc::new(ComponentIdentity::new(
                    ExtendedMoniker::parse_str("core/network/netstack").unwrap(),
                    "fuchsia-pkg://fuchsia.com/netstack#meta/netstack2.cm",
                )),
                InspectHandle::directory(dir),
            );
        }

        Self {
            accessor_server,
            log_server,
            log_stream_server,
            event_router,
            _inspect_sink_server: inspect_sink_server,
            pipeline_manager,
            _inspect_repository: inspect_repo,
            logs_repository: logs_repo,
            general_scope,
            servers_scope,
            freeze_sender,
            incoming_events_scope,
        }
    }

    /// Registers the external event sources (log sink and InspectSink capability-requested
    /// event streams from component manager) as producers on `event_router`, and spawns a
    /// draining task for each on `scope`. A source that fails to open is logged and skipped
    /// so the archivist can still start with reduced functionality.
    pub async fn initialize_external_event_sources(
        event_router: &mut EventRouter,
        scope: &fasync::Scope,
    ) {
        match EventSource::new("/events/log_sink_requested_event_stream").await {
            Err(err) => warn!(err:?; "Failed to create event source for log sink requests"),
            Ok(mut event_source) => {
                event_router.add_producer(ProducerConfig {
                    producer: &mut event_source,
                    events: vec![EventType::LogSinkRequested],
                });
                scope.spawn(async move {
                    // This should never exit.
                    let _ = event_source.spawn().await;
                });
            }
        }

        match EventSource::new("/events/inspect_sink_requested_event_stream").await {
            Err(err) => {
                warn!(err:?; "Failed to create event source for InspectSink requests");
            }
            Ok(mut event_source) => {
                event_router.add_producer(ProducerConfig {
                    producer: &mut event_source,
                    events: vec![EventType::InspectSinkRequested],
                });
                scope.spawn(async move {
                    // This should never exit.
                    let _ = event_source.spawn().await;
                });
            }
        }
    }

    /// Run archivist to completion: serve all outgoing protocols, start event ingestion, and
    /// wait until shutdown is requested through the lifecycle protocol.
    ///
    /// # Arguments:
    /// * `fs` - the outgoing `ServiceFs` on which protocols are served.
    /// * `is_embedded` - lowers the "entering core loop" message to debug level when true.
    /// * `store` - capability store used to vend dynamic accessor dictionaries.
    /// * `request_stream` - lifecycle protocol stream that delivers the stop request.
    pub async fn run(
        mut self,
        mut fs: ServiceFs<ServiceObj<'static, ()>>,
        is_embedded: bool,
        store: fsandbox::CapabilityStoreProxy,
        request_stream: LifecycleRequestStream,
    ) -> Result<(), Error> {
        debug!("Running Archivist.");

        // Start servicing all outgoing services.
        self.serve_protocols(&mut fs, store).await;
        let svc_task = self.general_scope.spawn(fs.collect::<()>());

        let _inspect_server_task = inspect_runtime::publish(
            component::inspector(),
            inspect_runtime::PublishOptions::default(),
        );

        // Decompose self so the individual parts can be moved independently into the
        // shutdown task below.
        let Self {
            _inspect_repository,
            mut pipeline_manager,
            logs_repository,
            accessor_server: _accessor_server,
            log_server: _log_server,
            log_stream_server: _log_stream_server,
            _inspect_sink_server,
            general_scope,
            incoming_events_scope,
            freeze_sender: _freeze_sender,
            servers_scope,
            event_router,
        } = self;

        // Start ingesting events.
        let (terminate_handle, drain_events_fut) = event_router
            .start()
            // panic: can only panic if we didn't register event producers and consumers correctly.
            .expect("Failed to start event router");
        general_scope.spawn(drain_events_fut);

        // On a stop request, shut down in a deliberate order: stop ingesting new events and
        // connections first, then drain what is already in flight. The sequencing of the
        // awaits below is load-bearing.
        let servers_scope_handle = servers_scope.to_handle();
        general_scope.spawn(component_lifecycle::on_stop_request(request_stream, || async move {
            terminate_handle.terminate().await;
            debug!("Stopped ingesting new CapabilityRequested events");
            incoming_events_scope.cancel().await;
            debug!("Cancel all tasks currently executing in our event router");
            servers_scope_handle.close();
            logs_repository.stop_accepting_new_log_sinks();
            debug!("Close any new connections to FIDL servers");
            svc_task.abort().await;
            pipeline_manager.cancel().await;
            debug!("Stop allowing new connections through the incoming namespace.");
            logs_repository.wait_for_termination().await;
            debug!("All LogSink connections have finished");
            servers_scope.join().await;
            debug!("All servers stopped.");
        }));
        if is_embedded {
            debug!("Entering core loop.");
        } else {
            info!("archivist: Entering core loop.");
        }

        component::health().set_ok();
        // Blocks until every task in the scope (including the shutdown task) completes.
        general_scope.await;

        Ok(())
    }

    /// Adds all outgoing FIDL protocols to `fs`'s `svc` directory: pipeline-backed
    /// ArchiveAccessors, Log, SerialLogControl, LogStream, LogSettings, and the sandbox
    /// Router that vends the dynamic accessors dictionary.
    async fn serve_protocols(
        &mut self,
        fs: &mut ServiceFs<ServiceObj<'static, ()>>,
        mut store: fsandbox::CapabilityStoreProxy,
    ) {
        component::serve_inspect_stats();
        let mut svc_dir = fs.dir("svc");

        let id_gen = sandbox::CapabilityIdGenerator::new();

        // Serve fuchsia.diagnostics.ArchiveAccessors backed by a pipeline.
        let accessors_dict_id = self
            .pipeline_manager
            .serve_pipelines(Arc::clone(&self.accessor_server), &id_gen, &mut store)
            .await;

        // Serve fuchsia.logger.Log
        let log_server = Arc::clone(&self.log_server);
        svc_dir.add_fidl_service(move |stream| {
            debug!("fuchsia.logger.Log connection");
            log_server.spawn(stream);
        });

        // Serve fuchsia.diagnostics.system.SerialLogControl by forwarding each request
        // stream to the serial-log task through the freeze channel.
        let sender = self.freeze_sender.clone();

        svc_dir.add_fidl_service(move |stream| {
            debug!("fuchsia.diagnostics.system.SerialLogControl connection");
            // Let the channel close if the server has stopped running.
            let _ = sender.unbounded_send(stream);
        });

        // Serve fuchsia.logger.LogStream
        let log_stream_server = Arc::clone(&self.log_stream_server);
        svc_dir.add_fidl_service(move |stream| {
            debug!("fuchsia.logger.LogStream connection");
            log_stream_server.spawn(stream);
        });

        // Serve fuchsia.diagnostics.LogSettings
        let log_settings_server = LogSettingsServer::new(
            Arc::clone(&self.logs_repository),
            // Don't create this in the servers scope. We don't care about this protocol for
            // shutdown purposes.
            self.general_scope.new_child_with_name("LogSettings"),
        );
        svc_dir.add_fidl_service(move |stream| {
            debug!("fuchsia.diagnostics.LogSettings connection");
            log_settings_server.spawn(stream);
        });

        // Serve fuchsia.component.sandbox.Router
        let router_scope =
            self.servers_scope.new_child_with_name("fuchsia.component.sandbox.Router");
        svc_dir.add_fidl_service(move |mut stream: fsandbox::DictionaryRouterRequestStream| {
            let id_gen = Clone::clone(&id_gen);
            let store = Clone::clone(&store);
            router_scope.spawn(async move {
                while let Ok(Some(request)) = stream.try_next().await {
                    match request {
                        fsandbox::DictionaryRouterRequest::Route { payload: _, responder } => {
                            debug!("Got route request for the dynamic accessors dictionary");
                            // Duplicate the dictionary so the original stays in the store for
                            // future route requests; only the duplicate is exported.
                            let dup_dict_id = id_gen.next();
                            store
                                .duplicate(*accessors_dict_id, dup_dict_id)
                                .await
                                .unwrap()
                                .unwrap();
                            let capability = store.export(dup_dict_id).await.unwrap().unwrap();
                            let fsandbox::Capability::Dictionary(dict) = capability else {
                                let _ = responder.send(Err(fsandbox::RouterError::Internal));
                                continue;
                            };
                            let _ = responder.send(Ok(
                                fsandbox::DictionaryRouterRouteResponse::Dictionary(dict),
                            ));
                        }
                        fsandbox::DictionaryRouterRequest::_UnknownMethod {
                            method_type,
                            ordinal,
                            ..
                        } => {
                            warn!(method_type:?, ordinal; "Got unknown interaction on Router");
                        }
                    }
                }
            });
        });
    }
}