1use crate::accessor::{ArchiveAccessorServer, BatchRetrievalTimeout};
6use crate::component_lifecycle;
7use crate::error::Error;
8use crate::events::router::{ConsumerConfig, EventRouter, ProducerConfig};
9use crate::events::sources::EventSource;
10use crate::events::types::*;
11use crate::identity::ComponentIdentity;
12use crate::inspect::container::InspectHandle;
13use crate::inspect::repository::InspectRepository;
14use crate::inspect::servers::*;
15use crate::logs::debuglog::KernelDebugLog;
16use crate::logs::repository::{ComponentInitialInterest, LogsRepository};
17use crate::logs::serial::{launch_serial, SerialSink};
18use crate::logs::servers::*;
19use crate::pipeline::PipelineManager;
20use archivist_config::Config;
21use fidl_fuchsia_diagnostics_system::SerialLogControlRequestStream;
22use fidl_fuchsia_process_lifecycle::LifecycleRequestStream;
23use fuchsia_component::server::{ServiceFs, ServiceObj};
24use fuchsia_inspect::component;
25use fuchsia_inspect::health::Reporter;
26use futures::channel::mpsc::{unbounded, UnboundedSender};
27use futures::prelude::*;
28use log::{debug, error, info, warn};
29use moniker::ExtendedMoniker;
30use std::path::PathBuf;
31use std::str::FromStr;
32use std::sync::Arc;
33use {fidl_fuchsia_component_sandbox as fsandbox, fidl_fuchsia_io as fio, fuchsia_async as fasync};
34
/// Top-level state for the Archivist: owns the diagnostics repositories, the
/// FIDL servers that expose them, the event-routing machinery, and the async
/// scopes their tasks run in. Constructed by [`Archivist::new`] and driven by
/// [`Archivist::run`].
pub struct Archivist {
    // Routes component events (e.g. LogSinkRequested, InspectSinkRequested)
    // from producers to the consumers registered in `new`.
    event_router: EventRouter,

    // Owns the configured diagnostics pipelines and serves accessors for them.
    pipeline_manager: PipelineManager,

    // Repository of inspect data. Held only to keep it alive for the
    // Archivist's lifetime (underscore: not read after construction).
    _inspect_repository: Arc<InspectRepository>,

    // Repository of log data; also consumes LogSinkRequested events.
    logs_repository: Arc<LogsRepository>,

    // Serves fuchsia.diagnostics.ArchiveAccessor for the pipelines.
    accessor_server: Arc<ArchiveAccessorServer>,

    // Serves fuchsia.logger.Log.
    log_server: Arc<LogServer>,

    // Serves fuchsia.logger.LogStream.
    log_stream_server: Arc<LogStreamServer>,

    // Serves fuchsia.inspect.InspectSink; consumes InspectSinkRequested
    // events. Held to keep the server alive (underscore: not read later).
    _inspect_sink_server: Arc<InspectSinkServer>,

    // Scope for general long-running tasks (event draining, serial logging,
    // the outgoing ServiceFs). `run` resolves when this scope completes.
    general_scope: fasync::Scope,

    // Child scope in which external event-source drain tasks run; cancelled
    // first during shutdown to stop ingesting new events.
    incoming_events_scope: fasync::Scope,

    // Scope for FIDL server tasks; closed to refuse new connections and then
    // joined during shutdown so in-flight requests can finish.
    servers_scope: fasync::Scope,

    // Hands incoming SerialLogControl request streams to the serial-logging
    // task (spawned in `new` only when serial logs are allowed).
    freeze_sender: UnboundedSender<SerialLogControlRequestStream>,
}
74
impl Archivist {
    /// Builds an Archivist from `config`: sets up pipelines, event routing,
    /// the inspect and log repositories, and the FIDL servers. Nothing is
    /// served to clients until [`Archivist::run`] is called.
    pub async fn new(config: Config) -> Self {
        // Two top-level scopes: "general" for background work, "servers" for
        // FIDL server tasks so they can be closed/joined separately on stop.
        let general_scope = fasync::Scope::new_with_name("general");
        let servers_scope = fasync::Scope::new_with_name("servers");

        let pipeline_manager = PipelineManager::new(
            PathBuf::from(&config.pipelines_path),
            component::inspector().root().create_child("pipelines"),
            component::inspector().root().create_child("archive_accessor_stats"),
            general_scope.new_child_with_name("pipeline_manager"),
        )
        .await;

        let mut event_router =
            EventRouter::new(component::inspector().root().create_child("events"));
        let incoming_events_scope = general_scope.new_child_with_name("incoming_events");
        Self::initialize_external_event_sources(&mut event_router, &incoming_events_scope).await;

        // Parse configured initial log interests; invalid entries are logged
        // and skipped rather than failing startup.
        let initial_interests =
            config.component_initial_interests.into_iter().filter_map(|interest| {
                ComponentInitialInterest::from_str(&interest)
                    .map_err(|err| {
                        warn!(err:?, invalid:% = interest; "Failed to load initial interest");
                    })
                    .ok()
            });
        let logs_repo = LogsRepository::new(
            config.logs_max_cached_original_bytes,
            initial_interests,
            component::inspector().root(),
            general_scope.new_child_with_name("logs_repository"),
        );
        // Channel that forwards SerialLogControl request streams to the serial
        // task. If serial logging is disabled the receiver is dropped here and
        // later sends fail silently (see serve_protocols).
        let (freeze_sender, freeze_receiver) = unbounded::<SerialLogControlRequestStream>();
        if !config.allow_serial_logs.is_empty() {
            let logs_repo_clone = Arc::clone(&logs_repo);
            general_scope.spawn(async move {
                launch_serial(
                    config.allow_serial_logs,
                    config.deny_serial_log_tags,
                    logs_repo_clone,
                    SerialSink,
                    freeze_receiver,
                )
                .await;
            });
        }
        let inspect_repo = Arc::new(InspectRepository::new(
            pipeline_manager.weak_pipelines(),
            general_scope.new_child_with_name("inspect_repository"),
        ));

        let inspect_sink_server = Arc::new(InspectSinkServer::new(
            Arc::clone(&inspect_repo),
            servers_scope.new_child_with_name("InspectSink"),
        ));

        let accessor_server = Arc::new(ArchiveAccessorServer::new(
            Arc::clone(&inspect_repo),
            Arc::clone(&logs_repo),
            config.maximum_concurrent_snapshots_per_reader,
            BatchRetrievalTimeout::from_seconds(config.per_component_batch_timeout_seconds),
            servers_scope.new_child_with_name("ArchiveAccessor"),
        ));

        let log_server = Arc::new(LogServer::new(
            Arc::clone(&logs_repo),
            servers_scope.new_child_with_name("Log"),
        ));
        let log_stream_server = Arc::new(LogStreamServer::new(
            Arc::clone(&logs_repo),
            servers_scope.new_child_with_name("LogStream"),
        ));

        // Wire event consumers: the logs repository handles LogSinkRequested
        // and the inspect sink server handles InspectSinkRequested.
        event_router.add_consumer(ConsumerConfig {
            consumer: &logs_repo,
            events: vec![EventType::LogSinkRequested],
        });
        event_router.add_consumer(ConsumerConfig {
            consumer: &inspect_sink_server,
            events: vec![EventType::InspectSinkRequested],
        });

        // Optionally drain the kernel debug log into the logs repository so
        // klog appears in syslog. Failure is non-fatal.
        if config.enable_klog {
            match KernelDebugLog::new().await {
                Ok(klog) => logs_repo.drain_debuglog(klog),
                Err(err) => warn!(
                    err:?;
                    "Failed to start the kernel debug log reader. Klog won't be in syslog"
                ),
            };
        }

        // Eagerly connect to each configured service. The local channel end is
        // deliberately dropped: establishing the connection is the only goal
        // here (presumably to trigger capability-requested events or start the
        // provider — confirm against config docs).
        for name in &config.bind_services {
            info!("Connecting to service {}", name);
            let (_local, remote) = zx::Channel::create();
            if let Err(e) = fdio::service_connect(&format!("/svc/{name}"), remote) {
                error!("Couldn't connect to service {}: {:?}", name, e);
            }
        }

        // Special case: if a netstack diagnostics directory is present in our
        // namespace, register it under the netstack component's identity.
        if let Ok(dir) =
            fuchsia_fs::directory::open_in_namespace("/netstack-diagnostics", fio::PERM_READABLE)
        {
            inspect_repo.add_inspect_handle(
                Arc::new(ComponentIdentity::new(
                    ExtendedMoniker::parse_str("core/network/netstack").unwrap(),
                    "fuchsia-pkg://fuchsia.com/netstack#meta/netstack2.cm",
                )),
                InspectHandle::directory(dir),
            );
        }

        Self {
            accessor_server,
            log_server,
            log_stream_server,
            event_router,
            _inspect_sink_server: inspect_sink_server,
            pipeline_manager,
            _inspect_repository: inspect_repo,
            logs_repository: logs_repo,
            general_scope,
            servers_scope,
            freeze_sender,
            incoming_events_scope,
        }
    }

    /// Registers the externally routed event streams under `/events` as
    /// producers on `event_router`, spawning a drain task for each on `scope`.
    /// A stream that cannot be opened is logged and skipped.
    pub async fn initialize_external_event_sources(
        event_router: &mut EventRouter,
        scope: &fasync::Scope,
    ) {
        match EventSource::new("/events/log_sink_requested_event_stream").await {
            Err(err) => warn!(err:?; "Failed to create event source for log sink requests"),
            Ok(mut event_source) => {
                event_router.add_producer(ProducerConfig {
                    producer: &mut event_source,
                    events: vec![EventType::LogSinkRequested],
                });
                scope.spawn(async move {
                    // Result intentionally ignored: the stream ending is not
                    // actionable here.
                    let _ = event_source.spawn().await;
                });
            }
        }

        match EventSource::new("/events/inspect_sink_requested_event_stream").await {
            Err(err) => {
                warn!(err:?; "Failed to create event source for InspectSink requests");
            }
            Ok(mut event_source) => {
                event_router.add_producer(ProducerConfig {
                    producer: &mut event_source,
                    events: vec![EventType::InspectSinkRequested],
                });
                scope.spawn(async move {
                    // Result intentionally ignored, as above.
                    let _ = event_source.spawn().await;
                });
            }
        }
    }

    /// Runs the Archivist to completion: serves the outgoing protocols,
    /// publishes the Archivist's own inspect data, starts event routing, and
    /// installs a lifecycle stop handler that performs an ordered shutdown.
    /// Resolves once every task on the general scope has finished.
    pub async fn run(
        mut self,
        mut fs: ServiceFs<ServiceObj<'static, ()>>,
        is_embedded: bool,
        store: fsandbox::CapabilityStoreProxy,
        request_stream: LifecycleRequestStream,
    ) -> Result<(), Error> {
        debug!("Running Archivist.");

        self.serve_protocols(&mut fs, store).await;
        // Drive the outgoing directory; kept as a handle so shutdown can abort it.
        let svc_task = self.general_scope.spawn(fs.collect::<()>());

        // Publish our own inspect tree; the task is held alive by this binding
        // for the duration of `run`.
        let _inspect_server_task = inspect_runtime::publish(
            component::inspector(),
            inspect_runtime::PublishOptions::default(),
        );

        // Destructure so individual pieces can be moved into the stop closure.
        let Self {
            _inspect_repository,
            mut pipeline_manager,
            logs_repository,
            accessor_server: _accessor_server,
            log_server: _log_server,
            log_stream_server: _log_stream_server,
            _inspect_sink_server,
            general_scope,
            incoming_events_scope,
            freeze_sender: _freeze_sender,
            servers_scope,
            event_router,
        } = self;

        let (terminate_handle, drain_events_fut) = event_router
            .start()
            .expect("Failed to start event router");
        general_scope.spawn(drain_events_fut);

        // On a lifecycle stop request, tear down in a deliberate order:
        // stop ingesting events, refuse new connections, then drain in-flight
        // work before letting the servers finish. Do not reorder casually.
        let servers_scope_handle = servers_scope.to_handle();
        general_scope.spawn(component_lifecycle::on_stop_request(request_stream, || async move {
            terminate_handle.terminate().await;
            debug!("Stopped ingesting new CapabilityRequested events");
            incoming_events_scope.cancel().await;
            debug!("Cancel all tasks currently executing in our event router");
            servers_scope_handle.close();
            logs_repository.stop_accepting_new_log_sinks();
            debug!("Close any new connections to FIDL servers");
            svc_task.abort().await;
            pipeline_manager.cancel().await;
            debug!("Stop allowing new connections through the incoming namespace.");
            logs_repository.wait_for_termination().await;
            debug!("All LogSink connections have finished");
            servers_scope.join().await;
            debug!("All servers stopped.");
        }));
        if is_embedded {
            debug!("Entering core loop.");
        } else {
            info!("archivist: Entering core loop.");
        }

        component::health().set_ok();
        // Completes when all general-scope tasks (including the stop handler,
        // once triggered) have finished.
        general_scope.await;

        Ok(())
    }

    /// Populates the outgoing `svc` directory with the protocols Archivist
    /// serves, and wires the dynamic accessors dictionary through the
    /// capability `store` so it can be routed to other components.
    async fn serve_protocols(
        &mut self,
        fs: &mut ServiceFs<ServiceObj<'static, ()>>,
        mut store: fsandbox::CapabilityStoreProxy,
    ) {
        component::serve_inspect_stats();
        let mut svc_dir = fs.dir("svc");

        let id_gen = sandbox::CapabilityIdGenerator::new();

        // Serve accessors for the configured pipelines; the returned id names
        // the dictionary that the DictionaryRouter below hands out.
        let accessors_dict_id = self
            .pipeline_manager
            .serve_pipelines(Arc::clone(&self.accessor_server), &id_gen, &mut store)
            .await;

        // fuchsia.logger.Log
        let log_server = Arc::clone(&self.log_server);
        svc_dir.add_fidl_service(move |stream| {
            debug!("fuchsia.logger.Log connection");
            log_server.spawn(stream);
        });

        // fuchsia.diagnostics.system.SerialLogControl: forward the request
        // stream to the serial-logging task via the freeze channel.
        let sender = self.freeze_sender.clone();

        svc_dir.add_fidl_service(move |stream| {
            debug!("fuchsia.diagnostics.system.SerialLogControl connection");
            // If serial logging was never started the receiver is gone and the
            // send fails; the connection is simply dropped.
            let _ = sender.unbounded_send(stream);
        });

        // fuchsia.logger.LogStream
        let log_stream_server = Arc::clone(&self.log_stream_server);
        svc_dir.add_fidl_service(move |stream| {
            debug!("fuchsia.logger.LogStream connection");
            log_stream_server.spawn(stream);
        });

        // fuchsia.diagnostics.LogSettings
        let log_settings_server = LogSettingsServer::new(
            Arc::clone(&self.logs_repository),
            self.general_scope.new_child_with_name("LogSettings"),
        );
        svc_dir.add_fidl_service(move |stream| {
            debug!("fuchsia.diagnostics.LogSettings connection");
            log_settings_server.spawn(stream);
        });

        // fuchsia.component.sandbox.DictionaryRouter: serves duplicates of the
        // dynamic accessors dictionary created by serve_pipelines above.
        let router_scope =
            self.servers_scope.new_child_with_name("fuchsia.component.sandbox.Router");
        svc_dir.add_fidl_service(move |mut stream: fsandbox::DictionaryRouterRequestStream| {
            let id_gen = Clone::clone(&id_gen);
            let store = Clone::clone(&store);
            router_scope.spawn(async move {
                while let Ok(Some(request)) = stream.try_next().await {
                    match request {
                        fsandbox::DictionaryRouterRequest::Route { payload: _, responder } => {
                            debug!("Got route request for the dynamic accessors dictionary");
                            // Duplicate then export the dictionary for the caller.
                            // The unwraps treat store failures as fatal bugs
                            // (capability store is Archivist-owned state).
                            let dup_dict_id = id_gen.next();
                            store
                                .duplicate(*accessors_dict_id, dup_dict_id)
                                .await
                                .unwrap()
                                .unwrap();
                            let capability = store.export(dup_dict_id).await.unwrap().unwrap();
                            let fsandbox::Capability::Dictionary(dict) = capability else {
                                let _ = responder.send(Err(fsandbox::RouterError::Internal));
                                continue;
                            };
                            let _ = responder.send(Ok(
                                fsandbox::DictionaryRouterRouteResponse::Dictionary(dict),
                            ));
                        }
                        fsandbox::DictionaryRouterRequest::_UnknownMethod {
                            method_type,
                            ordinal,
                            ..
                        } => {
                            warn!(method_type:?, ordinal; "Got unknown interaction on Router");
                        }
                    }
                }
            });
        });
    }
}