persistence/
file_handler.rs1use diagnostics_data::ExtendedMoniker;
6use glob::glob;
7use log::{info, warn};
8use persistence_config::{Config, ServiceName, Tag};
9use serde::ser::SerializeMap;
10use serde::{Serialize, Serializer};
11use serde_json::Value;
12use std::collections::HashMap;
13use std::fs;
14
/// Directory that `write` stores freshly persisted data under.
const CURRENT_PATH: &str = "/cache/current";
/// Directory that `forget_old_data` moves `CURRENT_PATH` to and that `remembered_data` reads.
const PREVIOUS_PATH: &str = "/cache/previous";
17
18pub(crate) struct PersistSchema {
19 pub timestamps: Timestamps,
20 pub payload: PersistPayload,
21}
22
23pub(crate) enum PersistPayload {
24 Data(PersistData),
25 Error(String),
26}
27
28pub(crate) struct PersistData {
29 pub data_length: usize,
30 pub entries: HashMap<ExtendedMoniker, Value>,
31}
32
33#[derive(Clone, Serialize)]
34pub(crate) struct Timestamps {
35 pub before_monotonic: i64,
36 pub before_utc: i64,
37 pub after_monotonic: i64,
38 pub after_utc: i64,
39}
40
/// Map key under which a record's timestamps are serialized.
const TIMESTAMPS_KEY: &str = "@timestamps";
/// Map key under which `PersistData::data_length` is serialized.
const SIZE_KEY: &str = "@persist_size";
/// Map key holding the error object when the payload is an error.
const ERROR_KEY: &str = ":error";
/// Key inside the error object that holds the error description text.
const ERROR_DESCRIPTION_KEY: &str = "description";
46
47impl Serialize for PersistSchema {
48 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
49 where
50 S: Serializer,
51 {
52 match &self.payload {
53 PersistPayload::Data(data) => {
54 let mut s = serializer.serialize_map(Some(data.entries.len() + 2))?;
55 s.serialize_entry(TIMESTAMPS_KEY, &self.timestamps)?;
56 s.serialize_entry(SIZE_KEY, &data.data_length)?;
57 for (k, v) in data.entries.iter() {
58 s.serialize_entry(&k.to_string(), v)?;
59 }
60 s.end()
61 }
62 PersistPayload::Error(error) => {
63 let mut s = serializer.serialize_map(Some(2))?;
64 s.serialize_entry(TIMESTAMPS_KEY, &self.timestamps)?;
65 s.serialize_entry(ERROR_KEY, &ErrorHelper(error))?;
66 s.end()
67 }
68 }
69 }
70}
71
72impl PersistSchema {
73 pub(crate) fn error(timestamps: Timestamps, description: String) -> Self {
74 Self { timestamps, payload: PersistPayload::Error(description) }
75 }
76}
77
78struct ErrorHelper<'a>(&'a str);
79
80impl Serialize for ErrorHelper<'_> {
81 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
82 where
83 S: Serializer,
84 {
85 let mut s = serializer.serialize_map(Some(1))?;
86 s.serialize_entry(ERROR_DESCRIPTION_KEY, self.0)?;
87 s.end()
88 }
89}
90
91pub fn forget_old_data(config: &Config) {
99 info!(
100 "Forgetting persisted inspect data from two boots ago, except for tags with persist_across_boot enabled"
101 );
102
103 fs::remove_dir_all(PREVIOUS_PATH)
105 .map_err(|e| info!("Could not delete {}: {:?}", PREVIOUS_PATH, e))
106 .ok();
107 fs::rename(CURRENT_PATH, PREVIOUS_PATH)
108 .map_err(|e| info!("Could not move {} to {}: {:?}", CURRENT_PATH, PREVIOUS_PATH, e))
109 .ok();
110
111 let mut copied_count = 0;
113
114 for (service, tag) in config.iter().flat_map(|(service, tags)| {
115 tags.iter().filter(|(_, c)| c.persist_across_boot).map(move |(tag, _)| (service, tag))
116 }) {
117 match fs::read(format!("{PREVIOUS_PATH}/{service}/{tag}")) {
118 Ok(data) => {
119 match fs::create_dir(format!("{CURRENT_PATH}/{service}")) {
120 Ok(()) => {}
121 Err(e) if e.kind() == std::io::ErrorKind::AlreadyExists => {}
122 Err(e) => {
123 warn!("Error creating directory {CURRENT_PATH}/{service}: {e:?}");
124 continue;
125 }
126 }
127 match fs::write(format!("{CURRENT_PATH}/{service}/{tag}"), data) {
128 Ok(()) => {
129 copied_count += 1;
130 }
131 Err(e) => {
132 warn!("Error writing persisted data for {service}/{tag}: {e:?}");
133 }
134 }
135 }
136 Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
137 }
139 Err(e) => {
140 warn!("Error reading persisted data for {service}/{tag}: {e:?}")
141 }
142 }
143 }
144
145 info!("Persisted {copied_count} tags across boot");
146}
147
148pub(crate) fn write(service_name: &ServiceName, tag: &Tag, data: &PersistSchema) {
150 let path = format!("{CURRENT_PATH}/{service_name}");
152 if let Err(e) = fs::create_dir_all(&path) {
153 warn!("Could not create directory {}: {:?}", path, e);
154 }
155 let data = match serde_json::to_string(data) {
156 Ok(data) => data,
157 Err(e) => {
158 warn!("Could not serialize data - unexpected error {e}");
159 return;
160 }
161 };
162 if let Err(e) = fs::write(format!("{path}/{tag}"), data) {
163 warn!("Could not write file {}/{}: {:?}", path, tag, e);
164 }
165}
166
167pub(crate) struct ServiceEntry {
168 pub name: String,
169 pub data: Vec<TagEntry>,
170}
171
/// Contents of a single persisted tag file.
pub(crate) struct TagEntry {
    /// File name of the tag.
    pub name: String,
    /// Text read from the tag file.
    pub data: String,
}
176
177pub(crate) fn remembered_data() -> impl Iterator<Item = ServiceEntry> {
180 glob(&format!("{PREVIOUS_PATH}/*"))
183 .expect("Failed to read previous-path glob pattern")
184 .filter_map(|p| match p {
185 Ok(path) => {
186 path.file_name().map(|p| p.to_string_lossy().to_string())
187 }
188 Err(e) => {
189 warn!("Encountered GlobError; contents could not be read to determine if glob pattern was matched: {e:?}");
190 None
191 }
192 })
193 .map(|service_name| {
194 let entries: Vec<TagEntry> = glob(&format!("{PREVIOUS_PATH}/{service_name}/*"))
195 .expect("Failed to read previous service persistence pattern")
196 .filter_map(|p| match p {
197 Ok(path) => path
198 .file_name()
199 .map(|tag| (path.clone(), tag.to_string_lossy().to_string())),
200 Err(ref e) => {
201 warn!("Failed to retrieve text persisted at path {p:?}: {e:?}");
202 None
203 }
204 })
205 .filter_map(|(path, tag)| match fs::read(&path) {
206 Ok(text) => match std::str::from_utf8(&text) {
210 Ok(contents) => Some(TagEntry { name: tag, data: contents.to_owned() }),
211 Err(e) => {
212 warn!("Failed to parse persisted bytes at path: {path:?} into text: {e:?}");
213 None
214 }
215 },
216 Err(e) => {
217 warn!("Failed to retrieve text persisted at path: {path:?}: {e:?}");
218 None
219 }
220 })
221 .collect();
222
223 if entries.is_empty() {
224 info!("No data available to persist for {service_name:?}.");
225 } else {
226 info!("{} data entries available to persist for {service_name:?}.", entries.len());
227 }
228
229 ServiceEntry { name: service_name, data: entries }
230 })
231}