persistence/
persist_server.rs1use crate::Scheduler;
6use anyhow::{format_err, Error};
7use fidl_fuchsia_diagnostics_persist::{
8 DataPersistenceRequest, DataPersistenceRequestStream, PersistResult,
9};
10use fuchsia_async as fasync;
11use futures::StreamExt;
12use log::*;
13use persistence_config::{ServiceName, Tag};
14use std::collections::HashSet;
15use std::sync::Arc;
16
17pub struct PersistServerData {
18 service_name: ServiceName,
20 tags: HashSet<Tag>,
23 scheduler: Scheduler,
25}
26
27pub(crate) struct PersistServer {
28 data: Arc<PersistServerData>,
30
31 scope: fasync::Scope,
33}
34
35impl PersistServer {
36 pub fn create(
37 service_name: ServiceName,
38 tags: Vec<Tag>,
39 scheduler: Scheduler,
40 scope: fasync::Scope,
41 ) -> PersistServer {
42 let tags = HashSet::from_iter(tags);
43 Self { data: Arc::new(PersistServerData { service_name, tags, scheduler }), scope }
44 }
45
46 pub fn spawn(&self, stream: DataPersistenceRequestStream) {
48 let data = self.data.clone();
49 self.scope.spawn(async move {
50 if let Err(e) = Self::handle_requests(data, stream).await {
51 warn!("error handling persistence request: {e}");
52 }
53 });
54 }
55
56 async fn handle_requests(
57 data: Arc<PersistServerData>,
58 mut stream: DataPersistenceRequestStream,
59 ) -> Result<(), Error> {
60 while let Some(request) = stream.next().await {
61 let request =
62 request.map_err(|e| format_err!("error handling persistence request: {e:?}"))?;
63
64 match request {
65 DataPersistenceRequest::Persist { tag, responder, .. } => {
66 let response = if let Ok(tag) = Tag::new(tag) {
67 if data.tags.contains(&tag) {
68 data.scheduler.schedule(&data.service_name, vec![tag]);
69 PersistResult::Queued
70 } else {
71 PersistResult::BadName
72 }
73 } else {
74 PersistResult::BadName
75 };
76 responder.send(response).map_err(|err| {
77 format_err!("Failed to respond {:?} to client: {}", response, err)
78 })?;
79 }
80 DataPersistenceRequest::PersistTags { tags, responder, .. } => {
81 let (response, tags) = validate_tags(&data.tags, &tags);
82 if !tags.is_empty() {
83 data.scheduler.schedule(&data.service_name, tags);
84 }
85 responder.send(&response).map_err(|err| {
86 format_err!("Failed to respond {:?} to client: {}", response, err)
87 })?;
88 }
89 }
90 }
91 Ok(())
92 }
93}
94
95fn validate_tags(service_tags: &HashSet<Tag>, tags: &[String]) -> (Vec<PersistResult>, Vec<Tag>) {
96 let mut response = vec![];
97 let mut good_tags = vec![];
98 for tag in tags.iter() {
99 if let Ok(tag) = Tag::new(tag.to_string()) {
100 if service_tags.contains(&tag) {
101 response.push(PersistResult::Queued);
102 good_tags.push(tag);
103 } else {
104 response.push(PersistResult::BadName);
105 warn!("Tag '{}' was requested but is not configured", tag);
106 }
107 } else {
108 response.push(PersistResult::BadName);
109 warn!("Tag '{}' was requested but is not a valid tag string", tag);
110 }
111 }
112 (response, good_tags)
113}