1use crate::accessor::{ArchiveAccessorServer, ArchiveAccessorTranslator, ArchiveAccessorWriter};
5use crate::configs;
6use crate::diagnostics::AccessorStats;
7use crate::error::Error;
8use crate::pipeline::StaticHierarchyAllowlist;
9use fidl::endpoints::{DiscoverableProtocolMarker, ProtocolMarker, ServerEnd};
10use fuchsia_fs::directory;
11use fuchsia_sync::RwLock;
12use futures::TryStreamExt;
13use log::{debug, warn};
14use moniker::ExtendedMoniker;
15use std::borrow::Cow;
16use std::ops::Deref;
17use std::path::{Path, PathBuf};
18use std::sync::{Arc, Weak};
19use {
20 fidl_fuchsia_component_sandbox as fsandbox, fidl_fuchsia_diagnostics as fdiagnostics,
21 fidl_fuchsia_diagnostics_host as fdiagnostics_host, fidl_fuchsia_io as fio,
22 fuchsia_async as fasync, fuchsia_inspect as inspect,
23};
24
25const ALL_PIPELINE_NAME: &str = "all";
26
27struct PipelineParameters {
28 has_config: bool,
29 name: Cow<'static, str>,
30 empty_behavior: configs::EmptyBehavior,
31}
32
33#[derive(Copy, Clone)]
34pub struct PipelinesDictionaryId(u64);
35
36impl Deref for PipelinesDictionaryId {
37 type Target = u64;
38
39 fn deref(&self) -> &Self::Target {
40 &self.0
41 }
42}
43
44pub struct Pipeline {
49 name: Cow<'static, str>,
51
52 _pipeline_node: Option<inspect::Node>,
54
55 stats: AccessorStats,
57
58 static_allowlist: RwLock<StaticHierarchyAllowlist>,
60}
61
62impl Pipeline {
63 fn new(
64 parameters: PipelineParameters,
65 pipelines_path: &Path,
66 parent_node: &inspect::Node,
67 accessor_stats_node: &inspect::Node,
68 ) -> Self {
69 let mut _pipeline_node = None;
70 let path = format!("{}/{}", pipelines_path.display(), parameters.name);
71 let mut static_selectors = None;
72 if parameters.has_config {
73 let node = parent_node.create_child(parameters.name.as_ref());
74 let mut config =
75 configs::PipelineConfig::from_directory(path, parameters.empty_behavior);
76 config.record_to_inspect(&node);
77 _pipeline_node = Some(node);
78 if !config.disable_filtering {
79 static_selectors = config.take_inspect_selectors();
80 }
81 }
82 let stats = AccessorStats::new(accessor_stats_node.create_child(parameters.name.as_ref()));
83 Pipeline {
84 _pipeline_node,
85 stats,
86 name: parameters.name,
87 static_allowlist: RwLock::new(StaticHierarchyAllowlist::new(static_selectors)),
88 }
89 }
90
91 fn protocol_name(&self) -> Cow<'_, str> {
92 self.protocol_name_inner::<fdiagnostics::ArchiveAccessorMarker>()
93 }
94
95 fn host_protocol_name(&self) -> Cow<'_, str> {
96 self.protocol_name_inner::<fdiagnostics_host::ArchiveAccessorMarker>()
97 }
98
99 fn protocol_name_inner<P: DiscoverableProtocolMarker>(&self) -> Cow<'_, str> {
100 if self.name.as_ref() == ALL_PIPELINE_NAME {
101 Cow::Borrowed(P::PROTOCOL_NAME)
102 } else {
103 Cow::Owned(format!("{}.{}", P::PROTOCOL_NAME, self.name))
104 }
105 }
106
107 pub fn accessor_stats(&self) -> &AccessorStats {
108 &self.stats
109 }
110
111 pub fn remove_component(&self, moniker: &ExtendedMoniker) {
112 self.static_allowlist.write().remove_component(moniker);
113 }
114
115 pub fn add_component(&self, moniker: &ExtendedMoniker) -> Result<(), Error> {
116 self.static_allowlist.write().add_component(moniker.clone())
117 }
118
119 pub fn static_hierarchy_allowlist(&self) -> StaticHierarchyAllowlist {
120 self.static_allowlist.read().clone()
125 }
126}
127
128#[cfg(test)]
129impl Pipeline {
130 pub fn for_test(static_selectors: Option<Vec<fdiagnostics::Selector>>) -> Self {
131 Pipeline {
132 _pipeline_node: None,
133 name: Cow::Borrowed("test"),
134 stats: AccessorStats::new(Default::default()),
135 static_allowlist: RwLock::new(StaticHierarchyAllowlist::new(static_selectors)),
136 }
137 }
138}
139
140pub struct PipelineManager {
141 pipelines: Vec<Arc<Pipeline>>,
142 _pipelines_node: inspect::Node,
143 _accessor_stats_node: inspect::Node,
144 scope: Option<fasync::Scope>,
145}
146
147impl PipelineManager {
148 pub async fn new(
149 pipelines_path: PathBuf,
150 pipelines_node: inspect::Node,
151 accessor_stats_node: inspect::Node,
152 scope: fasync::Scope,
153 ) -> Self {
154 let mut pipelines = vec![];
155 if let Ok(dir) =
156 directory::open_in_namespace(pipelines_path.to_str().unwrap(), fio::PERM_READABLE)
157 {
158 for entry in directory::readdir(&dir).await.expect("read dir") {
159 if !matches!(entry.kind, directory::DirentKind::Directory) {
160 continue;
161 }
162 let empty_behavior = if entry.name == "feedback" {
163 configs::EmptyBehavior::DoNotFilter
164 } else {
165 configs::EmptyBehavior::Disable
166 };
167 let parameters = PipelineParameters {
168 has_config: true,
169 name: Cow::Owned(entry.name),
170 empty_behavior,
171 };
172 pipelines.push(Arc::new(Pipeline::new(
173 parameters,
174 &pipelines_path,
175 &pipelines_node,
176 &accessor_stats_node,
177 )));
178 }
179 }
180 pipelines.push(Arc::new(Pipeline::new(
181 PipelineParameters {
182 has_config: false,
183 name: Cow::Borrowed(ALL_PIPELINE_NAME),
184 empty_behavior: configs::EmptyBehavior::Disable,
185 },
186 &pipelines_path,
187 &pipelines_node,
188 &accessor_stats_node,
189 )));
190 Self {
191 pipelines,
192 _pipelines_node: pipelines_node,
193 _accessor_stats_node: accessor_stats_node,
194 scope: Some(scope),
195 }
196 }
197
198 pub fn weak_pipelines(&self) -> Vec<Weak<Pipeline>> {
199 self.pipelines.iter().map(Arc::downgrade).collect::<Vec<_>>()
200 }
201
202 pub async fn cancel(&mut self) {
203 if let Some(scope) = self.scope.take() {
204 scope.cancel().await;
205 }
206 }
207
208 pub async fn serve_pipelines(
209 &self,
210 accessor_server: Arc<ArchiveAccessorServer>,
211 id_gen: &sandbox::CapabilityIdGenerator,
212 capability_store: &mut fsandbox::CapabilityStoreProxy,
213 ) -> PipelinesDictionaryId {
214 let accessors_dict_id = id_gen.next();
215 capability_store.dictionary_create(accessors_dict_id).await.unwrap().unwrap();
216 debug!("Will serve {} pipelines", self.pipelines.len());
217 for pipeline in &self.pipelines {
218 debug!("Installing spawning receivers for {}", pipeline.name);
219 let accessor_pipeline = Arc::clone(pipeline);
220 self.scope.as_ref().unwrap().spawn(handle_receiver_requests::<
222 fdiagnostics::ArchiveAccessorMarker,
223 >(
224 get_receiver_stream(
225 pipeline.protocol_name(),
226 accessors_dict_id,
227 id_gen,
228 capability_store,
229 )
230 .await,
231 Arc::clone(&accessor_server),
232 Arc::clone(&accessor_pipeline),
233 ));
234 self.scope.as_ref().unwrap().spawn(handle_receiver_requests::<
236 fdiagnostics_host::ArchiveAccessorMarker,
237 >(
238 get_receiver_stream(
239 pipeline.host_protocol_name(),
240 accessors_dict_id,
241 id_gen,
242 capability_store,
243 )
244 .await,
245 Arc::clone(&accessor_server),
246 Arc::clone(&accessor_pipeline),
247 ));
248 }
249
250 PipelinesDictionaryId(accessors_dict_id)
251 }
252}
253
254async fn get_receiver_stream(
255 protocol_name: Cow<'_, str>,
256 accessors_dict_id: u64,
257 id_gen: &sandbox::CapabilityIdGenerator,
258 capability_store: &mut fsandbox::CapabilityStoreProxy,
259) -> fsandbox::ReceiverRequestStream {
260 let (accessor_receiver_client, receiver_stream) =
261 fidl::endpoints::create_request_stream::<fsandbox::ReceiverMarker>();
262 let connector_id = id_gen.next();
263 capability_store
264 .connector_create(connector_id, accessor_receiver_client)
265 .await
266 .unwrap()
267 .unwrap();
268 debug!("Added {protocol_name} to the accessors dictionary.");
269 capability_store
270 .dictionary_insert(
271 accessors_dict_id,
272 &fsandbox::DictionaryItem { key: protocol_name.into_owned(), value: connector_id },
273 )
274 .await
275 .unwrap()
276 .unwrap();
277 receiver_stream
278}
279
280async fn handle_receiver_requests<P>(
281 mut receiver_stream: fsandbox::ReceiverRequestStream,
282 accessor_server: Arc<ArchiveAccessorServer>,
283 pipeline: Arc<Pipeline>,
284) where
285 P: ProtocolMarker,
286 P::RequestStream: ArchiveAccessorTranslator + Send + 'static,
287 <P::RequestStream as ArchiveAccessorTranslator>::InnerDataRequestChannel:
288 ArchiveAccessorWriter + Send,
289{
290 while let Some(request) = receiver_stream.try_next().await.unwrap() {
291 match request {
292 fsandbox::ReceiverRequest::Receive { channel, control_handle: _ } => {
293 debug!("Handling receive request for: {} -> {}", pipeline.name, P::DEBUG_NAME);
294 let server_end = ServerEnd::<P>::new(channel);
295 accessor_server.spawn_server::<P::RequestStream>(
296 Arc::clone(&pipeline),
297 server_end.into_stream(),
298 );
299 }
300 fsandbox::ReceiverRequest::_UnknownMethod { method_type, ordinal, .. } => {
301 warn!(method_type:?, ordinal; "Got unknown interaction on Receiver");
302 }
303 }
304 }
305}