1use crate::accessor::{ArchiveAccessorServer, BatchRetrievalTimeout};
6use crate::component_lifecycle;
7use crate::error::Error;
8use crate::events::router::{ConsumerConfig, EventRouter, ProducerConfig};
9use crate::events::sources::EventSource;
10use crate::events::types::*;
11use crate::identity::ComponentIdentity;
12use crate::inspect::container::InspectHandle;
13use crate::inspect::repository::InspectRepository;
14use crate::inspect::servers::*;
15use crate::logs::debuglog::KernelDebugLog;
16use crate::logs::repository::{ComponentInitialInterest, LogsRepository};
17use crate::logs::serial::{launch_serial, SerialSink};
18use crate::logs::servers::*;
19use crate::pipeline::PipelineManager;
20use archivist_config::Config;
21use fidl_fuchsia_process_lifecycle::LifecycleRequestStream;
22use fuchsia_component::server::{ServiceFs, ServiceObj};
23use fuchsia_inspect::component;
24use fuchsia_inspect::health::Reporter;
25use futures::prelude::*;
26use log::{debug, error, info, warn};
27use moniker::ExtendedMoniker;
28use std::path::PathBuf;
29use std::str::FromStr;
30use std::sync::Arc;
31use {fidl_fuchsia_component_sandbox as fsandbox, fidl_fuchsia_io as fio, fuchsia_async as fasync};
32
33pub struct Archivist {
36 event_router: EventRouter,
38
39 pipeline_manager: PipelineManager,
41
42 _inspect_repository: Arc<InspectRepository>,
44
45 logs_repository: Arc<LogsRepository>,
47
48 accessor_server: Arc<ArchiveAccessorServer>,
50
51 log_server: Arc<LogServer>,
53
54 flush_server: Arc<LogFlushServer>,
56
57 log_stream_server: Arc<LogStreamServer>,
59
60 _inspect_sink_server: Arc<InspectSinkServer>,
62
63 general_scope: fasync::Scope,
65
66 incoming_events_scope: fasync::Scope,
68
69 servers_scope: fasync::Scope,
71
72 freeze_server: LogFreezeServer,
74}
75
76impl Archivist {
77 pub async fn new(config: Config) -> Self {
81 let general_scope = fasync::Scope::new_with_name("general");
82 let servers_scope = fasync::Scope::new_with_name("servers");
83
84 let pipeline_manager = PipelineManager::new(
86 PathBuf::from(&config.pipelines_path),
87 component::inspector().root().create_child("pipelines"),
88 component::inspector().root().create_child("archive_accessor_stats"),
89 general_scope.new_child_with_name("pipeline_manager"),
90 )
91 .await;
92
93 let mut event_router =
95 EventRouter::new(component::inspector().root().create_child("events"));
96 let incoming_events_scope = general_scope.new_child_with_name("incoming_events");
97 Self::initialize_external_event_sources(&mut event_router, &incoming_events_scope).await;
98
99 let initial_interests =
100 config.component_initial_interests.into_iter().filter_map(|interest| {
101 ComponentInitialInterest::from_str(&interest)
102 .map_err(|err| {
103 warn!(err:?, invalid:% = interest; "Failed to load initial interest");
104 })
105 .ok()
106 });
107 let logs_repo = LogsRepository::new(
108 config.logs_max_cached_original_bytes,
109 initial_interests,
110 component::inspector().root(),
111 general_scope.new_child_with_name("logs_repository"),
112 );
113 let (freeze_server, freeze_receiver) = LogFreezeServer::new();
114 if !config.allow_serial_logs.is_empty() {
115 let logs_repo_clone = Arc::clone(&logs_repo);
116 general_scope.spawn(async move {
117 launch_serial(
118 config.allow_serial_logs,
119 config.deny_serial_log_tags,
120 logs_repo_clone,
121 SerialSink,
122 freeze_receiver,
123 )
124 .await;
125 });
126 }
127 let inspect_repo = Arc::new(InspectRepository::new(
128 pipeline_manager.weak_pipelines(),
129 general_scope.new_child_with_name("inspect_repository"),
130 ));
131
132 let inspect_sink_server = Arc::new(InspectSinkServer::new(
133 Arc::clone(&inspect_repo),
134 servers_scope.new_child_with_name("InspectSink"),
135 ));
136
137 let accessor_server = Arc::new(ArchiveAccessorServer::new(
139 Arc::clone(&inspect_repo),
140 Arc::clone(&logs_repo),
141 config.maximum_concurrent_snapshots_per_reader,
142 BatchRetrievalTimeout::from_seconds(config.per_component_batch_timeout_seconds),
143 servers_scope.new_child_with_name("ArchiveAccessor"),
144 ));
145
146 let flush_server =
147 Arc::new(LogFlushServer::new(servers_scope.new_child_with_name("LogFlush")));
148
149 let log_server = Arc::new(LogServer::new(
150 Arc::clone(&logs_repo),
151 servers_scope.new_child_with_name("Log"),
152 ));
153 let log_stream_server = Arc::new(LogStreamServer::new(
154 Arc::clone(&logs_repo),
155 servers_scope.new_child_with_name("LogStream"),
156 ));
157
158 event_router.add_consumer(ConsumerConfig {
161 consumer: &logs_repo,
162 events: vec![EventType::LogSinkRequested],
163 });
164 event_router.add_consumer(ConsumerConfig {
165 consumer: &inspect_sink_server,
166 events: vec![EventType::InspectSinkRequested],
167 });
168
169 if config.enable_klog {
171 match KernelDebugLog::new().await {
172 Ok(klog) => logs_repo.drain_debuglog(klog),
173 Err(err) => warn!(
174 err:?;
175 "Failed to start the kernel debug log reader. Klog won't be in syslog"
176 ),
177 };
178 }
179
180 for name in &config.bind_services {
182 info!("Connecting to service {}", name);
183 let (_local, remote) = zx::Channel::create();
184 if let Err(e) = fdio::service_connect(&format!("/svc/{name}"), remote) {
185 error!("Couldn't connect to service {}: {:?}", name, e);
186 }
187 }
188
189 if let Ok(dir) =
191 fuchsia_fs::directory::open_in_namespace("/netstack-diagnostics", fio::PERM_READABLE)
192 {
193 inspect_repo.add_inspect_handle(
194 Arc::new(ComponentIdentity::new(
195 ExtendedMoniker::parse_str("core/network/netstack").unwrap(),
196 "fuchsia-pkg://fuchsia.com/netstack#meta/netstack2.cm",
197 )),
198 InspectHandle::directory(dir),
199 );
200 }
201
202 Self {
203 accessor_server,
204 log_server,
205 flush_server,
206 log_stream_server,
207 event_router,
208 _inspect_sink_server: inspect_sink_server,
209 pipeline_manager,
210 _inspect_repository: inspect_repo,
211 logs_repository: logs_repo,
212 general_scope,
213 servers_scope,
214 freeze_server,
215 incoming_events_scope,
216 }
217 }
218
219 pub async fn initialize_external_event_sources(
220 event_router: &mut EventRouter,
221 scope: &fasync::Scope,
222 ) {
223 match EventSource::new("/events/log_sink_requested_event_stream").await {
224 Err(err) => warn!(err:?; "Failed to create event source for log sink requests"),
225 Ok(mut event_source) => {
226 event_router.add_producer(ProducerConfig {
227 producer: &mut event_source,
228 events: vec![EventType::LogSinkRequested],
229 });
230 scope.spawn(async move {
231 let _ = event_source.spawn().await;
233 });
234 }
235 }
236
237 match EventSource::new("/events/inspect_sink_requested_event_stream").await {
238 Err(err) => {
239 warn!(err:?; "Failed to create event source for InspectSink requests");
240 }
241 Ok(mut event_source) => {
242 event_router.add_producer(ProducerConfig {
243 producer: &mut event_source,
244 events: vec![EventType::InspectSinkRequested],
245 });
246 scope.spawn(async move {
247 let _ = event_source.spawn().await;
249 });
250 }
251 }
252 }
253
254 pub async fn run(
258 mut self,
259 mut fs: ServiceFs<ServiceObj<'static, ()>>,
260 is_embedded: bool,
261 store: fsandbox::CapabilityStoreProxy,
262 request_stream: LifecycleRequestStream,
263 ) -> Result<(), Error> {
264 debug!("Running Archivist.");
265
266 self.serve_protocols(&mut fs, store).await;
268 let svc_task = self.general_scope.spawn(fs.collect::<()>());
269
270 let _inspect_server_task = inspect_runtime::publish(
271 component::inspector(),
272 inspect_runtime::PublishOptions::default(),
273 );
274
275 let Self {
276 _inspect_repository,
277 mut pipeline_manager,
278 logs_repository,
279 accessor_server: _accessor_server,
280 log_server: _log_server,
281 log_stream_server: _log_stream_server,
282 _inspect_sink_server,
283 general_scope,
284 incoming_events_scope,
285 freeze_server: _freeze_server,
286 servers_scope,
287 event_router,
288 flush_server: _flush_server,
289 } = self;
290
291 let (terminate_handle, drain_events_fut) = event_router
293 .start()
294 .expect("Failed to start event router");
296 general_scope.spawn(drain_events_fut);
297
298 let servers_scope_handle = servers_scope.to_handle();
299 general_scope.spawn(component_lifecycle::on_stop_request(request_stream, || async move {
300 terminate_handle.terminate().await;
301 debug!("Stopped ingesting new CapabilityRequested events");
302 incoming_events_scope.cancel().await;
303 debug!("Cancel all tasks currently executing in our event router");
304 servers_scope_handle.close();
305 logs_repository.stop_accepting_new_log_sinks();
306 debug!("Close any new connections to FIDL servers");
307 svc_task.abort().await;
308 pipeline_manager.cancel().await;
309 debug!("Stop allowing new connections through the incoming namespace.");
310 logs_repository.wait_for_termination().await;
311 debug!("All LogSink connections have finished");
312 servers_scope.join().await;
313 debug!("All servers stopped.");
314 }));
315 if is_embedded {
316 debug!("Entering core loop.");
317 } else {
318 info!("archivist: Entering core loop.");
319 }
320
321 component::health().set_ok();
322 general_scope.await;
323
324 Ok(())
325 }
326
327 async fn serve_protocols(
328 &mut self,
329 fs: &mut ServiceFs<ServiceObj<'static, ()>>,
330 mut store: fsandbox::CapabilityStoreProxy,
331 ) {
332 component::serve_inspect_stats();
333 let mut svc_dir = fs.dir("svc");
334
335 let id_gen = sandbox::CapabilityIdGenerator::new();
336
337 let accessors_dict_id = self
339 .pipeline_manager
340 .serve_pipelines(Arc::clone(&self.accessor_server), &id_gen, &mut store)
341 .await;
342
343 let log_server = Arc::clone(&self.log_server);
345 svc_dir.add_fidl_service(move |stream| {
346 debug!("fuchsia.logger.Log connection");
347 log_server.spawn(stream);
348 });
349
350 let flush_server = Arc::clone(&self.flush_server);
352 svc_dir.add_fidl_service(move |stream| {
353 debug!("fuchsia.diagnostics.system.LogFlush connection");
354 flush_server.spawn(stream);
355 });
356
357 let server = self.freeze_server.clone();
358 let scope = self.general_scope.clone();
359
360 svc_dir.add_fidl_service(move |stream| {
361 debug!("fuchsia.diagnostics.system.SerialLogControl connection");
362 let server = server.clone();
363 scope.spawn(async move {
364 let _ = server.handle_requests(stream).await;
365 });
366 });
367
368 let log_stream_server = Arc::clone(&self.log_stream_server);
370 svc_dir.add_fidl_service(move |stream| {
371 debug!("fuchsia.logger.LogStream connection");
372 log_stream_server.spawn(stream);
373 });
374
375 let log_settings_server = LogSettingsServer::new(
377 Arc::clone(&self.logs_repository),
378 self.general_scope.new_child_with_name("LogSettings"),
381 );
382 svc_dir.add_fidl_service(move |stream| {
383 debug!("fuchsia.diagnostics.LogSettings connection");
384 log_settings_server.spawn(stream);
385 });
386
387 let router_scope =
389 self.servers_scope.new_child_with_name("fuchsia.component.sandbox.Router");
390 svc_dir.add_fidl_service(move |mut stream: fsandbox::DictionaryRouterRequestStream| {
391 let id_gen = Clone::clone(&id_gen);
392 let store = Clone::clone(&store);
393 router_scope.spawn(async move {
394 while let Ok(Some(request)) = stream.try_next().await {
395 match request {
396 fsandbox::DictionaryRouterRequest::Route { payload: _, responder } => {
397 debug!("Got route request for the dynamic accessors dictionary");
398 let dup_dict_id = id_gen.next();
399 store
400 .duplicate(*accessors_dict_id, dup_dict_id)
401 .await
402 .unwrap()
403 .unwrap();
404 let capability = store.export(dup_dict_id).await.unwrap().unwrap();
405 let fsandbox::Capability::Dictionary(dict) = capability else {
406 let _ = responder.send(Err(fsandbox::RouterError::Internal));
407 continue;
408 };
409 let _ = responder.send(Ok(
410 fsandbox::DictionaryRouterRouteResponse::Dictionary(dict),
411 ));
412 }
413 fsandbox::DictionaryRouterRequest::_UnknownMethod {
414 method_type,
415 ordinal,
416 ..
417 } => {
418 warn!(method_type:?, ordinal; "Got unknown interaction on Router");
419 }
420 }
421 }
422 });
423 });
424 }
425}