1use crate::accessor::{ArchiveAccessorServer, BatchRetrievalTimeout};
6use crate::component_lifecycle;
7use crate::error::Error;
8use crate::events::router::{ConsumerConfig, EventRouter, ProducerConfig};
9use crate::events::sources::EventSource;
10use crate::events::types::*;
11use crate::identity::ComponentIdentity;
12use crate::inspect::container::InspectHandle;
13use crate::inspect::repository::InspectRepository;
14use crate::inspect::servers::*;
15use crate::logs::debuglog::KernelDebugLog;
16use crate::logs::repository::{ComponentInitialInterest, LogsRepository};
17use crate::logs::serial::{SerialSink, launch_serial};
18use crate::logs::servers::*;
19use crate::pipeline::PipelineManager;
20use archivist_config::Config;
21use fidl_fuchsia_process_lifecycle::LifecycleRequestStream;
22use fuchsia_component::server::{ServiceFs, ServiceObj};
23use fuchsia_inspect::component;
24use fuchsia_inspect::health::Reporter;
25use futures::channel::mpsc::unbounded;
26use futures::prelude::*;
27use log::{debug, error, info, warn};
28use moniker::ExtendedMoniker;
29use std::path::PathBuf;
30use std::str::FromStr;
31use std::sync::Arc;
32use {fidl_fuchsia_component_sandbox as fsandbox, fidl_fuchsia_io as fio, fuchsia_async as fasync};
33
34pub struct Archivist {
37 event_router: EventRouter,
39
40 pipeline_manager: PipelineManager,
42
43 _inspect_repository: Arc<InspectRepository>,
45
46 logs_repository: Arc<LogsRepository>,
48
49 accessor_server: Arc<ArchiveAccessorServer>,
51
52 log_server: Arc<LogServer>,
54
55 flush_server: Arc<LogFlushServer>,
57
58 log_stream_server: Arc<LogStreamServer>,
60
61 _inspect_sink_server: Arc<InspectSinkServer>,
63
64 general_scope: fasync::Scope,
66
67 incoming_events_scope: fasync::Scope,
69
70 servers_scope: fasync::Scope,
72
73 freeze_server: LogFreezeServer,
75
76 sample_server: Arc<SampleServer>,
78}
79
80impl Archivist {
81 pub async fn new(ring_buffer: ring_buffer::Reader, config: Config) -> Self {
85 let general_scope = fasync::Scope::new_with_name("general");
86 let servers_scope = fasync::Scope::new_with_name("servers");
87
88 let pipeline_manager = PipelineManager::new(
90 PathBuf::from(&config.pipelines_path),
91 component::inspector().root().create_child("pipelines"),
92 component::inspector().root().create_child("archive_accessor_stats"),
93 general_scope.new_child_with_name("pipeline_manager"),
94 )
95 .await;
96
97 let mut event_router =
99 EventRouter::new(component::inspector().root().create_child("events"));
100 let incoming_events_scope = general_scope.new_child_with_name("incoming_events");
101 Self::initialize_external_event_sources(&mut event_router, &incoming_events_scope).await;
102
103 let initial_interests =
104 config.component_initial_interests.into_iter().filter_map(|interest| {
105 ComponentInitialInterest::from_str(&interest)
106 .map_err(|err| {
107 warn!(err:?, invalid:% = interest; "Failed to load initial interest");
108 })
109 .ok()
110 });
111 let logs_repo = LogsRepository::new(
112 ring_buffer,
113 initial_interests,
114 component::inspector().root(),
115 general_scope.new_child_with_name("logs_repository"),
116 );
117 let (freeze_server, freeze_receiver) = LogFreezeServer::new();
118 let (flush_sender, flush_receiver) = unbounded();
119 if !config.allow_serial_logs.is_empty() {
120 let logs_repo_clone = Arc::clone(&logs_repo);
121 general_scope.spawn(async move {
122 launch_serial(
123 config.allow_serial_logs,
124 config.deny_serial_log_tags,
125 logs_repo_clone,
126 SerialSink,
127 freeze_receiver,
128 flush_receiver,
129 )
130 .await;
131 });
132 }
133 let inspect_repo = Arc::new(InspectRepository::new(
134 pipeline_manager.weak_pipelines(),
135 general_scope.new_child_with_name("inspect_repository"),
136 ));
137
138 let inspect_sink_server = Arc::new(InspectSinkServer::new(
139 Arc::clone(&inspect_repo),
140 servers_scope.new_child_with_name("InspectSink"),
141 ));
142
143 let sample_server = Arc::new(SampleServer::new(
144 Arc::clone(&inspect_repo),
145 zx::MonotonicDuration::from_seconds(
146 config.fuchsia_diagnostics_sample_min_period_seconds,
147 ),
148 servers_scope.new_child_with_name("SampleServer"),
149 ));
150
151 let accessor_server = Arc::new(ArchiveAccessorServer::new(
153 Arc::clone(&inspect_repo),
154 Arc::clone(&logs_repo),
155 config.maximum_concurrent_snapshots_per_reader,
156 BatchRetrievalTimeout::from_seconds(config.per_component_batch_timeout_seconds),
157 servers_scope.new_child_with_name("ArchiveAccessor"),
158 ));
159
160 let flush_server = Arc::new(LogFlushServer::new(
161 servers_scope.new_child_with_name("LogFlush"),
162 flush_sender,
163 ));
164
165 let log_server = Arc::new(LogServer::new(
166 Arc::clone(&logs_repo),
167 servers_scope.new_child_with_name("Log"),
168 ));
169 let log_stream_server = Arc::new(LogStreamServer::new(
170 Arc::clone(&logs_repo),
171 servers_scope.new_child_with_name("LogStream"),
172 ));
173
174 event_router.add_consumer(ConsumerConfig {
177 consumer: &logs_repo,
178 events: vec![EventType::LogSinkRequested],
179 });
180 event_router.add_consumer(ConsumerConfig {
181 consumer: &inspect_sink_server,
182 events: vec![EventType::InspectSinkRequested],
183 });
184
185 if config.enable_klog {
187 match KernelDebugLog::new().await {
188 Ok(klog) => logs_repo.drain_debuglog(klog),
189 Err(err) => warn!(
190 err:?;
191 "Failed to start the kernel debug log reader. Klog won't be in syslog"
192 ),
193 };
194 }
195
196 for name in &config.bind_services {
198 info!("Connecting to service {}", name);
199 let (_local, remote) = zx::Channel::create();
200 if let Err(e) = fdio::service_connect(&format!("/svc/{name}"), remote) {
201 error!("Couldn't connect to service {}: {:?}", name, e);
202 }
203 }
204
205 if let Ok(dir) =
207 fuchsia_fs::directory::open_in_namespace("/netstack-diagnostics", fio::PERM_READABLE)
208 {
209 inspect_repo.add_inspect_handle(
210 Arc::new(ComponentIdentity::new(
211 ExtendedMoniker::parse_str("core/network/netstack").unwrap(),
212 "fuchsia-pkg://fuchsia.com/netstack#meta/netstack2.cm",
213 )),
214 InspectHandle::directory(dir),
215 );
216 }
217
218 Self {
219 accessor_server,
220 log_server,
221 flush_server,
222 log_stream_server,
223 event_router,
224 _inspect_sink_server: inspect_sink_server,
225 pipeline_manager,
226 _inspect_repository: inspect_repo,
227 logs_repository: logs_repo,
228 general_scope,
229 servers_scope,
230 freeze_server,
231 incoming_events_scope,
232 sample_server,
233 }
234 }
235
236 pub async fn initialize_external_event_sources(
237 event_router: &mut EventRouter,
238 scope: &fasync::Scope,
239 ) {
240 match EventSource::new("/events/log_sink_requested_event_stream").await {
241 Err(err) => warn!(err:?; "Failed to create event source for log sink requests"),
242 Ok(mut event_source) => {
243 event_router.add_producer(ProducerConfig {
244 producer: &mut event_source,
245 events: vec![EventType::LogSinkRequested],
246 });
247 scope.spawn(async move {
248 let _ = event_source.spawn().await;
250 });
251 }
252 }
253
254 match EventSource::new("/events/inspect_sink_requested_event_stream").await {
255 Err(err) => {
256 warn!(err:?; "Failed to create event source for InspectSink requests");
257 }
258 Ok(mut event_source) => {
259 event_router.add_producer(ProducerConfig {
260 producer: &mut event_source,
261 events: vec![EventType::InspectSinkRequested],
262 });
263 scope.spawn(async move {
264 let _ = event_source.spawn().await;
266 });
267 }
268 }
269 }
270
271 pub async fn run(
275 mut self,
276 mut fs: ServiceFs<ServiceObj<'static, ()>>,
277 is_embedded: bool,
278 store: fsandbox::CapabilityStoreProxy,
279 request_stream: LifecycleRequestStream,
280 ) -> Result<(), Error> {
281 debug!("Running Archivist.");
282
283 self.serve_protocols(&mut fs, store).await;
285 let svc_task = self.general_scope.spawn(fs.collect::<()>());
286
287 let _inspect_server_task = inspect_runtime::publish(
288 component::inspector(),
289 inspect_runtime::PublishOptions::default(),
290 );
291
292 let Self {
293 _inspect_repository,
294 mut pipeline_manager,
295 logs_repository,
296 accessor_server: _accessor_server,
297 log_server: _log_server,
298 log_stream_server: _log_stream_server,
299 _inspect_sink_server,
300 general_scope,
301 incoming_events_scope,
302 freeze_server: _freeze_server,
303 servers_scope,
304 event_router,
305 flush_server: _flush_server,
306 sample_server: _sample_server,
307 } = self;
308
309 let (terminate_handle, drain_events_fut) = event_router
311 .start()
312 .expect("Failed to start event router");
314 general_scope.spawn(drain_events_fut);
315
316 let servers_scope_handle = servers_scope.to_handle();
317 general_scope.spawn(component_lifecycle::on_stop_request(request_stream, || async move {
318 terminate_handle.terminate().await;
319 debug!("Stopped ingesting new CapabilityRequested events");
320 incoming_events_scope.cancel().await;
321 debug!("Cancel all tasks currently executing in our event router");
322 servers_scope_handle.close();
323 logs_repository.stop_accepting_new_log_sinks();
324 debug!("Close any new connections to FIDL servers");
325 svc_task.abort().await;
326 pipeline_manager.cancel().await;
327 debug!("Stop allowing new connections through the incoming namespace.");
328 logs_repository.wait_for_termination().await;
329 debug!("All LogSink connections have finished");
330 servers_scope.join().await;
331 debug!("All servers stopped.");
332 }));
333 if is_embedded {
334 debug!("Entering core loop.");
335 } else {
336 info!("archivist: Entering core loop.");
337 }
338
339 component::health().set_ok();
340 general_scope.await;
341
342 Ok(())
343 }
344
345 async fn serve_protocols(
346 &mut self,
347 fs: &mut ServiceFs<ServiceObj<'static, ()>>,
348 mut store: fsandbox::CapabilityStoreProxy,
349 ) {
350 component::serve_inspect_stats();
351 let mut svc_dir = fs.dir("svc");
352
353 let id_gen = sandbox::CapabilityIdGenerator::new();
354
355 let accessors_dict_id = self
357 .pipeline_manager
358 .serve_pipelines(Arc::clone(&self.accessor_server), &id_gen, &mut store)
359 .await;
360
361 let log_server = Arc::clone(&self.log_server);
363 svc_dir.add_fidl_service(move |stream| {
364 debug!("fuchsia.logger.Log connection");
365 log_server.spawn(stream);
366 });
367
368 let flush_server = Arc::clone(&self.flush_server);
370 svc_dir.add_fidl_service(move |stream| {
371 debug!("fuchsia.diagnostics.system.LogFlush connection");
372 flush_server.spawn(stream);
373 });
374
375 self.sample_server.install_and_serve(*accessors_dict_id, &id_gen, &mut store).await;
379
380 let server = self.freeze_server.clone();
381 let scope = self.general_scope.clone();
382
383 svc_dir.add_fidl_service(move |stream| {
384 debug!("fuchsia.diagnostics.system.SerialLogControl connection");
385 let server = server.clone();
386 scope.spawn(async move {
387 let _ = server.handle_requests(stream).await;
388 });
389 });
390
391 let log_stream_server = Arc::clone(&self.log_stream_server);
393 svc_dir.add_fidl_service(move |stream| {
394 debug!("fuchsia.logger.LogStream connection");
395 log_stream_server.spawn(stream);
396 });
397
398 let log_settings_server = LogSettingsServer::new(
400 Arc::clone(&self.logs_repository),
401 self.general_scope.new_child_with_name("LogSettings"),
404 );
405 svc_dir.add_fidl_service(move |stream| {
406 debug!("fuchsia.diagnostics.LogSettings connection");
407 log_settings_server.spawn(stream);
408 });
409
410 let router_scope =
412 self.servers_scope.new_child_with_name("fuchsia.component.sandbox.Router");
413 svc_dir.add_fidl_service(move |mut stream: fsandbox::DictionaryRouterRequestStream| {
414 let id_gen = Clone::clone(&id_gen);
415 let store = Clone::clone(&store);
416 router_scope.spawn(async move {
417 while let Ok(Some(request)) = stream.try_next().await {
418 match request {
419 fsandbox::DictionaryRouterRequest::Route { payload: _, responder } => {
420 debug!("Got route request for the dynamic accessors dictionary");
421 let dup_dict_id = id_gen.next();
422 store
423 .duplicate(*accessors_dict_id, dup_dict_id)
424 .await
425 .unwrap()
426 .unwrap();
427 let capability = store.export(dup_dict_id).await.unwrap().unwrap();
428 let fsandbox::Capability::Dictionary(dict) = capability else {
429 let _ = responder.send(Err(fsandbox::RouterError::Internal));
430 continue;
431 };
432 let _ = responder.send(Ok(
433 fsandbox::DictionaryRouterRouteResponse::Dictionary(dict),
434 ));
435 }
436 fsandbox::DictionaryRouterRequest::_UnknownMethod {
437 method_type,
438 ordinal,
439 ..
440 } => {
441 warn!(method_type:?, ordinal; "Got unknown interaction on Router");
442 }
443 }
444 }
445 });
446 });
447 }
448}