overnet_core/proxy/handle/
mod.rs

// Copyright 2020 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
4
5mod channel;
6mod event_pair;
7mod signals;
8mod socket;
9
10use super::stream::{Frame, StreamReaderBinder, StreamWriter};
11use crate::peer::{FramedStreamReader, PeerConnRef};
12use crate::router::Router;
13use anyhow::{bail, format_err, Error};
14use fidl::Signals;
15use fidl_fuchsia_overnet_protocol::SignalUpdate;
16use futures::future::poll_fn;
17use futures::prelude::*;
18use futures::task::noop_waker_ref;
19use std::sync::{Arc, Weak};
20use std::task::{Context, Poll};
21use zx_status;
22
23/// Holds a reference to a router.
24/// We start out `Unused` with a weak reference to the router, but various methods
25/// need said router, and so we can transition to `Used` with a reference when the router
26/// is needed.
27/// Saves some repeated upgrading of weak to arc.
28#[derive(Clone)]
29pub(crate) enum RouterHolder<'a> {
30    Unused(&'a Weak<Router>),
31    Used(Arc<Router>),
32}
33
34impl<'a> std::fmt::Debug for RouterHolder<'a> {
35    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
36        match self {
37            RouterHolder::Unused(_) => f.write_str("Unused"),
38            RouterHolder::Used(r) => write!(f, "Used({:?})", r.node_id()),
39        }
40    }
41}
42
43impl<'a> RouterHolder<'a> {
44    pub(crate) fn get(&mut self) -> Result<&Arc<Router>, Error> {
45        match self {
46            RouterHolder::Used(ref r) => Ok(r),
47            RouterHolder::Unused(r) => {
48                *self = RouterHolder::Used(
49                    Weak::upgrade(r).ok_or_else(|| format_err!("Router is closed"))?,
50                );
51                self.get()
52            }
53        }
54    }
55}
56
57/// Perform some IO operation on a handle.
58pub(crate) trait IO<'a>: Send {
59    type Proxyable: Proxyable;
60    type Output;
61    fn new() -> Self;
62    fn poll_io(
63        &mut self,
64        msg: &mut <Self::Proxyable as Proxyable>::Message,
65        proxyable: &'a Self::Proxyable,
66        fut_ctx: &mut Context<'_>,
67    ) -> Poll<Result<Self::Output, zx_status::Status>>;
68}
69
70/// Serializer defines how to read or write a message to a QUIC stream.
71/// They are usually defined in pairs (one reader, one writer).
72/// In some cases those implementations end up being the same and we leverage that to improve
73/// footprint.
74pub(crate) trait Serializer: Send {
75    type Message;
76    fn new() -> Self;
77    fn poll_ser(
78        &mut self,
79        msg: &mut Self::Message,
80        bytes: &mut Vec<u8>,
81        conn: PeerConnRef<'_>,
82        router: &mut RouterHolder<'_>,
83        fut_ctx: &mut Context<'_>,
84    ) -> Poll<Result<(), Error>>;
85}
86
87/// A proxyable message - defines how to parse/serialize itself, and gets pulled
88/// in by Proxyable to also define how to send/receive itself on the right kind of handle.
89pub(crate) trait Message: Send + Sized + Default + PartialEq + std::fmt::Debug {
90    /// How to parse this message type from bytes.
91    type Parser: Serializer<Message = Self> + std::fmt::Debug;
92    /// How to turn this message into wire bytes.
93    type Serializer: Serializer<Message = Self>;
94}
95
96/// The result of an IO read - either a message was received, or a signal.
97pub(crate) enum ReadValue {
98    Message,
99    SignalUpdate(SignalUpdate),
100}
101
102/// An object that can be proxied.
103pub(crate) trait Proxyable: Send + Sync + Sized + std::fmt::Debug {
104    /// The type of message exchanged by this handle.
105    /// This transitively also brings in types encoding how to parse/serialize messages to the
106    /// wire.
107    type Message: Message;
108
109    /// Convert a FIDL handle into a proxyable instance (or fail).
110    fn from_fidl_handle(hdl: fidl::Handle) -> Result<Self, Error>;
111    /// Collapse this Proxyable instance back to the underlying FIDL handle (or fail).
112    fn into_fidl_handle(self) -> Result<fidl::Handle, Error>;
113    /// Clear/set signals on this handle's peer.
114    fn signal_peer(&self, clear: Signals, set: Signals) -> Result<(), Error>;
115    /// Set a reason for this handle to close.
116    fn close_with_reason(self, _msg: String) {}
117}
118
119pub(crate) trait ProxyableRW<'a>: Proxyable {
120    /// A type that can be used for communicating messages from the handle to the proxy code.
121    type Reader: 'a + IO<'a, Proxyable = Self, Output = ReadValue>;
122    /// A type that can be used for communicating messages from the proxy code to the handle.
123    type Writer: 'a + IO<'a, Proxyable = Self, Output = ()>;
124}
125
126pub(crate) trait IntoProxied {
127    type Proxied: Proxyable;
128    fn into_proxied(self) -> Result<Self::Proxied, Error>;
129}
130
131/// Wraps a Proxyable, adds some convenience values, and provides a nicer API.
132pub(crate) struct ProxyableHandle<Hdl: Proxyable> {
133    hdl: Hdl,
134    router: Weak<Router>,
135}
136
137impl<Hdl: Proxyable> std::fmt::Debug for ProxyableHandle<Hdl> {
138    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
139        write!(f, "{:?}#{:?}", self.hdl, Weak::upgrade(&self.router).map(|r| r.node_id()))
140    }
141}
142
143impl<Hdl: Proxyable> ProxyableHandle<Hdl> {
144    pub(crate) fn new(hdl: Hdl, router: Weak<Router>) -> Self {
145        Self { hdl, router }
146    }
147
148    pub(crate) fn into_fidl_handle(self) -> Result<fidl::Handle, Error> {
149        self.hdl.into_fidl_handle()
150    }
151
152    pub(crate) fn close_with_reason(self, msg: String) {
153        self.hdl.close_with_reason(msg);
154    }
155
156    /// Write `msg` to the handle.
157    pub(crate) fn write<'a>(
158        &'a self,
159        msg: &'a mut Hdl::Message,
160    ) -> impl 'a + Future<Output = Result<(), zx_status::Status>> + Unpin
161    where
162        Hdl: ProxyableRW<'a>,
163    {
164        self.handle_io(msg, Hdl::Writer::new())
165    }
166
167    /// Attempt to read one `msg` from the handle.
168    /// Return Ok(Message) if a message was read.
169    /// Return Ok(SignalUpdate) if a signal was instead read.
170    /// Return Err(_) if an error occurred.
171    pub(crate) fn read<'a>(
172        &'a self,
173        msg: &'a mut Hdl::Message,
174    ) -> impl 'a + Future<Output = Result<ReadValue, zx_status::Status>> + Unpin
175    where
176        Hdl: ProxyableRW<'a>,
177    {
178        self.handle_io(msg, Hdl::Reader::new())
179    }
180
181    pub(crate) fn router(&self) -> &Weak<Router> {
182        &self.router
183    }
184
185    /// Given a signal update from the wire, apply it to the underlying handle (signalling
186    /// the peer and completing the loop).
187    pub(crate) fn apply_signal_update(&self, signal_update: SignalUpdate) -> Result<(), Error> {
188        if let Some(assert_signals) = signal_update.assert_signals {
189            self.hdl
190                .signal_peer(Signals::empty(), self::signals::from_wire_signals(assert_signals))?
191        }
192        Ok(())
193    }
194
195    fn handle_io<'a, I: 'a + IO<'a, Proxyable = Hdl>>(
196        &'a self,
197        msg: &'a mut Hdl::Message,
198        mut io: I,
199    ) -> impl 'a + Future<Output = Result<<I as IO<'a>>::Output, zx_status::Status>> + Unpin {
200        poll_fn(move |fut_ctx| io.poll_io(msg, &self.hdl, fut_ctx))
201    }
202
203    /// Drain all remaining messages from this handle and write them to `stream_writer`.
204    /// Assumes that nothing else is writing to the handle, so that getting Poll::Pending on read
205    /// is a signal that all messages have been read.
206    pub(crate) async fn drain_to_stream(
207        &self,
208        stream_writer: &mut StreamWriter<Hdl::Message>,
209    ) -> Result<(), Error>
210    where
211        Hdl: for<'a> ProxyableRW<'a>,
212    {
213        let mut message = Default::default();
214        loop {
215            let pr = self.read(&mut message).poll_unpin(&mut Context::from_waker(noop_waker_ref()));
216            match pr {
217                Poll::Pending => return Ok(()),
218                Poll::Ready(Err(e)) => return Err(e.into()),
219                Poll::Ready(Ok(ReadValue::Message)) => {
220                    stream_writer.send_data(&mut message).await?
221                }
222                Poll::Ready(Ok(ReadValue::SignalUpdate(signal_update))) => {
223                    stream_writer.send_signal(signal_update).await?
224                }
225            }
226        }
227    }
228
229    /// Drain all messages on a stream into this handle.
230    pub(crate) async fn drain_stream_to_handle(
231        self,
232        drain_stream: FramedStreamReader,
233    ) -> Result<fidl::Handle, Error>
234    where
235        Hdl: for<'a> ProxyableRW<'a>,
236    {
237        let mut drain_stream = drain_stream.bind(&self);
238        loop {
239            match drain_stream.next().await? {
240                Frame::Data(message) => self.write(message).await?,
241                Frame::SignalUpdate(signal_update) => self.apply_signal_update(signal_update)?,
242                Frame::EndTransfer => return Ok(self.hdl.into_fidl_handle()?),
243                Frame::Hello => bail!("Hello frame disallowed on drain streams"),
244                Frame::BeginTransfer(_, _) => bail!("BeginTransfer on drain stream"),
245                Frame::AckTransfer => bail!("AckTransfer on drain stream"),
246                Frame::Shutdown(r) => bail!("Stream shutdown during drain: {:?}", r),
247            }
248        }
249    }
250}