1use anyhow::{Context as _, Error};
8use fidl::endpoints::{ClientEnd, ProtocolMarker, Proxy as _};
9use fidl_fuchsia_net::{self as fnet, MarkDomain};
10use fidl_fuchsia_posix_socket::{self as fposix_socket, OptionalUint32};
11use fuchsia_component::client::connect_to_protocol;
12use fuchsia_inspect_derive::{IValue, Inspect, Unit};
13use futures::lock::Mutex;
14use futures::{Future, StreamExt as _, TryStreamExt as _};
15use std::sync::Arc;
16use {fidl_fuchsia_posix as fposix, fidl_fuchsia_posix_socket_raw as fposix_socket_raw};
17
18#[derive(Unit, Default)]
36struct SocketProviderInspect {
37 sockets: u32,
38 stream: PerType,
39 synchronous_datagram: PerType,
40 datagram: PerType,
41 raw: PerType,
42}
43
44#[derive(Unit, Default)]
45struct PerType {
46 proxied: u32,
47 unmarked: u32,
48}
49
50impl PerType {
51 pub(crate) fn track(&mut self, marks: crate::SocketMarks) {
52 self.proxied += 1;
53 self.unmarked += u32::from(!marks.has_value());
54 }
55}
56
57trait Markable {
58 fn mark(
59 &self,
60 domain: MarkDomain,
61 mark: OptionalUint32,
62 ) -> impl Future<Output = Result<Result<(), fposix::Errno>, fidl::Error>>;
63}
64
65macro_rules! impl_markable {
66 ($($ty:ty),*) => {
67 $(
68 impl Markable for $ty {
69 fn mark(
70 &self,
71 domain: fnet::MarkDomain,
72 mark: OptionalUint32,
73 ) -> impl Future<Output = Result<Result<(), fposix::Errno>, fidl::Error>> {
74 self.set_mark(domain, &mark)
75 }
76 }
77 )*
78 };
79 ($($ty:ty),*,) => { impl_markable!($($ty),*); };
80}
81
82impl_markable!(
83 fposix_socket::StreamSocketProxy,
84 fposix_socket::SynchronousDatagramSocketProxy,
85 fposix_socket::DatagramSocketProxy,
86 fposix_socket_raw::SocketProxy,
87);
88
89trait Marked: Sized {
90 fn marked(
91 self,
92 marks: crate::SocketMarks,
93 ) -> impl Future<Output = Result<Result<Self, fposix::Errno>, Error>>;
94}
95
96impl<Marker> Marked for ClientEnd<Marker>
97where
98 Marker: ProtocolMarker,
99 Marker::Proxy: fidl::endpoints::Proxy<Protocol = Marker> + Markable,
100{
101 fn marked(
102 self,
103 marks: crate::SocketMarks,
104 ) -> impl Future<Output = Result<Result<Self, fposix::Errno>, Error>> {
105 async move {
106 let proxy = self.into_proxy();
107 Ok(
108 match Result::and(
109 proxy.mark(MarkDomain::Mark1, marks.mark_1).await?,
110 proxy.mark(MarkDomain::Mark2, marks.mark_2).await?,
111 ) {
112 Ok(()) => Ok(proxy.into_client_end().map_err(|_| {
113 anyhow::anyhow!("Failed to convert socket proxy back into client end")
114 })?),
115
116 Err(e) => Err(e),
117 },
118 )
119 }
120 }
121}
122
123#[derive(Inspect, Clone)]
124pub(crate) struct SocketProvider {
125 marks: Arc<Mutex<crate::SocketMarks>>,
126
127 #[inspect(forward)]
128 metrics: Arc<Mutex<IValue<SocketProviderInspect>>>,
129}
130
131impl SocketProvider {
132 pub(crate) fn new(mark: Arc<Mutex<crate::SocketMarks>>) -> Self {
133 Self { marks: mark, metrics: Default::default() }
134 }
135
136 pub(crate) async fn run(
138 &self,
139 stream: fposix_socket::ProviderRequestStream,
140 ) -> Result<(), Error> {
141 let inner_provider = connect_to_protocol::<fposix_socket::ProviderMarker>()
142 .context("Failed to connect to inner server")?;
143 stream
144 .map(|result| result.context("failed request"))
145 .try_for_each(|request| async {
146 match request {
147 fposix_socket::ProviderRequest::StreamSocket { domain, proto, responder } => {
148 let marks = *self.marks.lock().await;
149 let mut metrics_lock = self.metrics.lock().await;
150 let mut metrics = metrics_lock.as_mut();
151 metrics.sockets += 1;
152 metrics.stream.track(marks);
153 responder.send(
154 inner_provider
155 .stream_socket_with_options(domain, proto, &fposix_socket::SocketCreationOptions {
156 marks: Some(marks.into()),
157 ..Default::default()
158 })
159 .await?,
160 )?;
161 }
162 fposix_socket::ProviderRequest::StreamSocketWithOptions {
163 domain,
164 proto,
165 responder,
166 opts,
167 } => {
168 let marks = *self.marks.lock().await;
169 let mut metrics_lock = self.metrics.lock().await;
170 let mut metrics = metrics_lock.as_mut();
171 metrics.sockets += 1;
172 metrics.stream.track(marks);
173 if let Some(opts_marks) = opts.marks {
174 log::warn!(
175 "stream socket marks supplied by creation opts {:?} \
176 will be overriden by {:?}",
177 opts_marks,
178 marks
179 );
180 }
181 responder.send(
182 inner_provider
183 .stream_socket_with_options(domain, proto, &fposix_socket::SocketCreationOptions {
184 marks: Some(marks.into()),
185 ..opts
186 })
187 .await?,
188 )?;
189 }
190 fposix_socket::ProviderRequest::DatagramSocketDeprecated {
191 domain,
192 proto,
193 responder,
194 } => {
195 let marks = *self.marks.lock().await;
196 let mut metrics_lock = self.metrics.lock().await;
197 let mut metrics = metrics_lock.as_mut();
198 metrics.sockets += 1;
199 metrics.synchronous_datagram.track(marks);
200 responder.send(
201 match inner_provider.datagram_socket_deprecated(domain, proto).await? {
202 Ok(socket) => socket.marked(marks).await?,
203 e => e,
204 },
205 )?;
206 }
207 fposix_socket::ProviderRequest::DatagramSocket { domain, proto, responder } => {
208 let marks = *self.marks.lock().await;
209 let mut metrics_lock = self.metrics.lock().await;
210 let mut metrics = metrics_lock.as_mut();
211 metrics.sockets += 1;
212 use fposix_socket::{
213 ProviderDatagramSocketResponse, ProviderDatagramSocketWithOptionsResponse,
214 };
215 let response = inner_provider
216 .datagram_socket_with_options(domain, proto, &fposix_socket::SocketCreationOptions {
217 marks: Some(marks.into()),
218 ..Default::default()
219 })
220 .await?
221 .map(|response| {
222 match response {
223 ProviderDatagramSocketWithOptionsResponse::DatagramSocket(client_end)
224 => {
225 ProviderDatagramSocketResponse::DatagramSocket(client_end)
226 }
227 ProviderDatagramSocketWithOptionsResponse::SynchronousDatagramSocket(
228 client_end,
229 ) => ProviderDatagramSocketResponse::SynchronousDatagramSocket(
230 client_end,
231 ),
232 }
233 });
234 responder.send(response)?
235 }
236 fposix_socket::ProviderRequest::DatagramSocketWithOptions {
237 domain,
238 proto,
239 opts,
240 responder,
241 } => {
242 let marks = *self.marks.lock().await;
243 let mut metrics_lock = self.metrics.lock().await;
244 let mut metrics = metrics_lock.as_mut();
245 metrics.sockets += 1;
246 if let Some(opts_marks) = opts.marks {
247 log::warn!(
248 "datagram marks supplied by creation opts {:?} \
249 will be overriden by {:?}",
250 opts_marks,
251 marks
252 );
253 }
254 responder.send(
255 inner_provider
256 .datagram_socket_with_options(domain, proto, &fposix_socket::SocketCreationOptions {
257 marks: Some(marks.into()),
258 ..opts
259 })
260 .await?,
261 )?
262 }
263 fposix_socket::ProviderRequest::InterfaceIndexToName { index, responder } => {
264 let name = inner_provider.interface_index_to_name(index).await?;
265 responder.send(match &name {
266 Ok(n) => Ok(n),
267 Err(e) => Err(*e),
268 })?;
269 }
270 fposix_socket::ProviderRequest::InterfaceNameToIndex { name, responder } => {
271 responder.send(inner_provider.interface_name_to_index(&name).await?)?
272 }
273 fposix_socket::ProviderRequest::InterfaceNameToFlags { name, responder } => {
274 responder.send(inner_provider.interface_name_to_flags(&name).await?)?
275 }
276 fposix_socket::ProviderRequest::GetInterfaceAddresses { responder } => {
277 responder.send(&inner_provider.get_interface_addresses().await?)?
278 }
279 }
280
281 Ok(())
282 })
283 .await
284 }
285
286 pub(crate) async fn run_raw(
288 &self,
289 stream: fposix_socket_raw::ProviderRequestStream,
290 ) -> Result<(), Error> {
291 let inner_provider = connect_to_protocol::<fposix_socket_raw::ProviderMarker>()
292 .context("Failed to connect to inner server")?;
293 stream
294 .map(|result| result.context("failed request"))
295 .try_for_each(|request| async {
296 match request {
297 fposix_socket_raw::ProviderRequest::Socket { domain, proto, responder } => {
298 let marks = *self.marks.lock().await;
299 let mut metrics_lock = self.metrics.lock().await;
300 let mut metrics = metrics_lock.as_mut();
301 metrics.sockets += 1;
302 metrics.raw.track(marks);
303 responder.send(
304 inner_provider
305 .socket_with_options(
306 domain,
307 &proto,
308 &fposix_socket::SocketCreationOptions {
309 marks: Some(marks.into()),
310 ..Default::default()
311 },
312 )
313 .await?,
314 )?
315 }
316 fposix_socket_raw::ProviderRequest::SocketWithOptions {
317 domain,
318 proto,
319 opts,
320 responder,
321 } => {
322 let marks = *self.marks.lock().await;
323 let mut metrics_lock = self.metrics.lock().await;
324 let mut metrics = metrics_lock.as_mut();
325 metrics.sockets += 1;
326 metrics.raw.track(marks);
327 if let Some(opts_marks) = opts.marks {
328 log::warn!(
329 "raw socket marks supplied by creation opts {:?} \
330 will be overriden by {:?}",
331 opts_marks,
332 marks
333 );
334 }
335 responder.send(
336 inner_provider
337 .socket_with_options(
338 domain,
339 &proto,
340 &fposix_socket::SocketCreationOptions {
341 marks: Some(marks.into()),
342 ..opts
343 },
344 )
345 .await?,
346 )?
347 }
348 }
349
350 Ok(())
351 })
352 .await
353 }
354}