service_broker/
lib.rs

1// Copyright 2024 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use anyhow::{Context, Result, bail, format_err};
6use fidl::HandleBased;
7use fidl::endpoints::ServerEnd;
8use fuchsia_component::directory::AsRefDirectory;
9use fuchsia_component::server::{ServiceFs, ServiceObj, ServiceObjTrait};
10use fuchsia_fs::directory::{WatchEvent, Watcher};
11use futures::prelude::*;
12use std::clone::Clone;
13use {
14    fidl_fuchsia_data as fdata, fidl_fuchsia_io as fio, fidl_fuchsia_process as fprocess,
15    fidl_fuchsia_process_lifecycle as fpl,
16};
17
18async fn wait_for_first_instance(svc: &fio::DirectoryProxy) -> Result<String> {
19    const INPUT_SERVICE: &str = "input";
20    let (service_dir, request) = fidl::endpoints::create_proxy::<fio::DirectoryMarker>();
21    svc.as_ref_directory().open(INPUT_SERVICE, fio::Flags::PROTOCOL_DIRECTORY, request.into())?;
22    let watcher = Watcher::new(&service_dir).await.context("failed to create watcher")?;
23    let mut stream =
24        watcher.map(|result| result.context("failed to get watcher event")).try_filter_map(|msg| {
25            futures::future::ok(match msg.event {
26                WatchEvent::EXISTING | WatchEvent::ADD_FILE => {
27                    if msg.filename == std::path::Path::new(".") {
28                        None
29                    } else {
30                        Some(msg.filename)
31                    }
32                }
33                _ => None,
34            })
35        });
36    let first = stream.try_next().await?.unwrap();
37    let filename = first.to_str().ok_or_else(|| format_err!("to_str for filename failed"))?;
38    Ok(format!("{INPUT_SERVICE}/{filename}"))
39}
40
41async fn connect_request(svc: fio::DirectoryProxy, request: zx::Channel, protocol_name: &str) {
42    let instance_dir = wait_for_first_instance(&svc).await;
43    let Ok(instance_dir) = instance_dir else {
44        log::error!("[service-broker] Failed to forward connection for {protocol_name}");
45        return;
46    };
47
48    if let Err(e) = svc.as_ref_directory().open(
49        format!("{instance_dir}/{protocol_name}").as_str(),
50        fio::Flags::PROTOCOL_SERVICE,
51        request,
52    ) {
53        log::error!(
54            "[service-broker] Failed to forward connection to {instance_dir}/{protocol_name}: {e}"
55        );
56    }
57}
58
59async fn first_instance_to_protocol<'a>(
60    svc: fio::DirectoryProxy,
61    fs: &mut ServiceFs<ServiceObj<'a, ()>>,
62    protocol_name: &str,
63    scope: &'a fuchsia_async::Scope,
64) -> Result<()> {
65    if protocol_name == "" {
66        bail!("Invalid protocol name provided");
67    }
68
69    let protocol_name = protocol_name.to_string();
70
71    fs.dir("svc").add_service_at("output", move |request: zx::Channel| {
72        let svc = Clone::clone(&svc);
73        let protocol_name = protocol_name.clone();
74        scope.spawn(async move {
75            connect_request(svc, request, &protocol_name).await;
76        });
77
78        Some(())
79    });
80
81    Ok(())
82}
83
84async fn first_instance_to_default<T: ServiceObjTrait>(
85    svc: fio::DirectoryProxy,
86    fs: &mut ServiceFs<T>,
87) -> Result<()> {
88    // TODO(surajmalhotra): Do this wait every time we get a connection request to handle cases
89    // where the instance goes away and comes back.
90    let instance_dir_path = wait_for_first_instance(&svc).await?;
91    let (instance_dir, request) = fidl::endpoints::create_proxy::<fio::DirectoryMarker>();
92    svc.as_ref_directory().open(
93        &instance_dir_path,
94        fio::Flags::PROTOCOL_DIRECTORY,
95        request.into(),
96    )?;
97
98    fs.dir("svc").dir("output").add_remote("default", instance_dir);
99    Ok(())
100}
101
102async fn filter_and_rename<T: ServiceObjTrait>(
103    _svc: fio::DirectoryProxy,
104    _fs: &mut ServiceFs<T>,
105    _filter: &Vec<String>,
106    _rename: &Vec<String>,
107) -> Result<()> {
108    unimplemented!();
109    // Add a bunch of directories which forward requests?
110}
111
112fn get_value<'a>(dict: &'a fdata::Dictionary, key: &str) -> Option<&'a fdata::DictionaryValue> {
113    match &dict.entries {
114        Some(entries) => {
115            for entry in entries {
116                if entry.key == key {
117                    return entry.value.as_ref().map(|val| &**val);
118                }
119            }
120            None
121        }
122        _ => None,
123    }
124}
125
126fn get_program_string<'a>(program: &'a fdata::Dictionary, key: &str) -> Result<&'a str> {
127    if let Some(fdata::DictionaryValue::Str(value)) = get_value(program, key) {
128        Ok(value)
129    } else {
130        Err(format_err!("{key} not found in program or is not a string"))
131    }
132}
133
134fn get_program_strvec<'a>(
135    program: &'a fdata::Dictionary,
136    key: &str,
137) -> Result<Option<&'a Vec<String>>> {
138    match get_value(program, key) {
139        Some(args_value) => match args_value {
140            fdata::DictionaryValue::StrVec(vec) => Ok(Some(vec)),
141            _ => Err(format_err!(
142                "Expected {key} in program to be vector of strings, found something else"
143            )),
144        },
145        None => Ok(None),
146    }
147}
148
149pub async fn main(
150    ns_entries: Vec<fprocess::NameInfo>,
151    directory_request: ServerEnd<fio::DirectoryMarker>,
152    lifecycle: ServerEnd<fpl::LifecycleMarker>,
153    program: Option<fdata::Dictionary>,
154) -> Result<()> {
155    drop(lifecycle);
156    if directory_request.is_invalid_handle() {
157        bail!("No valid handle found for outgoing directory");
158    }
159    let Some(svc) = ns_entries.into_iter().find(|e| e.path == "/svc") else {
160        bail!("No /svc in namespace");
161    };
162    let Some(program) = program else {
163        bail!("No program section provided");
164    };
165    let scope = fuchsia_async::Scope::new();
166    let svc = svc.directory.into_proxy();
167    let mut fs = ServiceFs::new();
168    match get_program_string(&program, "policy")? {
169        "first_instance_to_protocol" => {
170            let protocol_name = get_program_string(&program, "protocol_name")?;
171            first_instance_to_protocol(svc, &mut fs, protocol_name, &scope).await
172        }
173        "first_instance_to_default" => first_instance_to_default(svc, &mut fs).await,
174        "filter_and_rename" => {
175            let empty = vec![];
176            let filter = get_program_strvec(&program, "filter")?.unwrap_or(&empty);
177            let rename = get_program_strvec(&program, "rename")?.unwrap_or(&empty);
178            filter_and_rename(svc, &mut fs, filter, rename).await
179        }
180        policy => Err(format_err!("Unsupported policy specified: {policy}")),
181    }?;
182
183    log::debug!("[service-broker] Initialized.");
184
185    fs.serve_connection(directory_request).context("failed to serve outgoing namespace")?;
186    fs.collect::<()>().await;
187    Ok(())
188}
189
#[cfg(test)]
mod tests {
    /// Placeholder test: it checks nothing beyond the test target building and running.
    #[fuchsia::test]
    async fn smoke_test() {
        // Intentionally empty; a constant `assert!(true)` adds no coverage.
    }
}