persistence/
persist_server.rs

1// Copyright 2020 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::Scheduler;
6use anyhow::{format_err, Error};
7use fidl_fuchsia_diagnostics_persist::{
8    DataPersistenceRequest, DataPersistenceRequestStream, PersistResult,
9};
10use fuchsia_async as fasync;
11use futures::StreamExt;
12use log::*;
13use persistence_config::{ServiceName, Tag};
14use std::collections::HashSet;
15use std::sync::Arc;
16
17pub struct PersistServerData {
18    // Service name that this persist server is hosting.
19    service_name: ServiceName,
20    // Mapping from a string tag to an archive reader
21    // configured to fetch a specific set of selectors.
22    tags: HashSet<Tag>,
23    // Scheduler that will handle the persist requests
24    scheduler: Scheduler,
25}
26
27pub(crate) struct PersistServer {
28    /// Persist server data.
29    data: Arc<PersistServerData>,
30
31    /// Scope in which we spawn the server task.
32    scope: fasync::Scope,
33}
34
35impl PersistServer {
36    pub fn create(
37        service_name: ServiceName,
38        tags: Vec<Tag>,
39        scheduler: Scheduler,
40        scope: fasync::Scope,
41    ) -> PersistServer {
42        let tags = HashSet::from_iter(tags);
43        Self { data: Arc::new(PersistServerData { service_name, tags, scheduler }), scope }
44    }
45
46    /// Spawn a task to handle requests from components.
47    pub fn spawn(&self, stream: DataPersistenceRequestStream) {
48        let data = self.data.clone();
49        self.scope.spawn(async move {
50            if let Err(e) = Self::handle_requests(data, stream).await {
51                warn!("error handling persistence request: {e}");
52            }
53        });
54    }
55
56    async fn handle_requests(
57        data: Arc<PersistServerData>,
58        mut stream: DataPersistenceRequestStream,
59    ) -> Result<(), Error> {
60        while let Some(request) = stream.next().await {
61            let request =
62                request.map_err(|e| format_err!("error handling persistence request: {e:?}"))?;
63
64            match request {
65                DataPersistenceRequest::Persist { tag, responder, .. } => {
66                    let response = if let Ok(tag) = Tag::new(tag) {
67                        if data.tags.contains(&tag) {
68                            data.scheduler.schedule(&data.service_name, vec![tag]);
69                            PersistResult::Queued
70                        } else {
71                            PersistResult::BadName
72                        }
73                    } else {
74                        PersistResult::BadName
75                    };
76                    responder.send(response).map_err(|err| {
77                        format_err!("Failed to respond {:?} to client: {}", response, err)
78                    })?;
79                }
80                DataPersistenceRequest::PersistTags { tags, responder, .. } => {
81                    let (response, tags) = validate_tags(&data.tags, &tags);
82                    if !tags.is_empty() {
83                        data.scheduler.schedule(&data.service_name, tags);
84                    }
85                    responder.send(&response).map_err(|err| {
86                        format_err!("Failed to respond {:?} to client: {}", response, err)
87                    })?;
88                }
89            }
90        }
91        Ok(())
92    }
93}
94
95fn validate_tags(service_tags: &HashSet<Tag>, tags: &[String]) -> (Vec<PersistResult>, Vec<Tag>) {
96    let mut response = vec![];
97    let mut good_tags = vec![];
98    for tag in tags.iter() {
99        if let Ok(tag) = Tag::new(tag.to_string()) {
100            if service_tags.contains(&tag) {
101                response.push(PersistResult::Queued);
102                good_tags.push(tag);
103            } else {
104                response.push(PersistResult::BadName);
105                warn!("Tag '{}' was requested but is not configured", tag);
106            }
107        } else {
108            response.push(PersistResult::BadName);
109            warn!("Tag '{}' was requested but is not a valid tag string", tag);
110        }
111    }
112    (response, good_tags)
113}