1use anyhow::{Context as _, Error};
8use fidl::endpoints::{ClientEnd, ProtocolMarker, Proxy as _};
9use fidl_fuchsia_net::{self as fnet, MarkDomain};
10use fidl_fuchsia_posix_socket::{
11 self as fposix_socket, OptionalUint32, ProviderDatagramSocketResponse,
12 ProviderDatagramSocketWithOptionsResponse,
13};
14use fuchsia_component::client::connect_to_protocol;
15use fuchsia_inspect_derive::{IValue, Inspect, Unit};
16use futures::lock::Mutex;
17use futures::{Future, StreamExt as _, TryStreamExt as _};
18use std::sync::Arc;
19use {fidl_fuchsia_posix as fposix, fidl_fuchsia_posix_socket_raw as fposix_socket_raw};
20
21#[derive(Unit, Default)]
39struct SocketProviderInspect {
40 sockets: u32,
41 stream: PerType,
42 synchronous_datagram: PerType,
43 datagram: PerType,
44 raw: PerType,
45}
46
47#[derive(Unit, Default)]
48struct PerType {
49 proxied: u32,
50 unmarked: u32,
51}
52
53impl PerType {
54 pub(crate) fn track(&mut self, marks: crate::SocketMarks) {
55 self.proxied += 1;
56 self.unmarked += u32::from(!marks.has_value());
57 }
58}
59
60trait Markable {
61 fn mark(
62 &self,
63 domain: MarkDomain,
64 mark: OptionalUint32,
65 ) -> impl Future<Output = Result<Result<(), fposix::Errno>, fidl::Error>>;
66}
67
68macro_rules! impl_markable {
69 ($($ty:ty),*) => {
70 $(
71 impl Markable for $ty {
72 fn mark(
73 &self,
74 domain: fnet::MarkDomain,
75 mark: OptionalUint32,
76 ) -> impl Future<Output = Result<Result<(), fposix::Errno>, fidl::Error>> {
77 self.set_mark(domain, &mark)
78 }
79 }
80 )*
81 };
82 ($($ty:ty),*,) => { impl_markable!($($ty),*); };
83}
84
85impl_markable!(
86 fposix_socket::StreamSocketProxy,
87 fposix_socket::SynchronousDatagramSocketProxy,
88 fposix_socket::DatagramSocketProxy,
89 fposix_socket_raw::SocketProxy,
90);
91
92trait Marked: Sized {
93 fn marked(
94 self,
95 marks: crate::SocketMarks,
96 ) -> impl Future<Output = Result<Result<Self, fposix::Errno>, Error>>;
97}
98
99impl<Marker> Marked for ClientEnd<Marker>
100where
101 Marker: ProtocolMarker,
102 Marker::Proxy: fidl::endpoints::Proxy<Protocol = Marker> + Markable,
103{
104 fn marked(
105 self,
106 marks: crate::SocketMarks,
107 ) -> impl Future<Output = Result<Result<Self, fposix::Errno>, Error>> {
108 async move {
109 let proxy = self.into_proxy();
110 Ok(
111 match Result::and(
112 proxy.mark(MarkDomain::Mark1, marks.mark_1).await?,
113 proxy.mark(MarkDomain::Mark2, marks.mark_2).await?,
114 ) {
115 Ok(()) => Ok(proxy.into_client_end().map_err(|_| {
116 anyhow::anyhow!("Failed to convert socket proxy back into client end")
117 })?),
118
119 Err(e) => Err(e),
120 },
121 )
122 }
123 }
124}
125
126#[derive(Inspect, Clone)]
127pub(crate) struct SocketProvider {
128 marks: Arc<Mutex<crate::SocketMarks>>,
129
130 #[inspect(forward)]
131 metrics: Arc<Mutex<IValue<SocketProviderInspect>>>,
132}
133
134impl SocketProvider {
135 pub(crate) fn new(mark: Arc<Mutex<crate::SocketMarks>>) -> Self {
136 Self { marks: mark, metrics: Default::default() }
137 }
138
139 pub(crate) async fn run(
141 &self,
142 stream: fposix_socket::ProviderRequestStream,
143 ) -> Result<(), Error> {
144 let inner_provider = connect_to_protocol::<fposix_socket::ProviderMarker>()
145 .context("Failed to connect to inner server")?;
146 stream
147 .map(|result| result.context("failed request"))
148 .try_for_each(|request| async {
149 match request {
150 fposix_socket::ProviderRequest::StreamSocket { domain, proto, responder } => {
151 let marks = *self.marks.lock().await;
152 let mut metrics_lock = self.metrics.lock().await;
153 let mut metrics = metrics_lock.as_mut();
154 metrics.sockets += 1;
155 metrics.stream.track(marks);
156 responder.send(
157 inner_provider
158 .stream_socket_with_options(domain, proto, &fposix_socket::SocketCreationOptions {
159 marks: Some(marks.into()),
160 ..Default::default()
161 })
162 .await?,
163 )?;
164 }
165 fposix_socket::ProviderRequest::StreamSocketWithOptions {
166 domain,
167 proto,
168 responder,
169 opts,
170 } => {
171 let marks = *self.marks.lock().await;
172 let mut metrics_lock = self.metrics.lock().await;
173 let mut metrics = metrics_lock.as_mut();
174 metrics.sockets += 1;
175 metrics.stream.track(marks);
176 if let Some(opts_marks) = opts.marks {
177 log::warn!(
178 "stream socket marks supplied by creation opts {:?} \
179 will be overriden by {:?}",
180 opts_marks,
181 marks
182 );
183 }
184 responder.send(
185 inner_provider
186 .stream_socket_with_options(domain, proto, &fposix_socket::SocketCreationOptions {
187 marks: Some(marks.into()),
188 ..opts
189 })
190 .await?,
191 )?;
192 }
193 fposix_socket::ProviderRequest::DatagramSocketDeprecated {
194 domain,
195 proto,
196 responder,
197 } => {
198 let marks = *self.marks.lock().await;
199 let mut metrics_lock = self.metrics.lock().await;
200 let mut metrics = metrics_lock.as_mut();
201 metrics.sockets += 1;
202 metrics.synchronous_datagram.track(marks);
203 responder.send(
204 match inner_provider.datagram_socket_deprecated(domain, proto).await? {
205 Ok(socket) => socket.marked(marks).await?,
206 e => e,
207 },
208 )?;
209 }
210 fposix_socket::ProviderRequest::DatagramSocket { domain, proto, responder } => {
211 let marks = *self.marks.lock().await;
212 let mut metrics_lock = self.metrics.lock().await;
213 let mut metrics = metrics_lock.as_mut();
214 metrics.sockets += 1;
215 let datagram_socket = inner_provider
216 .datagram_socket_with_options(domain, proto, &fposix_socket::SocketCreationOptions {
217 marks: Some(marks.into()),
218 ..Default::default()
219 })
220 .await?
221 .map(|response| {
222 match response {
223 ProviderDatagramSocketWithOptionsResponse::DatagramSocket(client_end)
224 => {
225 metrics.datagram.track(marks);
226 ProviderDatagramSocketResponse::DatagramSocket(client_end)
227 }
228 ProviderDatagramSocketWithOptionsResponse::SynchronousDatagramSocket(
229 client_end,
230 ) => {
231 metrics.synchronous_datagram.track(marks);
232 ProviderDatagramSocketResponse::SynchronousDatagramSocket(
233 client_end,
234 )
235 },
236 }
237 });
238 responder.send(datagram_socket)?
239 }
240 fposix_socket::ProviderRequest::DatagramSocketWithOptions {
241 domain,
242 proto,
243 opts,
244 responder,
245 } => {
246 let marks = *self.marks.lock().await;
247 let mut metrics_lock = self.metrics.lock().await;
248 let mut metrics = metrics_lock.as_mut();
249 metrics.sockets += 1;
250 if let Some(opts_marks) = opts.marks {
251 log::warn!(
252 "datagram marks supplied by creation opts {:?} \
253 will be overriden by {:?}",
254 opts_marks,
255 marks
256 );
257 }
258 let datagram_socket = inner_provider
259 .datagram_socket_with_options(domain, proto, &fposix_socket::SocketCreationOptions {
260 marks: Some(marks.into()),
261 ..opts
262 })
263 .await?
264 .map(|response| {
265 match response {
266 ProviderDatagramSocketWithOptionsResponse::DatagramSocket(_)
267 => {
268 metrics.datagram.track(marks);
269 }
270 ProviderDatagramSocketWithOptionsResponse::SynchronousDatagramSocket(_) => {
271 metrics.synchronous_datagram.track(marks);
272 },
273 };
274 response
275 });
276 responder.send(datagram_socket)?
277 }
278 fposix_socket::ProviderRequest::InterfaceIndexToName { index, responder } => {
279 let name = inner_provider.interface_index_to_name(index).await?;
280 responder.send(match &name {
281 Ok(n) => Ok(n),
282 Err(e) => Err(*e),
283 })?;
284 }
285 fposix_socket::ProviderRequest::InterfaceNameToIndex { name, responder } => {
286 responder.send(inner_provider.interface_name_to_index(&name).await?)?
287 }
288 fposix_socket::ProviderRequest::InterfaceNameToFlags { name, responder } => {
289 responder.send(inner_provider.interface_name_to_flags(&name).await?)?
290 }
291 fposix_socket::ProviderRequest::GetInterfaceAddresses { responder } => {
292 responder.send(&inner_provider.get_interface_addresses().await?)?
293 }
294 }
295
296 Ok(())
297 })
298 .await
299 }
300
301 pub(crate) async fn run_raw(
303 &self,
304 stream: fposix_socket_raw::ProviderRequestStream,
305 ) -> Result<(), Error> {
306 let inner_provider = connect_to_protocol::<fposix_socket_raw::ProviderMarker>()
307 .context("Failed to connect to inner server")?;
308 stream
309 .map(|result| result.context("failed request"))
310 .try_for_each(|request| async {
311 match request {
312 fposix_socket_raw::ProviderRequest::Socket { domain, proto, responder } => {
313 let marks = *self.marks.lock().await;
314 let mut metrics_lock = self.metrics.lock().await;
315 let mut metrics = metrics_lock.as_mut();
316 metrics.sockets += 1;
317 metrics.raw.track(marks);
318 responder.send(
319 inner_provider
320 .socket_with_options(
321 domain,
322 &proto,
323 &fposix_socket::SocketCreationOptions {
324 marks: Some(marks.into()),
325 ..Default::default()
326 },
327 )
328 .await?,
329 )?
330 }
331 fposix_socket_raw::ProviderRequest::SocketWithOptions {
332 domain,
333 proto,
334 opts,
335 responder,
336 } => {
337 let marks = *self.marks.lock().await;
338 let mut metrics_lock = self.metrics.lock().await;
339 let mut metrics = metrics_lock.as_mut();
340 metrics.sockets += 1;
341 metrics.raw.track(marks);
342 if let Some(opts_marks) = opts.marks {
343 log::warn!(
344 "raw socket marks supplied by creation opts {:?} \
345 will be overriden by {:?}",
346 opts_marks,
347 marks
348 );
349 }
350 responder.send(
351 inner_provider
352 .socket_with_options(
353 domain,
354 &proto,
355 &fposix_socket::SocketCreationOptions {
356 marks: Some(marks.into()),
357 ..opts
358 },
359 )
360 .await?,
361 )?
362 }
363 }
364
365 Ok(())
366 })
367 .await
368 }
369}