persistence/
lib.rs

1// Copyright 2020 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5//! `diagnostics-persistence` component persists Inspect VMOs and serves them at the next boot.
6
7mod constants;
8mod fetcher;
9mod file_handler;
10mod inspect_server;
11mod persist_server;
12mod scheduler;
13
14use anyhow::{bail, format_err, Context, Error};
15use argh::FromArgs;
16use fetcher::Fetcher;
17use fidl::endpoints;
18use fuchsia_component::client;
19use fuchsia_component::server::ServiceFs;
20use fuchsia_inspect::component;
21use fuchsia_inspect::health::Reporter;
22use futures::{StreamExt, TryStreamExt};
23use log::*;
24use persist_server::PersistServer;
25use persistence_config::Config;
26use scheduler::Scheduler;
27use zx::BootInstant;
28use {
29    fidl_fuchsia_component_sandbox as fsandbox, fidl_fuchsia_update as fupdate,
30    fuchsia_async as fasync,
31};
32
/// Key of the integer property recorded on the Inspect root after persisted data is fully
/// published; `main` stores the boot-time timestamp (in nanoseconds) under this key.
pub const PUBLISHED_TIME_KEY: &str = "published";
/// Name of the Inspect child node, under the root, that holds the previous boot's persisted
/// data.
pub const PERSIST_NODE_NAME: &str = "persist";
/// The name of the subcommand and the logs-tag.
pub const PROGRAM_NAME: &str = "persistence";
38
39/// Command line args
40#[derive(FromArgs, Debug, PartialEq)]
41#[argh(subcommand, name = "persistence")]
42pub struct CommandLine {}
43
// on_error evaluates `value` (which must be a Result). On `Err` it formats the error into
// `error_message`, logs the resulting text as a warning, and yields an `Err` carrying that
// same text; on `Ok` the value passes through untouched.
// error_message must be a format string containing exactly one placeholder for the error.
macro_rules! on_error {
    ($value:expr, $error_message:expr) => {
        $value.map_err(|e| {
            let message = format!($error_message, e);
            warn!("{}", message);
            format_err!("{}", message)
        })
    };
}
55
56pub async fn main(_args: CommandLine) -> Result<(), Error> {
57    info!("Starting Diagnostics Persistence Service service");
58    let mut health = component::health();
59    let config =
60        on_error!(persistence_config::load_configuration_files(), "Error loading configs: {}")?;
61    let inspector = component::inspector();
62    let _inspect_server_task =
63        inspect_runtime::publish(inspector, inspect_runtime::PublishOptions::default());
64
65    file_handler::forget_old_data(&config);
66
67    // Create the Inspect fetcher
68    let (fetch_requester, _fetcher_task) =
69        on_error!(Fetcher::new(&config), "Error initializing fetcher: {}")?;
70
71    let scope = fasync::Scope::new();
72    let scheduler = Scheduler::new(scope.to_handle(), fetch_requester, &config);
73
74    // Add a persistence fidl service for each service defined in the config files.
75    let scope = fasync::Scope::new();
76    let services_scope = scope.new_child_with_name("services");
77
78    let _service_scopes = spawn_persist_services(&config, scheduler, &services_scope)
79        .await
80        .expect("Error spawning persist services");
81
82    // Before serving previous data, wait until the post-boot system update check has finished.
83    // Note: We're already accepting persist requests. If we receive a request, store
84    // some data, and then cache is cleared after data is persisted, that data will be lost. This
85    // is correct behavior - we don't want to remember anything from before the cache was cleared.
86    scope.spawn(async move {
87        info!("Waiting for post-boot update check...");
88        let (notifier_client, mut notifier_request_stream) =
89            fidl::endpoints::create_request_stream::<fupdate::NotifierMarker>();
90        match fuchsia_component::client::connect_to_protocol::<fupdate::ListenerMarker>() {
91            Ok(proxy) => {
92                if let Err(e) = proxy.notify_on_first_update_check(
93                    fupdate::ListenerNotifyOnFirstUpdateCheckRequest {
94                        notifier: Some(notifier_client),
95                        ..Default::default()
96                    },
97                ) {
98                    error!(e:?; "Error subscribing to first update check; not publishing");
99                    return;
100                }
101            }
102            Err(e) => {
103                warn!(
104                    e:?;
105                    "Unable to connect to fuchsia.update.Listener; will publish immediately"
106                );
107            }
108        }
109
110        match notifier_request_stream.try_next().await {
111            Ok(Some(fupdate::NotifierRequest::Notify { control_handle: _ })) => {}
112            Ok(None) => {
113                warn!("Did not receive update notification; not publishing");
114                return;
115            }
116            Err(e) => {
117                error!("Error waiting for update notification; not publishing: {e}");
118                return;
119            }
120        }
121
122        // Start serving previous boot data
123        info!("...Update check has completed; publishing previous boot data");
124        inspector.root().record_child(PERSIST_NODE_NAME, |node| {
125            inspect_server::serve_persisted_data(node);
126            health.set_ok();
127            info!("Diagnostics Persistence Service ready");
128        });
129        inspector.root().record_int(PUBLISHED_TIME_KEY, BootInstant::get().into_nanos());
130    });
131
132    scope.await;
133
134    Ok(())
135}
136
137enum IncomingRequest {
138    Router(fsandbox::DictionaryRouterRequestStream),
139}
140
141// Serve a DataPersistence capability for each service defined in `config` using
142// a dynamic dictionary.
143async fn spawn_persist_services(
144    config: &Config,
145    scheduler: Scheduler,
146    scope: &fasync::Scope,
147) -> Result<Vec<fasync::Scope>, Error> {
148    let store = client::connect_to_protocol::<fsandbox::CapabilityStoreMarker>().unwrap();
149    let id_gen = sandbox::CapabilityIdGenerator::new();
150
151    let services_dict = id_gen.next();
152    store
153        .dictionary_create(services_dict)
154        .await
155        .context("Failed to send FIDL to create dictionary")?
156        .map_err(|e| format_err!("Failed to create dictionary: {e:?}"))?;
157
158    // Register each service with the exposed CFv2 dynamic dictionary.
159    let mut service_scopes = Vec::with_capacity(config.len());
160
161    for (service_name, tags) in config {
162        let connector_id = id_gen.next();
163        let (receiver, receiver_stream) =
164            endpoints::create_request_stream::<fsandbox::ReceiverMarker>();
165
166        store
167            .connector_create(connector_id, receiver)
168            .await
169            .context("Failed to send FIDL to create connector")?
170            .map_err(|e| format_err!("Failed to create connector: {e:?}"))?;
171
172        store
173            .dictionary_insert(
174                services_dict,
175                &fsandbox::DictionaryItem {
176                    key: format!("{}-{}", constants::PERSIST_SERVICE_NAME_PREFIX, service_name),
177                    value: connector_id,
178                },
179            )
180            .await
181            .context(
182                "Failed to send FIDL to insert into diagnostics-persist-capabilities dictionary",
183            )?
184            .map_err(|e| {
185                format_err!(
186                    "Failed to insert into diagnostics-persist-capabilities dictionary: {e:?}"
187                )
188            })?;
189
190        let service_scope = scope.new_child_with_name(service_name.clone());
191        PersistServer::spawn(
192            service_name.clone(),
193            tags.keys().cloned().collect(),
194            scheduler.clone(),
195            &service_scope,
196            receiver_stream,
197        );
198        service_scopes.push(service_scope);
199    }
200
201    // Expose the dynamic dictionary.
202    let mut fs = ServiceFs::new();
203    fs.dir("svc").add_fidl_service(IncomingRequest::Router);
204    fs.take_and_serve_directory_handle().expect("Failed to take service directory handle");
205    scope.spawn(fs.for_each_concurrent(None, move |IncomingRequest::Router(mut stream)| {
206        let store = store.clone();
207        let id_gen = id_gen.clone();
208        async move {
209            while let Ok(Some(request)) = stream.try_next().await {
210                match request {
211                    fsandbox::DictionaryRouterRequest::Route { payload: _, responder } => {
212                        let dup_dict_id = id_gen.next();
213                        store.duplicate(services_dict, dup_dict_id).await.unwrap().unwrap();
214                        let capability = store.export(dup_dict_id).await.unwrap().unwrap();
215                        let fsandbox::Capability::Dictionary(dict) = capability else {
216                            panic!("capability was not a dictionary? {capability:?}");
217                        };
218                        let _ = responder
219                            .send(Ok(fsandbox::DictionaryRouterRouteResponse::Dictionary(dict)));
220                    }
221                    fsandbox::DictionaryRouterRequest::_UnknownMethod { ordinal, .. } => {
222                        warn!(ordinal:%; "Unknown DictionaryRouter request");
223                    }
224                }
225            }
226        }
227    }));
228
229    Ok(service_scopes)
230}