persistence/
persist_server.rs1use crate::Scheduler;
6use anyhow::{format_err, Error};
7use fidl::endpoints;
8use fidl_fuchsia_diagnostics_persist::{
9 DataPersistenceMarker, DataPersistenceRequest, DataPersistenceRequestStream, PersistResult,
10};
11use futures::{StreamExt, TryStreamExt};
12use log::*;
13use persistence_config::{ServiceName, Tag};
14use std::collections::HashSet;
15use std::sync::Arc;
16use {fidl_fuchsia_component_sandbox as fsandbox, fuchsia_async as fasync};
17
18pub struct PersistServerData {
19 service_name: ServiceName,
21 tags: HashSet<Tag>,
24 scheduler: Scheduler,
26}
27
/// Field-less type whose associated functions accept incoming connections and serve
/// `DataPersistence` requests on them.
pub(crate) struct PersistServer;
30
31impl PersistServer {
32 pub fn spawn(
34 service_name: ServiceName,
35 tags: Vec<Tag>,
36 scheduler: Scheduler,
37 scope: &fasync::Scope,
38 requests: fsandbox::ReceiverRequestStream,
39 ) {
40 let tags = HashSet::from_iter(tags);
41 let data = Arc::new(PersistServerData { service_name, tags, scheduler });
42
43 let scope_handle = scope.to_handle();
44 scope.spawn(Self::accept_connections(data, requests, scope_handle));
45 }
46
47 async fn accept_connections(
48 data: Arc<PersistServerData>,
49 mut stream: fsandbox::ReceiverRequestStream,
50 scope: fasync::ScopeHandle,
51 ) {
52 while let Some(request) = stream.try_next().await.unwrap() {
53 match request {
54 fsandbox::ReceiverRequest::Receive { channel, control_handle: _ } => {
55 let data = data.clone();
56 scope.spawn(async move {
57 let server_end =
58 endpoints::ServerEnd::<DataPersistenceMarker>::new(channel);
59 let stream: DataPersistenceRequestStream = server_end.into_stream();
60 if let Err(e) = Self::handle_requests(data, stream).await {
61 warn!("error handling persistence request: {e}");
62 }
63 });
64 }
65 fsandbox::ReceiverRequest::_UnknownMethod { ordinal, .. } => {
66 warn!(ordinal:%; "Unknown Receiver request");
67 }
68 }
69 }
70 }
71
72 async fn handle_requests(
73 data: Arc<PersistServerData>,
74 mut stream: DataPersistenceRequestStream,
75 ) -> Result<(), Error> {
76 while let Some(request) = stream.next().await {
77 let request =
78 request.map_err(|e| format_err!("error handling persistence request: {e:?}"))?;
79
80 match request {
81 DataPersistenceRequest::Persist { tag, responder, .. } => {
82 let response = if let Ok(tag) = Tag::new(tag) {
83 if data.tags.contains(&tag) {
84 data.scheduler.schedule(&data.service_name, vec![tag]);
85 PersistResult::Queued
86 } else {
87 PersistResult::BadName
88 }
89 } else {
90 PersistResult::BadName
91 };
92 responder.send(response).map_err(|err| {
93 format_err!("Failed to respond {:?} to client: {}", response, err)
94 })?;
95 }
96 DataPersistenceRequest::PersistTags { tags, responder, .. } => {
97 let (response, tags) = validate_tags(&data.tags, &tags);
98 if !tags.is_empty() {
99 data.scheduler.schedule(&data.service_name, tags);
100 }
101 responder.send(&response).map_err(|err| {
102 format_err!("Failed to respond {:?} to client: {}", response, err)
103 })?;
104 }
105 }
106 }
107 Ok(())
108 }
109}
110
111fn validate_tags(service_tags: &HashSet<Tag>, tags: &[String]) -> (Vec<PersistResult>, Vec<Tag>) {
112 let mut response = vec![];
113 let mut good_tags = vec![];
114 for tag in tags.iter() {
115 if let Ok(tag) = Tag::new(tag.to_string()) {
116 if service_tags.contains(&tag) {
117 response.push(PersistResult::Queued);
118 good_tags.push(tag);
119 } else {
120 response.push(PersistResult::BadName);
121 warn!("Tag '{}' was requested but is not configured", tag);
122 }
123 } else {
124 response.push(PersistResult::BadName);
125 warn!("Tag '{}' was requested but is not a valid tag string", tag);
126 }
127 }
128 (response, good_tags)
129}