1mod constants;
8mod fetcher;
9mod file_handler;
10mod inspect_server;
11mod persist_server;
12mod scheduler;
13
14use anyhow::{bail, format_err, Context, Error};
15use argh::FromArgs;
16use fetcher::Fetcher;
17use fidl::endpoints;
18use fuchsia_component::client;
19use fuchsia_component::server::ServiceFs;
20use fuchsia_inspect::component;
21use fuchsia_inspect::health::Reporter;
22use futures::{StreamExt, TryStreamExt};
23use log::*;
24use persist_server::PersistServer;
25use persistence_config::Config;
26use scheduler::Scheduler;
27use zx::BootInstant;
28use {
29 fidl_fuchsia_component_sandbox as fsandbox, fidl_fuchsia_update as fupdate,
30 fuchsia_async as fasync,
31};
32
/// Name of this program; the same string is used as the argh subcommand name of `CommandLine`.
pub const PROGRAM_NAME: &str = "persistence";
/// Name of the Inspect child node, created under the root node by `main`, that is handed to
/// `inspect_server::serve_persisted_data`.
pub const PERSIST_NODE_NAME: &str = "persist";
/// Key of the integer property on the Inspect root node in which `main` records the boot-clock
/// time (in nanoseconds) at which the persisted data was published.
pub const PUBLISHED_TIME_KEY: &str = "published";
38
/// Command line arguments for the `persistence` subcommand. It currently takes no options.
#[derive(FromArgs, Debug, PartialEq)]
#[argh(subcommand, name = "persistence")]
pub struct CommandLine {}
43
/// Logs and converts the error of a `Result` expression.
///
/// `$value` is any expression evaluating to a `Result`. `$error_message` is a format string
/// with exactly one positional placeholder, which is filled with the error. On `Err`, the
/// formatted text is logged at `warn` level and becomes the text of the returned `anyhow`
/// error. On `Ok`, the value is passed through unchanged.
macro_rules! on_error {
    ($value:expr, $error_message:expr) => {
        $value.or_else(|cause| {
            // Build the text once so the log line and the returned error are identical.
            let description = format!($error_message, cause);
            warn!("{}", description);
            bail!("{}", description)
        })
    };
}
55
56pub async fn main(_args: CommandLine) -> Result<(), Error> {
57 info!("Starting Diagnostics Persistence Service service");
58 let mut health = component::health();
59 let config =
60 on_error!(persistence_config::load_configuration_files(), "Error loading configs: {}")?;
61 let inspector = component::inspector();
62 let _inspect_server_task =
63 inspect_runtime::publish(inspector, inspect_runtime::PublishOptions::default());
64
65 file_handler::forget_old_data(&config);
66
67 let (fetch_requester, _fetcher_task) =
69 on_error!(Fetcher::new(&config), "Error initializing fetcher: {}")?;
70
71 let scope = fasync::Scope::new();
72 let scheduler = Scheduler::new(scope.to_handle(), fetch_requester, &config);
73
74 let scope = fasync::Scope::new();
76 let services_scope = scope.new_child_with_name("services");
77
78 let _service_scopes = spawn_persist_services(&config, scheduler, &services_scope)
79 .await
80 .expect("Error spawning persist services");
81
82 scope.spawn(async move {
87 info!("Waiting for post-boot update check...");
88 let (notifier_client, mut notifier_request_stream) =
89 fidl::endpoints::create_request_stream::<fupdate::NotifierMarker>();
90 match fuchsia_component::client::connect_to_protocol::<fupdate::ListenerMarker>() {
91 Ok(proxy) => {
92 if let Err(e) = proxy.notify_on_first_update_check(
93 fupdate::ListenerNotifyOnFirstUpdateCheckRequest {
94 notifier: Some(notifier_client),
95 ..Default::default()
96 },
97 ) {
98 error!(e:?; "Error subscribing to first update check; not publishing");
99 return;
100 }
101 }
102 Err(e) => {
103 warn!(
104 e:?;
105 "Unable to connect to fuchsia.update.Listener; will publish immediately"
106 );
107 }
108 }
109
110 match notifier_request_stream.try_next().await {
111 Ok(Some(fupdate::NotifierRequest::Notify { control_handle: _ })) => {}
112 Ok(None) => {
113 warn!("Did not receive update notification; not publishing");
114 return;
115 }
116 Err(e) => {
117 error!("Error waiting for update notification; not publishing: {e}");
118 return;
119 }
120 }
121
122 info!("...Update check has completed; publishing previous boot data");
124 inspector.root().record_child(PERSIST_NODE_NAME, |node| {
125 inspect_server::serve_persisted_data(node);
126 health.set_ok();
127 info!("Diagnostics Persistence Service ready");
128 });
129 inspector.root().record_int(PUBLISHED_TIME_KEY, BootInstant::get().into_nanos());
130 });
131
132 scope.await;
133
134 Ok(())
135}
136
/// Requests accepted through this component's outgoing `svc` directory.
enum IncomingRequest {
    /// A new connection to `fuchsia.component.sandbox.DictionaryRouter`.
    Router(fsandbox::DictionaryRouterRequestStream),
}
140
141async fn spawn_persist_services(
144 config: &Config,
145 scheduler: Scheduler,
146 scope: &fasync::Scope,
147) -> Result<Vec<fasync::Scope>, Error> {
148 let store = client::connect_to_protocol::<fsandbox::CapabilityStoreMarker>().unwrap();
149 let id_gen = sandbox::CapabilityIdGenerator::new();
150
151 let services_dict = id_gen.next();
152 store
153 .dictionary_create(services_dict)
154 .await
155 .context("Failed to send FIDL to create dictionary")?
156 .map_err(|e| format_err!("Failed to create dictionary: {e:?}"))?;
157
158 let mut service_scopes = Vec::with_capacity(config.len());
160
161 for (service_name, tags) in config {
162 let connector_id = id_gen.next();
163 let (receiver, receiver_stream) =
164 endpoints::create_request_stream::<fsandbox::ReceiverMarker>();
165
166 store
167 .connector_create(connector_id, receiver)
168 .await
169 .context("Failed to send FIDL to create connector")?
170 .map_err(|e| format_err!("Failed to create connector: {e:?}"))?;
171
172 store
173 .dictionary_insert(
174 services_dict,
175 &fsandbox::DictionaryItem {
176 key: format!("{}-{}", constants::PERSIST_SERVICE_NAME_PREFIX, service_name),
177 value: connector_id,
178 },
179 )
180 .await
181 .context(
182 "Failed to send FIDL to insert into diagnostics-persist-capabilities dictionary",
183 )?
184 .map_err(|e| {
185 format_err!(
186 "Failed to insert into diagnostics-persist-capabilities dictionary: {e:?}"
187 )
188 })?;
189
190 let service_scope = scope.new_child_with_name(service_name.clone());
191 PersistServer::spawn(
192 service_name.clone(),
193 tags.keys().cloned().collect(),
194 scheduler.clone(),
195 &service_scope,
196 receiver_stream,
197 );
198 service_scopes.push(service_scope);
199 }
200
201 let mut fs = ServiceFs::new();
203 fs.dir("svc").add_fidl_service(IncomingRequest::Router);
204 fs.take_and_serve_directory_handle().expect("Failed to take service directory handle");
205 scope.spawn(fs.for_each_concurrent(None, move |IncomingRequest::Router(mut stream)| {
206 let store = store.clone();
207 let id_gen = id_gen.clone();
208 async move {
209 while let Ok(Some(request)) = stream.try_next().await {
210 match request {
211 fsandbox::DictionaryRouterRequest::Route { payload: _, responder } => {
212 let dup_dict_id = id_gen.next();
213 store.duplicate(services_dict, dup_dict_id).await.unwrap().unwrap();
214 let capability = store.export(dup_dict_id).await.unwrap().unwrap();
215 let fsandbox::Capability::Dictionary(dict) = capability else {
216 panic!("capability was not a dictionary? {capability:?}");
217 };
218 let _ = responder
219 .send(Ok(fsandbox::DictionaryRouterRouteResponse::Dictionary(dict)));
220 }
221 fsandbox::DictionaryRouterRequest::_UnknownMethod { ordinal, .. } => {
222 warn!(ordinal:%; "Unknown DictionaryRouter request");
223 }
224 }
225 }
226 }
227 }));
228
229 Ok(service_scopes)
230}