overnet_core/proxy/handle/
mod.rs1mod channel;
6mod event_pair;
7mod signals;
8mod socket;
9
10use super::stream::{Frame, StreamReaderBinder, StreamWriter};
11use crate::peer::{FramedStreamReader, PeerConnRef};
12use crate::router::Router;
13use anyhow::{bail, format_err, Error};
14use fidl::Signals;
15use fidl_fuchsia_overnet_protocol::SignalUpdate;
16use futures::future::poll_fn;
17use futures::prelude::*;
18use futures::task::noop_waker_ref;
19use std::sync::{Arc, Weak};
20use std::task::{Context, Poll};
21use zx_status;
22
23#[derive(Clone)]
29pub(crate) enum RouterHolder<'a> {
30 Unused(&'a Weak<Router>),
31 Used(Arc<Router>),
32}
33
34impl<'a> std::fmt::Debug for RouterHolder<'a> {
35 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
36 match self {
37 RouterHolder::Unused(_) => f.write_str("Unused"),
38 RouterHolder::Used(r) => write!(f, "Used({:?})", r.node_id()),
39 }
40 }
41}
42
43impl<'a> RouterHolder<'a> {
44 pub(crate) fn get(&mut self) -> Result<&Arc<Router>, Error> {
45 match self {
46 RouterHolder::Used(ref r) => Ok(r),
47 RouterHolder::Unused(r) => {
48 *self = RouterHolder::Used(
49 Weak::upgrade(r).ok_or_else(|| format_err!("Router is closed"))?,
50 );
51 self.get()
52 }
53 }
54 }
55}
56
57pub(crate) trait IO<'a>: Send {
59 type Proxyable: Proxyable;
60 type Output;
61 fn new() -> Self;
62 fn poll_io(
63 &mut self,
64 msg: &mut <Self::Proxyable as Proxyable>::Message,
65 proxyable: &'a Self::Proxyable,
66 fut_ctx: &mut Context<'_>,
67 ) -> Poll<Result<Self::Output, zx_status::Status>>;
68}
69
70pub(crate) trait Serializer: Send {
75 type Message;
76 fn new() -> Self;
77 fn poll_ser(
78 &mut self,
79 msg: &mut Self::Message,
80 bytes: &mut Vec<u8>,
81 conn: PeerConnRef<'_>,
82 router: &mut RouterHolder<'_>,
83 fut_ctx: &mut Context<'_>,
84 ) -> Poll<Result<(), Error>>;
85}
86
87pub(crate) trait Message: Send + Sized + Default + PartialEq + std::fmt::Debug {
90 type Parser: Serializer<Message = Self> + std::fmt::Debug;
92 type Serializer: Serializer<Message = Self>;
94}
95
96pub(crate) enum ReadValue {
98 Message,
99 SignalUpdate(SignalUpdate),
100}
101
102pub(crate) trait Proxyable: Send + Sync + Sized + std::fmt::Debug {
104 type Message: Message;
108
109 fn from_fidl_handle(hdl: fidl::Handle) -> Result<Self, Error>;
111 fn into_fidl_handle(self) -> Result<fidl::Handle, Error>;
113 fn signal_peer(&self, clear: Signals, set: Signals) -> Result<(), Error>;
115 fn close_with_reason(self, _msg: String) {}
117}
118
119pub(crate) trait ProxyableRW<'a>: Proxyable {
120 type Reader: 'a + IO<'a, Proxyable = Self, Output = ReadValue>;
122 type Writer: 'a + IO<'a, Proxyable = Self, Output = ()>;
124}
125
126pub(crate) trait IntoProxied {
127 type Proxied: Proxyable;
128 fn into_proxied(self) -> Result<Self::Proxied, Error>;
129}
130
131pub(crate) struct ProxyableHandle<Hdl: Proxyable> {
133 hdl: Hdl,
134 router: Weak<Router>,
135}
136
137impl<Hdl: Proxyable> std::fmt::Debug for ProxyableHandle<Hdl> {
138 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
139 write!(f, "{:?}#{:?}", self.hdl, Weak::upgrade(&self.router).map(|r| r.node_id()))
140 }
141}
142
143impl<Hdl: Proxyable> ProxyableHandle<Hdl> {
144 pub(crate) fn new(hdl: Hdl, router: Weak<Router>) -> Self {
145 Self { hdl, router }
146 }
147
148 pub(crate) fn into_fidl_handle(self) -> Result<fidl::Handle, Error> {
149 self.hdl.into_fidl_handle()
150 }
151
152 pub(crate) fn close_with_reason(self, msg: String) {
153 self.hdl.close_with_reason(msg);
154 }
155
156 pub(crate) fn write<'a>(
158 &'a self,
159 msg: &'a mut Hdl::Message,
160 ) -> impl 'a + Future<Output = Result<(), zx_status::Status>> + Unpin
161 where
162 Hdl: ProxyableRW<'a>,
163 {
164 self.handle_io(msg, Hdl::Writer::new())
165 }
166
167 pub(crate) fn read<'a>(
172 &'a self,
173 msg: &'a mut Hdl::Message,
174 ) -> impl 'a + Future<Output = Result<ReadValue, zx_status::Status>> + Unpin
175 where
176 Hdl: ProxyableRW<'a>,
177 {
178 self.handle_io(msg, Hdl::Reader::new())
179 }
180
181 pub(crate) fn router(&self) -> &Weak<Router> {
182 &self.router
183 }
184
185 pub(crate) fn apply_signal_update(&self, signal_update: SignalUpdate) -> Result<(), Error> {
188 if let Some(assert_signals) = signal_update.assert_signals {
189 self.hdl
190 .signal_peer(Signals::empty(), self::signals::from_wire_signals(assert_signals))?
191 }
192 Ok(())
193 }
194
195 fn handle_io<'a, I: 'a + IO<'a, Proxyable = Hdl>>(
196 &'a self,
197 msg: &'a mut Hdl::Message,
198 mut io: I,
199 ) -> impl 'a + Future<Output = Result<<I as IO<'a>>::Output, zx_status::Status>> + Unpin {
200 poll_fn(move |fut_ctx| io.poll_io(msg, &self.hdl, fut_ctx))
201 }
202
203 pub(crate) async fn drain_to_stream(
207 &self,
208 stream_writer: &mut StreamWriter<Hdl::Message>,
209 ) -> Result<(), Error>
210 where
211 Hdl: for<'a> ProxyableRW<'a>,
212 {
213 let mut message = Default::default();
214 loop {
215 let pr = self.read(&mut message).poll_unpin(&mut Context::from_waker(noop_waker_ref()));
216 match pr {
217 Poll::Pending => return Ok(()),
218 Poll::Ready(Err(e)) => return Err(e.into()),
219 Poll::Ready(Ok(ReadValue::Message)) => {
220 stream_writer.send_data(&mut message).await?
221 }
222 Poll::Ready(Ok(ReadValue::SignalUpdate(signal_update))) => {
223 stream_writer.send_signal(signal_update).await?
224 }
225 }
226 }
227 }
228
229 pub(crate) async fn drain_stream_to_handle(
231 self,
232 drain_stream: FramedStreamReader,
233 ) -> Result<fidl::Handle, Error>
234 where
235 Hdl: for<'a> ProxyableRW<'a>,
236 {
237 let mut drain_stream = drain_stream.bind(&self);
238 loop {
239 match drain_stream.next().await? {
240 Frame::Data(message) => self.write(message).await?,
241 Frame::SignalUpdate(signal_update) => self.apply_signal_update(signal_update)?,
242 Frame::EndTransfer => return Ok(self.hdl.into_fidl_handle()?),
243 Frame::Hello => bail!("Hello frame disallowed on drain streams"),
244 Frame::BeginTransfer(_, _) => bail!("BeginTransfer on drain stream"),
245 Frame::AckTransfer => bail!("AckTransfer on drain stream"),
246 Frame::Shutdown(r) => bail!("Stream shutdown during drain: {:?}", r),
247 }
248 }
249 }
250}