1use crate::suspend::WakeSources;
6use anyhow::{anyhow, Error};
7use fuchsia_async as fasync;
8use fuchsia_sync::Mutex;
9use futures::FutureExt;
10use log::warn;
11use std::cell::RefCell;
12use std::mem::MaybeUninit;
13use std::rc::Rc;
14use std::sync::Arc;
15use zx::AsHandleRef;
16
/// The pair of channels, plus bookkeeping, that make up one proxy serviced by `start_proxy`.
///
/// Messages that become readable on either channel are forwarded to the other one.
pub struct ChannelProxy {
    /// The container-facing end of the proxy. Messages read from this channel are written to
    /// `remote_channel`.
    pub container_channel: zx::Channel,

    /// The remote-facing end of the proxy. Messages read from this channel are written to
    /// `container_channel`.
    pub remote_channel: zx::Channel,

    /// Counter that is incremented by one for every message read from `remote_channel`, before
    /// that message is written to `container_channel`. When the proxy loop exits, the koid of
    /// this counter is removed from the proxy's wake sources.
    pub message_counter: zx::Counter,

    /// Human-readable name of the proxy; attached to trace events and included in log messages.
    pub name: String,
}
36
/// Identifies which of the two proxied channels completed its signal wait first.
#[derive(Debug)]
enum WaitReturn {
    /// The wait on `ChannelProxy::container_channel` finished.
    Container,
    /// The wait on `ChannelProxy::remote_channel` finished.
    Remote,
}
43
/// Scheduler role name that the proxy thread requests for itself through
/// `fuchsia_scheduler::set_role_for_this_thread`.
const PROXY_ROLE_NAME: &str = "fuchsia.starnix.runner.proxy";
46
47pub fn run_proxy_thread(
49 new_proxies: async_channel::Receiver<(ChannelProxy, Arc<Mutex<WakeSources>>)>,
50) {
51 let _ = std::thread::Builder::new().name("proxy_thread".to_string()).spawn(move || {
52 if let Err(e) = fuchsia_scheduler::set_role_for_this_thread(PROXY_ROLE_NAME) {
53 warn!(e:%; "failed to set thread role");
54 }
55 let mut executor = fasync::LocalExecutor::new();
56 executor.run_singlethreaded(async move {
57 let mut tasks = fasync::TaskGroup::new();
58 let bounce_bytes = Rc::new(RefCell::new(
59 [MaybeUninit::uninit(); zx::sys::ZX_CHANNEL_MAX_MSG_BYTES as usize],
60 ));
61 let bounce_handles = Rc::new(RefCell::new(
62 [const { MaybeUninit::uninit() }; zx::sys::ZX_CHANNEL_MAX_MSG_HANDLES as usize],
63 ));
64 while let Ok((proxy, events)) = new_proxies.recv().await {
65 let bytes_clone = bounce_bytes.clone();
66 let handles_clone = bounce_handles.clone();
67 tasks.local(start_proxy(proxy, events, bytes_clone, handles_clone));
68 }
69 });
70 });
71}
72
73async fn start_proxy(
79 proxy: ChannelProxy,
80 wake_sources: Arc<Mutex<WakeSources>>,
81 bounce_bytes: Rc<RefCell<[MaybeUninit<u8>; zx::sys::ZX_CHANNEL_MAX_MSG_BYTES as usize]>>,
82 bounce_handles: Rc<
83 RefCell<[MaybeUninit<zx::Handle>; zx::sys::ZX_CHANNEL_MAX_MSG_HANDLES as usize]>,
84 >,
85) {
86 let proxy_name = proxy.name.as_str();
87 trace_instant(c"starnix_runner:start_proxy:loop:enter", proxy_name);
88
89 'outer: loop {
90 let mut container_wait = fasync::OnSignals::new(
92 proxy.container_channel.as_handle_ref(),
93 zx::Signals::CHANNEL_READABLE | zx::Signals::CHANNEL_PEER_CLOSED,
94 )
95 .fuse();
96 let mut remote_wait = fasync::OnSignals::new(
97 proxy.remote_channel.as_handle_ref(),
98 zx::Signals::CHANNEL_READABLE | zx::Signals::CHANNEL_PEER_CLOSED,
99 )
100 .fuse();
101
102 let (signals, finished_wait) = {
103 trace_duration(c"starnix_runner:start_proxy:wait_for_messages", proxy_name);
104 let result = futures::select! {
105 res = container_wait => {
106 trace_instant(c"starnix_runner:start_proxy:container_readable", proxy_name);
107 res.map(|s| (s, WaitReturn::Container))
108 },
109 res = remote_wait => {
110 trace_instant(c"starnix_runner:start_proxy:remote_readable", proxy_name);
111 res.map(|s| (s, WaitReturn::Remote))
112 },
113 };
114
115 match result {
116 Ok(result) => result,
117 Err(e) => {
118 trace_instant(c"starnix_runner:start_proxy:result:error", proxy_name);
119 log::warn!("Failed to wait on proxied channels in runner: {:?}", e);
120 break 'outer;
121 }
122 }
123 };
124
125 let name = proxy.name.as_str();
129 let result = match finished_wait {
130 WaitReturn::Container => forward_message(
131 &signals,
132 &proxy.container_channel,
133 &proxy.remote_channel,
134 None,
135 &mut bounce_bytes.borrow_mut(),
136 &mut bounce_handles.borrow_mut(),
137 name,
138 ),
139 WaitReturn::Remote => forward_message(
140 &signals,
141 &proxy.remote_channel,
142 &proxy.container_channel,
143 Some(&proxy.message_counter),
144 &mut bounce_bytes.borrow_mut(),
145 &mut bounce_handles.borrow_mut(),
146 name,
147 ),
148 };
149
150 if result.is_err() {
151 log::warn!(
152 "Proxy failed to forward message {} kernel: {}; {:?}",
153 match finished_wait {
154 WaitReturn::Container => "from",
155 WaitReturn::Remote => "to",
156 },
157 name,
158 result,
159 );
160 break 'outer;
161 }
162 }
163
164 trace_instant(c"starnix_runner:start_proxy:loop:exit", proxy_name);
165 if let Ok(koid) = proxy.message_counter.get_koid() {
166 wake_sources.lock().remove(&koid);
167 }
168}
169
170fn forward_message(
176 signals: &zx::Signals,
177 read_channel: &zx::Channel,
178 write_channel: &zx::Channel,
179 message_counter: Option<&zx::Counter>,
180 bytes: &mut [MaybeUninit<u8>; zx::sys::ZX_CHANNEL_MAX_MSG_BYTES as usize],
181 handles: &mut [MaybeUninit<zx::Handle>; zx::sys::ZX_CHANNEL_MAX_MSG_HANDLES as usize],
182 name: &str,
183) -> Result<(), Error> {
184 trace_duration(c"starnix_runner:forward_message", name);
185
186 if signals.contains(zx::Signals::CHANNEL_READABLE) {
187 let (actual_bytes, actual_handles) = {
188 match read_channel.read_uninit(bytes, handles) {
189 zx::ChannelReadResult::Ok(r) => r,
190 _ => return Err(anyhow!("Failed to read from channel")),
191 }
192 };
193
194 if let Some(counter) = message_counter {
195 counter.add(1).expect("Failed to add to the proxy's message counter");
196 trace_instant(c"starnix_runner:forward_message:counter_incremented", name);
197 }
198
199 write_channel.write(actual_bytes, actual_handles)?;
200 }
201
202 if signals.contains(zx::Signals::CHANNEL_PEER_CLOSED) {
205 Err(anyhow!("Proxy peer was closed"))
206 } else {
207 Ok(())
208 }
209}
210
/// Emits a `fuchsia_trace` duration event called `event` in the "power" category, with `name`
/// attached as the "name" argument.
// NOTE(review): `duration!` is expanded inside this helper, so the recorded duration presumably
// ends when this function returns rather than covering the caller's scope — confirm against the
// `fuchsia_trace::duration!` documentation.
fn trace_duration(event: &'static std::ffi::CStr, name: &str) {
    fuchsia_trace::duration!(c"power", event, "name" => name);
}
214
/// Emits a process-scoped `fuchsia_trace` instant event called `event` in the "power" category,
/// with `name` attached as the "name" argument.
fn trace_instant(event: &'static std::ffi::CStr, name: &str) {
    fuchsia_trace::instant!(
        c"power",
        event,
        fuchsia_trace::Scope::Process,
        "name" => name
    );
}
223
#[cfg(test)]
mod test {
    use super::{fasync, start_proxy, ChannelProxy};
    use fidl::HandleBased;
    use std::cell::RefCell;
    use std::mem::MaybeUninit;
    use std::rc::Rc;

    /// Payload written to the remote end of the proxy in the counter tests.
    const TEST_MESSAGE: [u8; 3] = [0x0, 0x1, 0x2];

    /// Starts `start_proxy` for `proxy` as a local task, with freshly allocated bounce buffers
    /// and default wake sources.
    fn run_proxy_for_test(proxy: ChannelProxy) -> fasync::Task<()> {
        let byte_buffer = [MaybeUninit::uninit(); zx::sys::ZX_CHANNEL_MAX_MSG_BYTES as usize];
        let handle_buffer =
            [const { MaybeUninit::uninit() }; zx::sys::ZX_CHANNEL_MAX_MSG_HANDLES as usize];
        fasync::Task::local(start_proxy(
            proxy,
            Default::default(),
            Rc::new(RefCell::new(byte_buffer)),
            Rc::new(RefCell::new(handle_buffer)),
        ))
    }

    /// Assembles a `ChannelProxy` named "test" from the given endpoints and starts it.
    fn spawn_test_proxy(
        container_channel: zx::Channel,
        remote_channel: zx::Channel,
        message_counter: zx::Counter,
    ) -> fasync::Task<()> {
        run_proxy_for_test(ChannelProxy {
            container_channel,
            remote_channel,
            message_counter,
            name: "test".to_string(),
        })
    }

    /// Waits until `counter` asserts `signals`, panicking if the wait fails.
    async fn wait_for_counter(counter: &zx::Counter, signals: zx::Signals) {
        fasync::OnSignals::new(counter, signals).await.unwrap();
    }

    /// Closing the container-side client must propagate as PEER_CLOSED to the remote server.
    #[fuchsia::test]
    async fn test_peer_closed_kernel() {
        let (local_client, local_server) = zx::Channel::create();
        let (remote_client, remote_server) = zx::Channel::create();
        let message_counter = zx::Counter::create().expect("failed to create counter");
        let _task = spawn_test_proxy(local_server, remote_client, message_counter);

        drop(local_client);

        fasync::OnSignals::new(remote_server, zx::Signals::CHANNEL_PEER_CLOSED).await.unwrap();
    }

    /// Closing the remote server must propagate as PEER_CLOSED to the container-side client.
    #[fuchsia::test]
    async fn test_peer_closed_remote() {
        let (local_client, local_server) = zx::Channel::create();
        let (remote_client, remote_server) = zx::Channel::create();
        let message_counter = zx::Counter::create().expect("failed to create counter");
        let _task = spawn_test_proxy(local_server, remote_client, message_counter);

        drop(remote_server);

        fasync::OnSignals::new(local_client, zx::Signals::CHANNEL_PEER_CLOSED).await.unwrap();
    }

    /// One message at a time: the counter goes positive after each remote write, and can be
    /// brought back down by the test between rounds.
    #[fuchsia::test]
    async fn test_counter_sequential() {
        let (_local_client, local_server) = zx::Channel::create();
        let (remote_client, remote_server) = zx::Channel::create();
        let message_counter = zx::Counter::create().expect("Failed to create counter");
        let local_message_counter = message_counter
            .duplicate_handle(zx::Rights::SAME_RIGHTS)
            .expect("Failed to duplicate counter");
        let _task = spawn_test_proxy(local_server, remote_client, message_counter);

        for round in 0..2 {
            // After the first round, undo the proxy's increment before going again.
            if round > 0 {
                local_message_counter.add(-1).expect("Failed add");
            }
            wait_for_counter(&local_message_counter, zx::Signals::COUNTER_NON_POSITIVE).await;
            assert!(remote_server.write(&TEST_MESSAGE, &mut []).is_ok());
            wait_for_counter(&local_message_counter, zx::Signals::COUNTER_POSITIVE).await;
        }
    }

    /// Three messages queued back to back must leave the counter at three.
    // NOTE(review): only COUNTER_POSITIVE is awaited before asserting the value is 3; this
    // presumably relies on the proxy forwarding all three queued messages before this task is
    // resumed — confirm.
    #[fuchsia::test]
    async fn test_counter_multiple() {
        let (_local_client, local_server) = zx::Channel::create();
        let (remote_client, remote_server) = zx::Channel::create();
        let message_counter = zx::Counter::create().expect("Failed to create counter");
        let local_message_counter = message_counter
            .duplicate_handle(zx::Rights::SAME_RIGHTS)
            .expect("Failed to duplicate counter");
        let _task = spawn_test_proxy(local_server, remote_client, message_counter);

        for _ in 0..3 {
            assert!(remote_server.write(&TEST_MESSAGE, &mut []).is_ok());
        }
        wait_for_counter(&local_message_counter, zx::Signals::COUNTER_POSITIVE).await;
        assert_eq!(local_message_counter.read().expect("Failed to read counter"), 3);
    }
}