1mod constants;
8mod fetcher;
9mod file_handler;
10mod inspect_server;
11mod persist_server;
12mod scheduler;
13
14use anyhow::{Context, Error, format_err};
15use argh::FromArgs;
16use fidl::endpoints;
17use fuchsia_component::client;
18use fuchsia_component::server::ServiceFs;
19use fuchsia_inspect::component;
20use fuchsia_inspect::health::Reporter;
21use fuchsia_runtime::{HandleInfo, HandleType};
22use futures::{FutureExt, StreamExt, TryStreamExt, select};
23use log::*;
24use persist_server::PersistServer;
25use persistence_build_config::Config as BuildConfig;
26use persistence_config::Config;
27use scheduler::Scheduler;
28use std::pin::pin;
29use zx::BootInstant;
30use {
31 fidl_fuchsia_component_sandbox as fsandbox, fidl_fuchsia_process_lifecycle as flifecycle,
32 fidl_fuchsia_update as fupdate, fuchsia_async as fasync,
33};
34
/// Name of this program. It is the same string as the `persistence` subcommand name declared
/// on [`CommandLine`]; other uses are outside this section of the file.
pub const PROGRAM_NAME: &str = "persistence";

/// Name of the Inspect child node recorded on the root by [`main`]; the node is handed to
/// `inspect_server::serve_persisted_data`.
pub const PERSIST_NODE_NAME: &str = "persist";

/// Key of the integer Inspect property that [`main`] records on the root with the boot-clock
/// time, in nanoseconds, right after the `persist` node has been recorded.
pub const PUBLISHED_TIME_KEY: &str = "published";
40
41#[derive(FromArgs, Debug, PartialEq)]
43#[argh(subcommand, name = "persistence")]
44pub struct CommandLine {}
45
46pub async fn main(_args: CommandLine) -> Result<(), Error> {
47 info!("Starting Diagnostics Persistence Service service");
48 let lifecycle =
49 fuchsia_runtime::take_startup_handle(HandleInfo::new(HandleType::Lifecycle, 0)).unwrap();
50 let lifecycle: zx::Channel = lifecycle.into();
51 let lifecycle: endpoints::ServerEnd<flifecycle::LifecycleMarker> = lifecycle.into();
52 let (mut lifecycle_request_stream, _) = lifecycle.into_stream_and_control_handle();
53 let mut lifecycle_task = pin!(
54 async move {
55 match lifecycle_request_stream.next().await {
56 Some(Ok(flifecycle::LifecycleRequest::Stop { .. })) => {
57 debug!("Received stop request");
58 }
59 Some(Err(e)) => {
60 error!("Received FIDL error from Lifecycle: {e:?}");
61 std::future::pending::<()>().await
62 }
63 None => {
64 debug!("Lifecycle request stream closed");
65 std::future::pending::<()>().await
66 }
67 }
68 }
69 .fuse()
70 );
71
72 let mut health = component::health();
73 let config = persistence_config::load_configuration_files().context("Error loading configs")?;
74 let build_config = BuildConfig::take_from_startup_handle();
75 let inspector = component::inspector();
76 inspector.root().record_child("config", |config_node| build_config.record_inspect(config_node));
77 let _inspect_server_task =
78 inspect_runtime::publish(inspector, inspect_runtime::PublishOptions::default());
79
80 file_handler::forget_old_data(&config);
81
82 let scope = fasync::Scope::new();
83 let scheduler =
84 Scheduler::new(scope.to_handle(), &config).context("Error creating scheduler")?;
85
86 let scope = fasync::Scope::new();
88 let (outgoing_dir_task, service_scope) =
89 spawn_persist_services(&config, scheduler).await.expect("Error spawning persist services");
90
91 scope.spawn(async move {
96 if build_config.skip_update_check {
97 info!("Skipping the update check, publishing previous boot data");
98 } else if let Err(e) = wait_for_update().await {
99 warn!(e:?; "Will not publish previous boot data");
100 return;
101 }
102
103 inspector.root().record_child(PERSIST_NODE_NAME, |node| {
104 inspect_server::serve_persisted_data(node);
105 health.set_ok();
106 info!("Diagnostics Persistence Service ready");
107 });
108 inspector.root().record_int(PUBLISHED_TIME_KEY, BootInstant::get().into_nanos());
109 });
110
111 let mut outgoing_dir_task = outgoing_dir_task.fuse();
112
113 select! {
114 _ = lifecycle_task => {
115 info!("Stopping due to lifecycle request");
116 service_scope.cancel().await;
117 scope.cancel().await;
118 },
119 _ = outgoing_dir_task => {
120 info!("Stopping due to idle activity");
121 service_scope.cancel().await;
122 scope.join().await;
123 },
124 }
125
126 Ok(())
127}
128
129async fn wait_for_update() -> Result<(), Error> {
130 info!("Waiting for post-boot update check...");
131 let (notifier_client, mut notifier_request_stream) =
132 fidl::endpoints::create_request_stream::<fupdate::NotifierMarker>();
133 match fuchsia_component::client::connect_to_protocol::<fupdate::ListenerMarker>() {
134 Ok(proxy) => {
135 proxy.notify_on_first_update_check(
136 fupdate::ListenerNotifyOnFirstUpdateCheckRequest {
137 notifier: Some(notifier_client),
138 ..Default::default()
139 },
140 )?;
141 }
142 Err(e) => {
143 warn!(
144 e:?;
145 "Unable to connect to fuchsia.update.Listener; will publish immediately"
146 );
147
148 return Ok(());
149 }
150 }
151
152 match notifier_request_stream.try_next().await {
153 Ok(Some(fupdate::NotifierRequest::Notify { control_handle: _ })) => {}
154 Ok(None) => {
155 return Err(anyhow::anyhow!("Did not receive update notification; not publishing"));
156 }
157 Err(e) => {
158 return Err(anyhow::anyhow!(
159 "Error waiting for update notification; not publishing: {e}"
160 ));
161 }
162 }
163
164 info!("...Update check has completed; publishing previous boot data");
166 Ok(())
167}
168
169enum IncomingRequest {
170 Router(fsandbox::DictionaryRouterRequestStream),
171}
172
173async fn spawn_persist_services(
176 config: &Config,
177 scheduler: Scheduler,
178) -> Result<(impl Future<Output = ()>, fasync::Scope), Error> {
179 let store = client::connect_to_protocol::<fsandbox::CapabilityStoreMarker>().unwrap();
180 let id_gen = sandbox::CapabilityIdGenerator::new();
181
182 let services_dict = id_gen.next();
183 store
184 .dictionary_create(services_dict)
185 .await
186 .context("Failed to send FIDL to create dictionary")?
187 .map_err(|e| format_err!("Failed to create dictionary: {e:?}"))?;
188
189 let service_scope = fasync::Scope::new();
190 for service_name in config.keys() {
191 let connector_id = id_gen.next();
192 let (receiver, receiver_stream) =
193 endpoints::create_request_stream::<fsandbox::ReceiverMarker>();
194
195 store
196 .connector_create(connector_id, receiver)
197 .await
198 .context("Failed to send FIDL to create connector")?
199 .map_err(|e| format_err!("Failed to create connector: {e:?}"))?;
200
201 store
202 .dictionary_insert(
203 services_dict,
204 &fsandbox::DictionaryItem {
205 key: format!("{}-{}", constants::PERSIST_SERVICE_NAME_PREFIX, service_name),
206 value: connector_id,
207 },
208 )
209 .await
210 .context(
211 "Failed to send FIDL to insert into diagnostics-persist-capabilities dictionary",
212 )?
213 .map_err(|e| {
214 format_err!(
215 "Failed to insert into diagnostics-persist-capabilities dictionary: {e:?}"
216 )
217 })?;
218
219 PersistServer::spawn(
220 service_name.clone(),
221 scheduler.clone(),
222 service_scope.to_handle(),
223 receiver_stream,
224 );
225 }
226
227 let mut fs = ServiceFs::new();
229 fs.dir("svc").add_fidl_service(IncomingRequest::Router);
230 fs.take_and_serve_directory_handle().expect("Failed to take service directory handle");
231 let outgoing_dir_task =
232 fs.for_each_concurrent(None, move |IncomingRequest::Router(mut stream)| {
233 let store = store.clone();
234 let id_gen = id_gen.clone();
235 async move {
236 while let Ok(Some(request)) = stream.try_next().await {
237 match request {
238 fsandbox::DictionaryRouterRequest::Route { payload: _, responder } => {
239 let dup_dict_id = id_gen.next();
240 store.duplicate(services_dict, dup_dict_id).await.unwrap().unwrap();
241 let capability = store.export(dup_dict_id).await.unwrap().unwrap();
242 let fsandbox::Capability::Dictionary(dict) = capability else {
243 panic!("capability was not a dictionary? {capability:?}");
244 };
245 let _ = responder.send(Ok(
246 fsandbox::DictionaryRouterRouteResponse::Dictionary(dict),
247 ));
248 }
249 fsandbox::DictionaryRouterRequest::_UnknownMethod { ordinal, .. } => {
250 warn!(ordinal:%; "Unknown DictionaryRouter request");
251 }
252 }
253 }
254 }
255 });
256
257 Ok((outgoing_dir_task, service_scope))
258}