overnet_core/proxy/
mod.rs

1// Copyright 2020 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5mod handle;
6mod run;
7mod stream;
8
9use self::handle::{ProxyableHandle, ReadValue};
10use self::stream::StreamWriter;
11use crate::labels::{NodeId, TransferKey};
12use crate::peer::FramedStreamWriter;
13use crate::router::Router;
14use anyhow::{format_err, Error};
15use fidl_fuchsia_overnet_protocol::{
16    SignalUpdate, StreamId, StreamRef, TransferInitiator, TransferWaiter,
17};
18use futures::prelude::*;
19use std::pin::Pin;
20use std::sync::{Arc, Weak};
21use std::task::{Context, Poll};
22use zx_status;
23
24pub(crate) use self::handle::{IntoProxied, Proxyable, ProxyableRW};
25pub(crate) use self::run::spawn::{recv as spawn_recv, send as spawn_send};
26
27pub use self::run::set_proxy_drop_event_handler;
28
29#[derive(Debug)]
30pub(crate) enum RemoveFromProxyTable {
31    InitiateTransfer {
32        paired_handle: fidl::Handle,
33        drain_stream: FramedStreamWriter,
34        stream_ref_sender: StreamRefSender,
35    },
36    Dropped,
37}
38
39impl RemoveFromProxyTable {
40    pub(crate) fn is_dropped(&self) -> bool {
41        match self {
42            RemoveFromProxyTable::Dropped => true,
43            _ => false,
44        }
45    }
46}
47
48pub(crate) struct ProxyTransferInitiationReceiver(
49    Pin<Box<dyn Send + Future<Output = Result<RemoveFromProxyTable, Error>>>>,
50);
51
52impl Future for ProxyTransferInitiationReceiver {
53    type Output = Result<RemoveFromProxyTable, Error>;
54    fn poll(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
55        self.0.as_mut().poll(ctx)
56    }
57}
58
59impl std::fmt::Debug for ProxyTransferInitiationReceiver {
60    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
61        "_".fmt(f)
62    }
63}
64
65impl ProxyTransferInitiationReceiver {
66    pub(crate) fn new(
67        f: impl 'static + Send + Future<Output = Result<RemoveFromProxyTable, Error>>,
68    ) -> Self {
69        Self(Box::pin(f))
70    }
71}
72
73pub(crate) struct StreamRefSender {
74    chan: futures::channel::oneshot::Sender<StreamRef>,
75}
76
77impl std::fmt::Debug for StreamRefSender {
78    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
79        "StreamRefSender".fmt(f)
80    }
81}
82
83impl StreamRefSender {
84    pub(crate) fn new() -> (Self, futures::channel::oneshot::Receiver<StreamRef>) {
85        let (tx, rx) = futures::channel::oneshot::channel();
86        (Self { chan: tx }, rx)
87    }
88
89    fn send(self, stream_ref: StreamRef) -> Result<(), Error> {
90        Ok(self.chan.send(stream_ref).map_err(|_| format_err!("Failed sending StreamRef"))?)
91    }
92
93    pub(crate) fn draining_initiate(
94        self,
95        stream_id: u64,
96        new_destination_node: NodeId,
97        transfer_key: TransferKey,
98    ) -> Result<(), Error> {
99        Ok(self.send(StreamRef::TransferInitiator(TransferInitiator {
100            stream_id: StreamId { id: stream_id },
101            new_destination_node: new_destination_node.into(),
102            transfer_key,
103        }))?)
104    }
105
106    pub(crate) fn draining_await(
107        self,
108        stream_id: u64,
109        transfer_key: TransferKey,
110    ) -> Result<(), Error> {
111        Ok(self.send(StreamRef::TransferWaiter(TransferWaiter {
112            stream_id: StreamId { id: stream_id },
113            transfer_key,
114        }))?)
115    }
116}
117
118mod proxy_count {
119    use std::sync::Mutex;
120
121    pub struct ProxyCount {
122        count: usize,
123        high_water: usize,
124        increment: usize,
125    }
126
127    pub static PROXY_COUNT: Mutex<ProxyCount> =
128        Mutex::new(ProxyCount { count: 0, high_water: 0, increment: 100 });
129
130    impl ProxyCount {
131        pub fn increment(&mut self) {
132            self.count += 1;
133
134            if self.count == self.high_water + self.increment {
135                self.high_water += self.increment;
136                if self.count == self.increment * 10 {
137                    self.increment *= 10;
138                }
139                log::info!("{} proxies extant or never reaped", self.count)
140            }
141        }
142
143        pub fn decrement(&mut self) {
144            if self.count == 0 {
145                log::warn!("proxy counter went below zero");
146            } else {
147                self.count -= 1;
148            }
149        }
150    }
151}
152
153#[derive(Debug)]
154pub(crate) struct Proxy<Hdl: Proxyable + 'static> {
155    hdl: Option<ProxyableHandle<Hdl>>,
156}
157
158impl<Hdl: 'static + Proxyable> Drop for Proxy<Hdl> {
159    fn drop(&mut self) {
160        proxy_count::PROXY_COUNT.lock().unwrap().decrement();
161    }
162}
163
164impl<Hdl: 'static + Proxyable> Proxy<Hdl> {
165    fn new(hdl: Hdl, router: Weak<Router>) -> Arc<Self> {
166        proxy_count::PROXY_COUNT.lock().unwrap().increment();
167        Arc::new(Self { hdl: Some(ProxyableHandle::new(hdl, router)) })
168    }
169
170    fn close_with_reason(mut self, msg: String) {
171        if let Some(hdl) = self.hdl.take() {
172            hdl.close_with_reason(msg);
173        }
174    }
175
176    fn hdl(&self) -> &ProxyableHandle<Hdl> {
177        self.hdl.as_ref().unwrap()
178    }
179
180    async fn write_to_handle(&self, msg: &mut Hdl::Message) -> Result<(), zx_status::Status>
181    where
182        Hdl: for<'a> ProxyableRW<'a>,
183    {
184        self.hdl().write(msg).await
185    }
186
187    fn apply_signal_update(&self, signal_update: SignalUpdate) -> Result<(), Error> {
188        self.hdl().apply_signal_update(signal_update)
189    }
190
191    fn read_from_handle<'a>(
192        &'a self,
193        msg: &'a mut Hdl::Message,
194    ) -> impl 'a + Future<Output = Result<ReadValue, zx_status::Status>> + Unpin
195    where
196        Hdl: ProxyableRW<'a>,
197    {
198        self.hdl().read(msg)
199    }
200
201    async fn drain_handle_to_stream(
202        &self,
203        stream_writer: &mut StreamWriter<Hdl::Message>,
204    ) -> Result<(), Error>
205    where
206        Hdl: for<'a> ProxyableRW<'a>,
207    {
208        self.hdl().drain_to_stream(stream_writer).await
209    }
210}