service_broker/
lib.rs

1// Copyright 2024 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use anyhow::{bail, format_err, Context, Result};
6use fidl::endpoints::{Proxy, ServerEnd};
7use fidl::HandleBased;
8use fuchsia_component::directory::AsRefDirectory;
9use fuchsia_component::server::{ServiceFs, ServiceObj, ServiceObjTrait};
10use fuchsia_fs::directory::{WatchEvent, Watcher};
11use futures::prelude::*;
12use {
13    fidl_fuchsia_data as fdata, fidl_fuchsia_io as fio, fidl_fuchsia_process as fprocess,
14    fidl_fuchsia_process_lifecycle as fpl,
15};
16
17async fn wait_for_first_instance(svc: &fio::DirectoryProxy) -> Result<String> {
18    const INPUT_SERVICE: &str = "input";
19    let (service_dir, request) = fidl::endpoints::create_proxy::<fio::DirectoryMarker>();
20    svc.as_ref_directory().open(INPUT_SERVICE, fio::Flags::PROTOCOL_DIRECTORY, request.into())?;
21    let watcher = Watcher::new(&service_dir).await.context("failed to create watcher")?;
22    let mut stream =
23        watcher.map(|result| result.context("failed to get watcher event")).try_filter_map(|msg| {
24            futures::future::ok(match msg.event {
25                WatchEvent::EXISTING | WatchEvent::ADD_FILE => {
26                    if msg.filename == std::path::Path::new(".") {
27                        None
28                    } else {
29                        Some(msg.filename)
30                    }
31                }
32                _ => None,
33            })
34        });
35    let first = stream.try_next().await?.unwrap();
36    let filename = first.to_str().ok_or_else(|| format_err!("to_str for filename failed"))?;
37    Ok(format!("{INPUT_SERVICE}/{filename}"))
38}
39
40async fn first_instance_to_protocol<'a>(
41    svc: fio::DirectoryProxy,
42    fs: &mut ServiceFs<ServiceObj<'a, ()>>,
43    protocol_name: &str,
44) -> Result<()> {
45    if protocol_name == "" {
46        bail!("Invalid protocol name provided");
47    }
48
49    // TODO(surajmalhotra): Do this wait every time we get a connection request to handle cases
50    // where the instance goes away and comes back.
51    let instance_dir = wait_for_first_instance(&svc).await?;
52
53    let svc = svc.into_channel().unwrap().into_zx_channel();
54    let protocol_name = protocol_name.to_string();
55    fs.dir("svc").add_service_at("output", move |request: zx::Channel| {
56        if let Err(_) =
57            fdio::service_connect_at(&svc, &format!("{instance_dir}/{protocol_name}"), request)
58        {
59            log::error!(
60                "[service-broker] Failed to forward connection to {instance_dir}/{protocol_name}"
61            );
62        }
63        Some(())
64    });
65
66    Ok(())
67}
68
69async fn first_instance_to_default<T: ServiceObjTrait>(
70    svc: fio::DirectoryProxy,
71    fs: &mut ServiceFs<T>,
72) -> Result<()> {
73    // TODO(surajmalhotra): Do this wait every time we get a connection request to handle cases
74    // where the instance goes away and comes back.
75    let instance_dir_path = wait_for_first_instance(&svc).await?;
76    let (instance_dir, request) = fidl::endpoints::create_proxy::<fio::DirectoryMarker>();
77    svc.as_ref_directory().open(
78        &instance_dir_path,
79        fio::Flags::PROTOCOL_DIRECTORY,
80        request.into(),
81    )?;
82
83    fs.dir("svc").dir("output").add_remote("default", instance_dir);
84    Ok(())
85}
86
87async fn filter_and_rename<T: ServiceObjTrait>(
88    _svc: fio::DirectoryProxy,
89    _fs: &mut ServiceFs<T>,
90    _filter: &Vec<String>,
91    _rename: &Vec<String>,
92) -> Result<()> {
93    unimplemented!();
94    // Add a bunch of directories which forward requests?
95}
96
97fn get_value<'a>(dict: &'a fdata::Dictionary, key: &str) -> Option<&'a fdata::DictionaryValue> {
98    match &dict.entries {
99        Some(entries) => {
100            for entry in entries {
101                if entry.key == key {
102                    return entry.value.as_ref().map(|val| &**val);
103                }
104            }
105            None
106        }
107        _ => None,
108    }
109}
110
111fn get_program_string<'a>(program: &'a fdata::Dictionary, key: &str) -> Result<&'a str> {
112    if let Some(fdata::DictionaryValue::Str(value)) = get_value(program, key) {
113        Ok(value)
114    } else {
115        Err(format_err!("{key} not found in program or is not a string"))
116    }
117}
118
119fn get_program_strvec<'a>(
120    program: &'a fdata::Dictionary,
121    key: &str,
122) -> Result<Option<&'a Vec<String>>> {
123    match get_value(program, key) {
124        Some(args_value) => match args_value {
125            fdata::DictionaryValue::StrVec(vec) => Ok(Some(vec)),
126            _ => Err(format_err!(
127                "Expected {key} in program to be vector of strings, found something else"
128            )),
129        },
130        None => Ok(None),
131    }
132}
133
134pub async fn main(
135    ns_entries: Vec<fprocess::NameInfo>,
136    directory_request: ServerEnd<fio::DirectoryMarker>,
137    lifecycle: ServerEnd<fpl::LifecycleMarker>,
138    program: Option<fdata::Dictionary>,
139) -> Result<()> {
140    drop(lifecycle);
141    if directory_request.is_invalid_handle() {
142        bail!("No valid handle found for outgoing directory");
143    }
144    let Some(svc) = ns_entries.into_iter().find(|e| e.path == "/svc") else {
145        bail!("No /svc in namespace");
146    };
147    let Some(program) = program else {
148        bail!("No program section provided");
149    };
150    let svc = svc.directory.into_proxy();
151    let mut fs = ServiceFs::new();
152    match get_program_string(&program, "policy")? {
153        "first_instance_to_protocol" => {
154            let protocol_name = get_program_string(&program, "protocol_name")?;
155            first_instance_to_protocol(svc, &mut fs, protocol_name).await
156        }
157        "first_instance_to_default" => first_instance_to_default(svc, &mut fs).await,
158        "filter_and_rename" => {
159            let empty = vec![];
160            let filter = get_program_strvec(&program, "filter")?.unwrap_or(&empty);
161            let rename = get_program_strvec(&program, "rename")?.unwrap_or(&empty);
162            filter_and_rename(svc, &mut fs, filter, rename).await
163        }
164        policy => Err(format_err!("Unsupported policy specified: {policy}")),
165    }?;
166
167    log::debug!("[service-broker] Initialized.");
168
169    fs.serve_connection(directory_request).context("failed to serve outgoing namespace")?;
170    fs.collect::<()>().await;
171    Ok(())
172}
173
174#[cfg(test)]
175mod tests {
176    #[fuchsia::test]
177    async fn smoke_test() {
178        assert!(true);
179    }
180}