archivist_lib/pipeline/
privacy_pipeline.rs

1// Copyright 2020 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4use crate::accessor::{ArchiveAccessorServer, ArchiveAccessorTranslator, ArchiveAccessorWriter};
5use crate::configs;
6use crate::diagnostics::AccessorStats;
7use crate::error::Error;
8use crate::pipeline::StaticHierarchyAllowlist;
9use fidl::endpoints::{DiscoverableProtocolMarker, ProtocolMarker, ServerEnd};
10use fuchsia_fs::directory;
11use fuchsia_sync::RwLock;
12use futures::TryStreamExt;
13use log::{debug, warn};
14use moniker::ExtendedMoniker;
15use std::borrow::Cow;
16use std::ops::Deref;
17use std::path::{Path, PathBuf};
18use std::sync::{Arc, Weak};
19use {
20    fidl_fuchsia_component_sandbox as fsandbox, fidl_fuchsia_diagnostics as fdiagnostics,
21    fidl_fuchsia_diagnostics_host as fdiagnostics_host, fidl_fuchsia_io as fio,
22    fuchsia_async as fasync, fuchsia_inspect as inspect,
23};
24
25const ALL_PIPELINE_NAME: &str = "all";
26
27struct PipelineParameters {
28    has_config: bool,
29    name: Cow<'static, str>,
30    empty_behavior: configs::EmptyBehavior,
31}
32
33#[derive(Copy, Clone)]
34pub struct PipelinesDictionaryId(u64);
35
36impl Deref for PipelinesDictionaryId {
37    type Target = u64;
38
39    fn deref(&self) -> &Self::Target {
40        &self.0
41    }
42}
43
44/// Overlay that mediates connections between servers and the central
45/// data repository. The overlay is provided static configurations that
46/// make it unique to a specific pipeline, and uses those static configurations
47/// to offer filtered access to the central repository.
48pub struct Pipeline {
49    /// The name of the pipeline.
50    name: Cow<'static, str>,
51
52    /// Contains information about the configuration of the pipeline.
53    _pipeline_node: Option<inspect::Node>,
54
55    /// Contains information about the accessor requests done for this pipeline.
56    stats: AccessorStats,
57
58    /// The statically declared allowlist for data exfiltration on this pipeline.
59    static_allowlist: RwLock<StaticHierarchyAllowlist>,
60}
61
62impl Pipeline {
63    fn new(
64        parameters: PipelineParameters,
65        pipelines_path: &Path,
66        parent_node: &inspect::Node,
67        accessor_stats_node: &inspect::Node,
68    ) -> Self {
69        let mut _pipeline_node = None;
70        let path = format!("{}/{}", pipelines_path.display(), parameters.name);
71        let mut static_selectors = None;
72        if parameters.has_config {
73            let node = parent_node.create_child(parameters.name.as_ref());
74            let mut config =
75                configs::PipelineConfig::from_directory(path, parameters.empty_behavior);
76            config.record_to_inspect(&node);
77            _pipeline_node = Some(node);
78            if !config.disable_filtering {
79                static_selectors = config.take_inspect_selectors();
80            }
81        }
82        let stats = AccessorStats::new(accessor_stats_node.create_child(parameters.name.as_ref()));
83        Pipeline {
84            _pipeline_node,
85            stats,
86            name: parameters.name,
87            static_allowlist: RwLock::new(StaticHierarchyAllowlist::new(static_selectors)),
88        }
89    }
90
91    fn protocol_name(&self) -> Cow<'_, str> {
92        self.protocol_name_inner::<fdiagnostics::ArchiveAccessorMarker>()
93    }
94
95    fn host_protocol_name(&self) -> Cow<'_, str> {
96        self.protocol_name_inner::<fdiagnostics_host::ArchiveAccessorMarker>()
97    }
98
99    fn protocol_name_inner<P: DiscoverableProtocolMarker>(&self) -> Cow<'_, str> {
100        if self.name.as_ref() == ALL_PIPELINE_NAME {
101            Cow::Borrowed(P::PROTOCOL_NAME)
102        } else {
103            Cow::Owned(format!("{}.{}", P::PROTOCOL_NAME, self.name))
104        }
105    }
106
107    pub fn accessor_stats(&self) -> &AccessorStats {
108        &self.stats
109    }
110
111    pub fn remove_component(&self, moniker: &ExtendedMoniker) {
112        self.static_allowlist.write().remove_component(moniker);
113    }
114
115    pub fn add_component(&self, moniker: &ExtendedMoniker) -> Result<(), Error> {
116        self.static_allowlist.write().add_component(moniker.clone())
117    }
118
119    pub fn static_hierarchy_allowlist(&self) -> StaticHierarchyAllowlist {
120        // TODO(https://fxbug.dev/42159044): can we avoid cloning here? This clone is not super expensive
121        // as it'll be just cloning arcs, but we could be more efficient here.
122        // Due to lock semantics we can't just return a reference at the moment as it leads to
123        // an ABBA lock between inspect insertion into the repo and inspect reading.
124        self.static_allowlist.read().clone()
125    }
126}
127
128#[cfg(test)]
129impl Pipeline {
130    pub fn for_test(static_selectors: Option<Vec<fdiagnostics::Selector>>) -> Self {
131        Pipeline {
132            _pipeline_node: None,
133            name: Cow::Borrowed("test"),
134            stats: AccessorStats::new(Default::default()),
135            static_allowlist: RwLock::new(StaticHierarchyAllowlist::new(static_selectors)),
136        }
137    }
138}
139
140pub struct PipelineManager {
141    pipelines: Vec<Arc<Pipeline>>,
142    _pipelines_node: inspect::Node,
143    _accessor_stats_node: inspect::Node,
144    scope: Option<fasync::Scope>,
145}
146
147impl PipelineManager {
148    pub async fn new(
149        pipelines_path: PathBuf,
150        pipelines_node: inspect::Node,
151        accessor_stats_node: inspect::Node,
152        scope: fasync::Scope,
153    ) -> Self {
154        let mut pipelines = vec![];
155        if let Ok(dir) =
156            directory::open_in_namespace(pipelines_path.to_str().unwrap(), fio::PERM_READABLE)
157        {
158            for entry in directory::readdir(&dir).await.expect("read dir") {
159                if !matches!(entry.kind, directory::DirentKind::Directory) {
160                    continue;
161                }
162                let empty_behavior = if entry.name == "feedback" {
163                    configs::EmptyBehavior::DoNotFilter
164                } else {
165                    configs::EmptyBehavior::Disable
166                };
167                let parameters = PipelineParameters {
168                    has_config: true,
169                    name: Cow::Owned(entry.name),
170                    empty_behavior,
171                };
172                pipelines.push(Arc::new(Pipeline::new(
173                    parameters,
174                    &pipelines_path,
175                    &pipelines_node,
176                    &accessor_stats_node,
177                )));
178            }
179        }
180        pipelines.push(Arc::new(Pipeline::new(
181            PipelineParameters {
182                has_config: false,
183                name: Cow::Borrowed(ALL_PIPELINE_NAME),
184                empty_behavior: configs::EmptyBehavior::Disable,
185            },
186            &pipelines_path,
187            &pipelines_node,
188            &accessor_stats_node,
189        )));
190        Self {
191            pipelines,
192            _pipelines_node: pipelines_node,
193            _accessor_stats_node: accessor_stats_node,
194            scope: Some(scope),
195        }
196    }
197
198    pub fn weak_pipelines(&self) -> Vec<Weak<Pipeline>> {
199        self.pipelines.iter().map(Arc::downgrade).collect::<Vec<_>>()
200    }
201
202    pub async fn cancel(&mut self) {
203        if let Some(scope) = self.scope.take() {
204            scope.cancel().await;
205        }
206    }
207
208    pub async fn serve_pipelines(
209        &self,
210        accessor_server: Arc<ArchiveAccessorServer>,
211        id_gen: &sandbox::CapabilityIdGenerator,
212        capability_store: &mut fsandbox::CapabilityStoreProxy,
213    ) -> PipelinesDictionaryId {
214        let accessors_dict_id = id_gen.next();
215        capability_store.dictionary_create(accessors_dict_id).await.unwrap().unwrap();
216        debug!("Will serve {} pipelines", self.pipelines.len());
217        for pipeline in &self.pipelines {
218            debug!("Installing spawning receivers for {}", pipeline.name);
219            let accessor_pipeline = Arc::clone(pipeline);
220            // Unwrap: safe, we must not cancel before serving.
221            self.scope.as_ref().unwrap().spawn(handle_receiver_requests::<
222                fdiagnostics::ArchiveAccessorMarker,
223            >(
224                get_receiver_stream(
225                    pipeline.protocol_name(),
226                    accessors_dict_id,
227                    id_gen,
228                    capability_store,
229                )
230                .await,
231                Arc::clone(&accessor_server),
232                Arc::clone(&accessor_pipeline),
233            ));
234            // Unwrap: safe, we must not cancel before serving.
235            self.scope.as_ref().unwrap().spawn(handle_receiver_requests::<
236                fdiagnostics_host::ArchiveAccessorMarker,
237            >(
238                get_receiver_stream(
239                    pipeline.host_protocol_name(),
240                    accessors_dict_id,
241                    id_gen,
242                    capability_store,
243                )
244                .await,
245                Arc::clone(&accessor_server),
246                Arc::clone(&accessor_pipeline),
247            ));
248        }
249
250        PipelinesDictionaryId(accessors_dict_id)
251    }
252}
253
254async fn get_receiver_stream(
255    protocol_name: Cow<'_, str>,
256    accessors_dict_id: u64,
257    id_gen: &sandbox::CapabilityIdGenerator,
258    capability_store: &mut fsandbox::CapabilityStoreProxy,
259) -> fsandbox::ReceiverRequestStream {
260    let (accessor_receiver_client, receiver_stream) =
261        fidl::endpoints::create_request_stream::<fsandbox::ReceiverMarker>();
262    let connector_id = id_gen.next();
263    capability_store
264        .connector_create(connector_id, accessor_receiver_client)
265        .await
266        .unwrap()
267        .unwrap();
268    debug!("Added {protocol_name} to the accessors dictionary.");
269    capability_store
270        .dictionary_insert(
271            accessors_dict_id,
272            &fsandbox::DictionaryItem { key: protocol_name.into_owned(), value: connector_id },
273        )
274        .await
275        .unwrap()
276        .unwrap();
277    receiver_stream
278}
279
280async fn handle_receiver_requests<P>(
281    mut receiver_stream: fsandbox::ReceiverRequestStream,
282    accessor_server: Arc<ArchiveAccessorServer>,
283    pipeline: Arc<Pipeline>,
284) where
285    P: ProtocolMarker,
286    P::RequestStream: ArchiveAccessorTranslator + Send + 'static,
287    <P::RequestStream as ArchiveAccessorTranslator>::InnerDataRequestChannel:
288        ArchiveAccessorWriter + Send,
289{
290    while let Some(request) = receiver_stream.try_next().await.unwrap() {
291        match request {
292            fsandbox::ReceiverRequest::Receive { channel, control_handle: _ } => {
293                debug!("Handling receive request for: {} -> {}", pipeline.name, P::DEBUG_NAME);
294                let server_end = ServerEnd::<P>::new(channel);
295                accessor_server.spawn_server::<P::RequestStream>(
296                    Arc::clone(&pipeline),
297                    server_end.into_stream(),
298                );
299            }
300            fsandbox::ReceiverRequest::_UnknownMethod { method_type, ordinal, .. } => {
301                warn!(method_type:?, ordinal; "Got unknown interaction on Receiver");
302            }
303        }
304    }
305}