overnet_core/proxy/
mod.rs1mod handle;
6mod run;
7mod stream;
8
9use self::handle::{ProxyableHandle, ReadValue};
10use self::stream::StreamWriter;
11use crate::labels::{NodeId, TransferKey};
12use crate::peer::FramedStreamWriter;
13use crate::router::Router;
14use anyhow::{format_err, Error};
15use fidl_fuchsia_overnet_protocol::{
16 SignalUpdate, StreamId, StreamRef, TransferInitiator, TransferWaiter,
17};
18use futures::prelude::*;
19use std::pin::Pin;
20use std::sync::{Arc, Weak};
21use std::task::{Context, Poll};
22use zx_status;
23
24pub(crate) use self::handle::{IntoProxied, Proxyable, ProxyableRW};
25pub(crate) use self::run::spawn::{recv as spawn_recv, send as spawn_send};
26
27pub use self::run::set_proxy_drop_event_handler;
28
29#[derive(Debug)]
30pub(crate) enum RemoveFromProxyTable {
31 InitiateTransfer {
32 paired_handle: fidl::Handle,
33 drain_stream: FramedStreamWriter,
34 stream_ref_sender: StreamRefSender,
35 },
36 Dropped,
37}
38
39impl RemoveFromProxyTable {
40 pub(crate) fn is_dropped(&self) -> bool {
41 match self {
42 RemoveFromProxyTable::Dropped => true,
43 _ => false,
44 }
45 }
46}
47
48pub(crate) struct ProxyTransferInitiationReceiver(
49 Pin<Box<dyn Send + Future<Output = Result<RemoveFromProxyTable, Error>>>>,
50);
51
52impl Future for ProxyTransferInitiationReceiver {
53 type Output = Result<RemoveFromProxyTable, Error>;
54 fn poll(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
55 self.0.as_mut().poll(ctx)
56 }
57}
58
59impl std::fmt::Debug for ProxyTransferInitiationReceiver {
60 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
61 "_".fmt(f)
62 }
63}
64
65impl ProxyTransferInitiationReceiver {
66 pub(crate) fn new(
67 f: impl 'static + Send + Future<Output = Result<RemoveFromProxyTable, Error>>,
68 ) -> Self {
69 Self(Box::pin(f))
70 }
71}
72
73pub(crate) struct StreamRefSender {
74 chan: futures::channel::oneshot::Sender<StreamRef>,
75}
76
77impl std::fmt::Debug for StreamRefSender {
78 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
79 "StreamRefSender".fmt(f)
80 }
81}
82
83impl StreamRefSender {
84 pub(crate) fn new() -> (Self, futures::channel::oneshot::Receiver<StreamRef>) {
85 let (tx, rx) = futures::channel::oneshot::channel();
86 (Self { chan: tx }, rx)
87 }
88
89 fn send(self, stream_ref: StreamRef) -> Result<(), Error> {
90 Ok(self.chan.send(stream_ref).map_err(|_| format_err!("Failed sending StreamRef"))?)
91 }
92
93 pub(crate) fn draining_initiate(
94 self,
95 stream_id: u64,
96 new_destination_node: NodeId,
97 transfer_key: TransferKey,
98 ) -> Result<(), Error> {
99 Ok(self.send(StreamRef::TransferInitiator(TransferInitiator {
100 stream_id: StreamId { id: stream_id },
101 new_destination_node: new_destination_node.into(),
102 transfer_key,
103 }))?)
104 }
105
106 pub(crate) fn draining_await(
107 self,
108 stream_id: u64,
109 transfer_key: TransferKey,
110 ) -> Result<(), Error> {
111 Ok(self.send(StreamRef::TransferWaiter(TransferWaiter {
112 stream_id: StreamId { id: stream_id },
113 transfer_key,
114 }))?)
115 }
116}
117
118mod proxy_count {
119 use std::sync::Mutex;
120
121 pub struct ProxyCount {
122 count: usize,
123 high_water: usize,
124 increment: usize,
125 }
126
127 pub static PROXY_COUNT: Mutex<ProxyCount> =
128 Mutex::new(ProxyCount { count: 0, high_water: 0, increment: 100 });
129
130 impl ProxyCount {
131 pub fn increment(&mut self) {
132 self.count += 1;
133
134 if self.count == self.high_water + self.increment {
135 self.high_water += self.increment;
136 if self.count == self.increment * 10 {
137 self.increment *= 10;
138 }
139 log::info!("{} proxies extant or never reaped", self.count)
140 }
141 }
142
143 pub fn decrement(&mut self) {
144 if self.count == 0 {
145 log::warn!("proxy counter went below zero");
146 } else {
147 self.count -= 1;
148 }
149 }
150 }
151}
152
153#[derive(Debug)]
154pub(crate) struct Proxy<Hdl: Proxyable + 'static> {
155 hdl: Option<ProxyableHandle<Hdl>>,
156}
157
158impl<Hdl: 'static + Proxyable> Drop for Proxy<Hdl> {
159 fn drop(&mut self) {
160 proxy_count::PROXY_COUNT.lock().unwrap().decrement();
161 }
162}
163
164impl<Hdl: 'static + Proxyable> Proxy<Hdl> {
165 fn new(hdl: Hdl, router: Weak<Router>) -> Arc<Self> {
166 proxy_count::PROXY_COUNT.lock().unwrap().increment();
167 Arc::new(Self { hdl: Some(ProxyableHandle::new(hdl, router)) })
168 }
169
170 fn close_with_reason(mut self, msg: String) {
171 if let Some(hdl) = self.hdl.take() {
172 hdl.close_with_reason(msg);
173 }
174 }
175
176 fn hdl(&self) -> &ProxyableHandle<Hdl> {
177 self.hdl.as_ref().unwrap()
178 }
179
180 async fn write_to_handle(&self, msg: &mut Hdl::Message) -> Result<(), zx_status::Status>
181 where
182 Hdl: for<'a> ProxyableRW<'a>,
183 {
184 self.hdl().write(msg).await
185 }
186
187 fn apply_signal_update(&self, signal_update: SignalUpdate) -> Result<(), Error> {
188 self.hdl().apply_signal_update(signal_update)
189 }
190
191 fn read_from_handle<'a>(
192 &'a self,
193 msg: &'a mut Hdl::Message,
194 ) -> impl 'a + Future<Output = Result<ReadValue, zx_status::Status>> + Unpin
195 where
196 Hdl: ProxyableRW<'a>,
197 {
198 self.hdl().read(msg)
199 }
200
201 async fn drain_handle_to_stream(
202 &self,
203 stream_writer: &mut StreamWriter<Hdl::Message>,
204 ) -> Result<(), Error>
205 where
206 Hdl: for<'a> ProxyableRW<'a>,
207 {
208 self.hdl().drain_to_stream(stream_writer).await
209 }
210}