1use crate::accessor::{ArchiveAccessorServer, BatchRetrievalTimeout};
6use crate::component_lifecycle;
7use crate::error::Error;
8use crate::events::router::{ConsumerConfig, EventRouter, ProducerConfig};
9use crate::events::sources::EventSource;
10use crate::events::types::*;
11use crate::identity::ComponentIdentity;
12use crate::inspect::container::InspectHandle;
13use crate::inspect::repository::InspectRepository;
14use crate::inspect::servers::*;
15use crate::logs::debuglog::KernelDebugLog;
16use crate::logs::repository::{ComponentInitialInterest, LogsRepository};
17use crate::logs::serial::{SerialConfig, SerialSink};
18use crate::logs::servers::*;
19use crate::pipeline::PipelineManager;
20use archivist_config::Config;
21use fidl_fuchsia_process_lifecycle::LifecycleRequestStream;
22use fuchsia_component::server::{ServiceFs, ServiceObj};
23use fuchsia_inspect::component;
24use fuchsia_inspect::health::Reporter;
25use futures::prelude::*;
26use log::{debug, error, info, warn};
27use moniker::ExtendedMoniker;
28use std::path::PathBuf;
29use std::str::FromStr;
30use std::sync::Arc;
31use {fidl_fuchsia_component_sandbox as fsandbox, fidl_fuchsia_io as fio, fuchsia_async as fasync};
32
/// Top-level state for the Archivist component: owns event routing, the
/// diagnostics repositories, the FIDL servers that expose them, and the async
/// scopes their tasks run on.
pub struct Archivist {
    // Routes capability-requested events (LogSink/InspectSink) from the
    // external event-stream producers to the consumers registered in `new`.
    event_router: EventRouter,

    // Owns the diagnostics pipelines loaded from `config.pipelines_path` and
    // serves their per-pipeline ArchiveAccessor dictionaries.
    pipeline_manager: PipelineManager,

    // Repository of inspect handles. Underscore-prefixed: held only to keep it
    // alive; handed out elsewhere via weak pipeline references.
    _inspect_repository: Arc<InspectRepository>,

    // Repository of log messages drained from components and, when configured,
    // the kernel debuglog.
    logs_repository: Arc<LogsRepository>,

    // Serves fuchsia.diagnostics.ArchiveAccessor connections.
    accessor_server: Arc<ArchiveAccessorServer>,

    // Serves fuchsia.logger.Log connections.
    log_server: Arc<LogServer>,

    // Serves fuchsia.logger.LogStream connections.
    log_stream_server: Arc<LogStreamServer>,

    // Serves fuchsia.inspect.InspectSink connections. Underscore-prefixed:
    // held to keep the server alive for the archivist's lifetime.
    _inspect_sink_server: Arc<InspectSinkServer>,

    // Scope for general background tasks (event draining, serial logging, the
    // outgoing ServiceFs, the shutdown sequence).
    general_scope: fasync::Scope,

    // Child scope holding the external event-stream ingestion tasks; cancelled
    // first during shutdown (see `run`) to stop new capability requests.
    incoming_events_scope: fasync::Scope,

    // Scope holding FIDL server tasks; closed to new tasks and then joined
    // during shutdown so in-flight connections can finish.
    servers_scope: fasync::Scope,
}
69
70impl Archivist {
    /// Builds a fully wired archivist from `config`: loads the diagnostics
    /// pipelines, sets up event routing, creates the log/inspect repositories
    /// and the FIDL servers that expose them. Nothing is served to clients
    /// until [`Archivist::run`] is called.
    pub async fn new(config: Config) -> Self {
        let general_scope = fasync::Scope::new_with_name("general");
        let servers_scope = fasync::Scope::new_with_name("servers");

        // Load the configured pipelines and attach their config/stats nodes to
        // the component inspect tree.
        let pipeline_manager = PipelineManager::new(
            PathBuf::from(&config.pipelines_path),
            component::inspector().root().create_child("pipelines"),
            component::inspector().root().create_child("archive_accessor_stats"),
            general_scope.new_child_with_name("pipeline_manager"),
        )
        .await;

        // Wire the event router: external event streams act as producers; the
        // logs repository and inspect sink server are registered as consumers
        // further below.
        let mut event_router =
            EventRouter::new(component::inspector().root().create_child("events"));
        let incoming_events_scope = general_scope.new_child_with_name("incoming_events");
        Self::initialize_external_event_sources(&mut event_router, &incoming_events_scope).await;

        // Parse configured per-component initial log interests, logging and
        // skipping any entries that fail to parse.
        let initial_interests =
            config.component_initial_interests.into_iter().filter_map(|interest| {
                ComponentInitialInterest::from_str(&interest)
                    .map_err(|err| {
                        warn!(err:?, invalid:% = interest; "Failed to load initial interest");
                    })
                    .ok()
            });
        let logs_repo = LogsRepository::new(
            config.logs_max_cached_original_bytes,
            initial_interests,
            component::inspector().root(),
            general_scope.new_child_with_name("logs_repository"),
        );
        // When any components are allowed serial logging, spawn the task that
        // mirrors their logs to the serial sink.
        if !config.allow_serial_logs.is_empty() {
            let write_logs_to_serial =
                SerialConfig::new(config.allow_serial_logs, config.deny_serial_log_tags)
                    .write_logs(Arc::clone(&logs_repo), SerialSink);
            general_scope.spawn(write_logs_to_serial);
        }
        let inspect_repo = Arc::new(InspectRepository::new(
            pipeline_manager.weak_pipelines(),
            general_scope.new_child_with_name("inspect_repository"),
        ));

        let inspect_sink_server = Arc::new(InspectSinkServer::new(
            Arc::clone(&inspect_repo),
            servers_scope.new_child_with_name("InspectSink"),
        ));

        // ArchiveAccessor server reads from both repositories, throttled by
        // the configured snapshot concurrency and per-component batch timeout.
        let accessor_server = Arc::new(ArchiveAccessorServer::new(
            Arc::clone(&inspect_repo),
            Arc::clone(&logs_repo),
            config.maximum_concurrent_snapshots_per_reader,
            BatchRetrievalTimeout::from_seconds(config.per_component_batch_timeout_seconds),
            servers_scope.new_child_with_name("ArchiveAccessor"),
        ));

        let log_server = Arc::new(LogServer::new(
            Arc::clone(&logs_repo),
            servers_scope.new_child_with_name("Log"),
        ));
        let log_stream_server = Arc::new(LogStreamServer::new(
            Arc::clone(&logs_repo),
            servers_scope.new_child_with_name("LogStream"),
        ));

        // The logs repository handles LogSink connection requests; the inspect
        // sink server handles InspectSink connection requests.
        event_router.add_consumer(ConsumerConfig {
            consumer: &logs_repo,
            events: vec![EventType::LogSinkRequested],
        });
        event_router.add_consumer(ConsumerConfig {
            consumer: &inspect_sink_server,
            events: vec![EventType::InspectSinkRequested],
        });

        // Optionally drain the kernel debuglog into syslog; failure to open it
        // is logged but non-fatal.
        if config.enable_klog {
            match KernelDebugLog::new().await {
                Ok(klog) => logs_repo.drain_debuglog(klog),
                Err(err) => warn!(
                    err:?;
                    "Failed to start the kernel debug log reader. Klog won't be in syslog"
                ),
            };
        }

        // Connect to each configured service and immediately drop the local
        // channel end. NOTE(review): presumably the connect alone is what
        // matters (triggering capability routing / starting the service) —
        // confirm, since the dropped channel can never be used.
        for name in &config.bind_services {
            info!("Connecting to service {}", name);
            let (_local, remote) = zx::Channel::create();
            if let Err(e) = fdio::service_connect(&format!("/svc/{name}"), remote) {
                error!("Couldn't connect to service {}: {:?}", name, e);
            }
        }

        // If a netstack diagnostics directory is present in our namespace,
        // register it as inspect data attributed to the netstack component.
        if let Ok(dir) =
            fuchsia_fs::directory::open_in_namespace("/netstack-diagnostics", fio::PERM_READABLE)
        {
            inspect_repo.add_inspect_handle(
                Arc::new(ComponentIdentity::new(
                    ExtendedMoniker::parse_str("core/network/netstack").unwrap(),
                    "fuchsia-pkg://fuchsia.com/netstack#meta/netstack2.cm",
                )),
                InspectHandle::directory(dir),
            );
        }

        Self {
            accessor_server,
            log_server,
            log_stream_server,
            event_router,
            _inspect_sink_server: inspect_sink_server,
            pipeline_manager,
            _inspect_repository: inspect_repo,
            logs_repository: logs_repo,
            general_scope,
            servers_scope,
            incoming_events_scope,
        }
    }
199
200 pub async fn initialize_external_event_sources(
201 event_router: &mut EventRouter,
202 scope: &fasync::Scope,
203 ) {
204 match EventSource::new("/events/log_sink_requested_event_stream").await {
205 Err(err) => warn!(err:?; "Failed to create event source for log sink requests"),
206 Ok(mut event_source) => {
207 event_router.add_producer(ProducerConfig {
208 producer: &mut event_source,
209 events: vec![EventType::LogSinkRequested],
210 });
211 scope.spawn(async move {
212 let _ = event_source.spawn().await;
214 });
215 }
216 }
217
218 match EventSource::new("/events/inspect_sink_requested_event_stream").await {
219 Err(err) => {
220 warn!(err:?; "Failed to create event source for InspectSink requests");
221 }
222 Ok(mut event_source) => {
223 event_router.add_producer(ProducerConfig {
224 producer: &mut event_source,
225 events: vec![EventType::InspectSinkRequested],
226 });
227 scope.spawn(async move {
228 let _ = event_source.spawn().await;
230 });
231 }
232 }
233 }
234
    /// Runs the archivist: publishes the inspect tree, serves the outgoing
    /// directory built in `fs`, starts event routing, and installs a graceful
    /// shutdown sequence driven by lifecycle stop requests on `request_stream`.
    ///
    /// Resolves once every task on the general scope (including the shutdown
    /// sequence, if triggered) has finished.
    pub async fn run(
        mut self,
        mut fs: ServiceFs<ServiceObj<'static, ()>>,
        is_embedded: bool,
        store: fsandbox::CapabilityStoreProxy,
        request_stream: LifecycleRequestStream,
    ) -> Result<(), Error> {
        debug!("Running Archivist.");

        // Register all outgoing FIDL protocols, then drive the ServiceFs as a
        // task on the general scope (cancelled during shutdown below).
        self.serve_protocols(&mut fs, store).await;
        let svc_task = self.general_scope.spawn(fs.collect::<()>());

        // Publish the component inspect tree; the server task lives for the
        // duration of `run`.
        let _inspect_server_task = inspect_runtime::publish(
            component::inspector(),
            inspect_runtime::PublishOptions::default(),
        );

        // Destructure so the shutdown closure below can take ownership of the
        // parts it needs; unused servers stay alive via the `_`-bound locals.
        let Self {
            _inspect_repository,
            mut pipeline_manager,
            logs_repository,
            accessor_server: _accessor_server,
            log_server: _log_server,
            log_stream_server: _log_stream_server,
            _inspect_sink_server,
            general_scope,
            incoming_events_scope,
            servers_scope,
            event_router,
        } = self;

        let (terminate_handle, drain_events_fut) = event_router
            .start()
            .expect("Failed to start event router");
        general_scope.spawn(drain_events_fut);

        // Graceful shutdown, executed when a lifecycle stop request arrives.
        // The ordering matters: stop ingesting events, then stop accepting new
        // connections, then wait for in-flight work to drain.
        let servers_scope_handle = servers_scope.to_handle();
        general_scope.spawn(component_lifecycle::on_stop_request(request_stream, || async move {
            terminate_handle.terminate().await;
            debug!("Stopped ingesting new CapabilityRequested events");
            incoming_events_scope.cancel().await;
            debug!("Cancel all tasks currently executing in our event router");
            servers_scope_handle.close();
            logs_repository.stop_accepting_new_log_sinks();
            debug!("Close any new connections to FIDL servers");
            svc_task.cancel().await;
            pipeline_manager.cancel().await;
            debug!("Stop allowing new connections through the incoming namespace.");
            logs_repository.wait_for_termination().await;
            debug!("All LogSink connections have finished");
            servers_scope.join().await;
            debug!("All servers stopped.");
        }));
        if is_embedded {
            debug!("Entering core loop.");
        } else {
            info!("archivist: Entering core loop.");
        }

        component::health().set_ok();
        // Block until every task on the general scope — including the shutdown
        // sequence — has completed.
        general_scope.await;

        Ok(())
    }
305
    /// Registers all outgoing FIDL protocols under `fs`'s `svc` directory:
    /// per-pipeline ArchiveAccessors (exposed through a sandbox dictionary
    /// router), fuchsia.logger.Log, fuchsia.logger.LogStream, and
    /// fuchsia.diagnostics.LogSettings.
    async fn serve_protocols(
        &mut self,
        fs: &mut ServiceFs<ServiceObj<'static, ()>>,
        mut store: fsandbox::CapabilityStoreProxy,
    ) {
        component::serve_inspect_stats();
        let mut svc_dir = fs.dir("svc");

        let id_gen = sandbox::CapabilityIdGenerator::new();

        // Serve one ArchiveAccessor per pipeline; the resulting capabilities
        // are collected into a dictionary in `store`, duplicated on demand by
        // the router registered at the bottom of this function.
        let accessors_dict_id = self
            .pipeline_manager
            .serve_pipelines(Arc::clone(&self.accessor_server), &id_gen, &mut store)
            .await;

        // Each service closure clones its Arc so the server outlives `self`'s
        // borrow here.
        let log_server = Arc::clone(&self.log_server);
        svc_dir.add_fidl_service(move |stream| {
            debug!("fuchsia.logger.Log connection");
            log_server.spawn(stream);
        });

        let log_stream_server = Arc::clone(&self.log_stream_server);
        svc_dir.add_fidl_service(move |stream| {
            debug!("fuchsia.logger.LogStream connection");
            log_stream_server.spawn(stream);
        });

        // LogSettings is created here (not in `new`) and moved into the
        // closure, which keeps it alive for as long as the ServiceFs runs.
        let log_settings_server = LogSettingsServer::new(
            Arc::clone(&self.logs_repository),
            self.general_scope.new_child_with_name("LogSettings"),
        );
        svc_dir.add_fidl_service(move |stream| {
            debug!("fuchsia.diagnostics.LogSettings connection");
            log_settings_server.spawn(stream);
        });

        // Router that hands out duplicates of the accessors dictionary built
        // above; one task per incoming connection, on the servers scope.
        let router_scope =
            self.servers_scope.new_child_with_name("fuchsia.component.sandbox.Router");
        svc_dir.add_fidl_service(move |mut stream: fsandbox::DictionaryRouterRequestStream| {
            let id_gen = Clone::clone(&id_gen);
            let store = Clone::clone(&store);
            router_scope.spawn(async move {
                while let Ok(Some(request)) = stream.try_next().await {
                    match request {
                        fsandbox::DictionaryRouterRequest::Route { payload: _, responder } => {
                            debug!("Got route request for the dynamic accessors dictionary");
                            // Duplicate the dictionary, export it, and return
                            // it to the caller; a non-dictionary export is
                            // reported as an internal routing error.
                            let dup_dict_id = id_gen.next();
                            store
                                .duplicate(*accessors_dict_id, dup_dict_id)
                                .await
                                .unwrap()
                                .unwrap();
                            let capability = store.export(dup_dict_id).await.unwrap().unwrap();
                            let fsandbox::Capability::Dictionary(dict) = capability else {
                                let _ = responder.send(Err(fsandbox::RouterError::Internal));
                                continue;
                            };
                            let _ = responder.send(Ok(
                                fsandbox::DictionaryRouterRouteResponse::Dictionary(dict),
                            ));
                        }
                        fsandbox::DictionaryRouterRequest::_UnknownMethod {
                            method_type,
                            ordinal,
                            ..
                        } => {
                            warn!(method_type:?, ordinal; "Got unknown interaction on Router");
                        }
                    }
                }
            });
        });
    }
386}