persistence/
persist_server.rs

1// Copyright 2020 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::Scheduler;
6use anyhow::{format_err, Error};
7use fidl::endpoints;
8use fidl_fuchsia_diagnostics_persist::{
9    DataPersistenceMarker, DataPersistenceRequest, DataPersistenceRequestStream, PersistResult,
10};
11use futures::{StreamExt, TryStreamExt};
12use log::*;
13use persistence_config::{ServiceName, Tag};
14use std::collections::HashSet;
15use std::sync::Arc;
16use {fidl_fuchsia_component_sandbox as fsandbox, fuchsia_async as fasync};
17
18pub struct PersistServerData {
19    // Service name that this persist server is hosting.
20    service_name: ServiceName,
21    // Mapping from a string tag to an archive reader
22    // configured to fetch a specific set of selectors.
23    tags: HashSet<Tag>,
24    // Scheduler that will handle the persist requests
25    scheduler: Scheduler,
26}
27
28/// PersistServer handles all requests for a single persistence service.
29pub(crate) struct PersistServer;
30
31impl PersistServer {
32    /// Spawn a task to handle requests from components through a dynamic dictionary.
33    pub fn spawn(
34        service_name: ServiceName,
35        tags: Vec<Tag>,
36        scheduler: Scheduler,
37        scope: &fasync::Scope,
38        requests: fsandbox::ReceiverRequestStream,
39    ) {
40        let tags = HashSet::from_iter(tags);
41        let data = Arc::new(PersistServerData { service_name, tags, scheduler });
42
43        let scope_handle = scope.to_handle();
44        scope.spawn(Self::accept_connections(data, requests, scope_handle));
45    }
46
47    async fn accept_connections(
48        data: Arc<PersistServerData>,
49        mut stream: fsandbox::ReceiverRequestStream,
50        scope: fasync::ScopeHandle,
51    ) {
52        while let Some(request) = stream.try_next().await.unwrap() {
53            match request {
54                fsandbox::ReceiverRequest::Receive { channel, control_handle: _ } => {
55                    let data = data.clone();
56                    scope.spawn(async move {
57                        let server_end =
58                            endpoints::ServerEnd::<DataPersistenceMarker>::new(channel);
59                        let stream: DataPersistenceRequestStream = server_end.into_stream();
60                        if let Err(e) = Self::handle_requests(data, stream).await {
61                            warn!("error handling persistence request: {e}");
62                        }
63                    });
64                }
65                fsandbox::ReceiverRequest::_UnknownMethod { ordinal, .. } => {
66                    warn!(ordinal:%; "Unknown Receiver request");
67                }
68            }
69        }
70    }
71
72    async fn handle_requests(
73        data: Arc<PersistServerData>,
74        mut stream: DataPersistenceRequestStream,
75    ) -> Result<(), Error> {
76        while let Some(request) = stream.next().await {
77            let request =
78                request.map_err(|e| format_err!("error handling persistence request: {e:?}"))?;
79
80            match request {
81                DataPersistenceRequest::Persist { tag, responder, .. } => {
82                    let response = if let Ok(tag) = Tag::new(tag) {
83                        if data.tags.contains(&tag) {
84                            data.scheduler.schedule(&data.service_name, vec![tag]);
85                            PersistResult::Queued
86                        } else {
87                            PersistResult::BadName
88                        }
89                    } else {
90                        PersistResult::BadName
91                    };
92                    responder.send(response).map_err(|err| {
93                        format_err!("Failed to respond {:?} to client: {}", response, err)
94                    })?;
95                }
96                DataPersistenceRequest::PersistTags { tags, responder, .. } => {
97                    let (response, tags) = validate_tags(&data.tags, &tags);
98                    if !tags.is_empty() {
99                        data.scheduler.schedule(&data.service_name, tags);
100                    }
101                    responder.send(&response).map_err(|err| {
102                        format_err!("Failed to respond {:?} to client: {}", response, err)
103                    })?;
104                }
105            }
106        }
107        Ok(())
108    }
109}
110
111fn validate_tags(service_tags: &HashSet<Tag>, tags: &[String]) -> (Vec<PersistResult>, Vec<Tag>) {
112    let mut response = vec![];
113    let mut good_tags = vec![];
114    for tag in tags.iter() {
115        if let Ok(tag) = Tag::new(tag.to_string()) {
116            if service_tags.contains(&tag) {
117                response.push(PersistResult::Queued);
118                good_tags.push(tag);
119            } else {
120                response.push(PersistResult::BadName);
121                warn!("Tag '{}' was requested but is not configured", tag);
122            }
123        } else {
124            response.push(PersistResult::BadName);
125            warn!("Tag '{}' was requested but is not a valid tag string", tag);
126        }
127    }
128    (response, good_tags)
129}