socket_proxy/
socket_provider.rs

1// Copyright 2024 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5//! Implements fuchsia.posix.socket.Provider and fuchsia.posix.socket.raw.Provider.
6
7use anyhow::{Context as _, Error};
8use fidl::endpoints::{ClientEnd, ProtocolMarker, Proxy as _};
9use fidl_fuchsia_net::{self as fnet, MarkDomain};
10use fidl_fuchsia_posix_socket::{
11    self as fposix_socket, OptionalUint32, ProviderDatagramSocketResponse,
12    ProviderDatagramSocketWithOptionsResponse,
13};
14use fuchsia_component::client::connect_to_protocol;
15use fuchsia_inspect_derive::{IValue, Inspect, Unit};
16use futures::lock::Mutex;
17use futures::{Future, StreamExt as _, TryStreamExt as _};
18use std::sync::Arc;
19use {fidl_fuchsia_posix as fposix, fidl_fuchsia_posix_socket_raw as fposix_socket_raw};
20
21/// Inspect node for socket provider.
22///
23/// Manages the following inspect nodes:
24/// socket_provider:
25///   sockets = 0
26///   datagram:
27///     proxied = 0
28///     unmarked = 0
29///   raw:
30///     proxied = 0
31///     unmarked = 0
32///   stream:
33///     proxied = 0
34///     unmarked = 0
35///   synchronous_datagram:
36///     proxied = 0
37///     unmarked = 0
38#[derive(Unit, Default)]
39struct SocketProviderInspect {
40    sockets: u32,
41    stream: PerType,
42    synchronous_datagram: PerType,
43    datagram: PerType,
44    raw: PerType,
45}
46
47#[derive(Unit, Default)]
48struct PerType {
49    proxied: u32,
50    unmarked: u32,
51}
52
53impl PerType {
54    pub(crate) fn track(&mut self, marks: crate::SocketMarks) {
55        self.proxied += 1;
56        self.unmarked += u32::from(!marks.has_value());
57    }
58}
59
60trait Markable {
61    fn mark(
62        &self,
63        domain: MarkDomain,
64        mark: OptionalUint32,
65    ) -> impl Future<Output = Result<Result<(), fposix::Errno>, fidl::Error>>;
66}
67
68macro_rules! impl_markable {
69    ($($ty:ty),*) => {
70        $(
71            impl Markable for $ty {
72                fn mark(
73                    &self,
74                    domain: fnet::MarkDomain,
75                    mark: OptionalUint32,
76                ) -> impl Future<Output = Result<Result<(), fposix::Errno>, fidl::Error>> {
77                    self.set_mark(domain, &mark)
78                }
79            }
80        )*
81    };
82    ($($ty:ty),*,) => { impl_markable!($($ty),*); };
83}
84
85impl_markable!(
86    fposix_socket::StreamSocketProxy,
87    fposix_socket::SynchronousDatagramSocketProxy,
88    fposix_socket::DatagramSocketProxy,
89    fposix_socket_raw::SocketProxy,
90);
91
92trait Marked: Sized {
93    fn marked(
94        self,
95        marks: crate::SocketMarks,
96    ) -> impl Future<Output = Result<Result<Self, fposix::Errno>, Error>>;
97}
98
99impl<Marker> Marked for ClientEnd<Marker>
100where
101    Marker: ProtocolMarker,
102    Marker::Proxy: fidl::endpoints::Proxy<Protocol = Marker> + Markable,
103{
104    fn marked(
105        self,
106        marks: crate::SocketMarks,
107    ) -> impl Future<Output = Result<Result<Self, fposix::Errno>, Error>> {
108        async move {
109            let proxy = self.into_proxy();
110            Ok(
111                match Result::and(
112                    proxy.mark(MarkDomain::Mark1, marks.mark_1).await?,
113                    proxy.mark(MarkDomain::Mark2, marks.mark_2).await?,
114                ) {
115                    Ok(()) => Ok(proxy.into_client_end().map_err(|_| {
116                        anyhow::anyhow!("Failed to convert socket proxy back into client end")
117                    })?),
118
119                    Err(e) => Err(e),
120                },
121            )
122        }
123    }
124}
125
126#[derive(Inspect, Clone)]
127pub(crate) struct SocketProvider {
128    marks: Arc<Mutex<crate::SocketMarks>>,
129
130    #[inspect(forward)]
131    metrics: Arc<Mutex<IValue<SocketProviderInspect>>>,
132}
133
134impl SocketProvider {
135    pub(crate) fn new(mark: Arc<Mutex<crate::SocketMarks>>) -> Self {
136        Self { marks: mark, metrics: Default::default() }
137    }
138
139    /// Run an instance of fuchsia.posix.socket.Provider.
140    pub(crate) async fn run(
141        &self,
142        stream: fposix_socket::ProviderRequestStream,
143    ) -> Result<(), Error> {
144        let inner_provider = connect_to_protocol::<fposix_socket::ProviderMarker>()
145            .context("Failed to connect to inner server")?;
146        stream
147            .map(|result| result.context("failed request"))
148            .try_for_each(|request| async {
149                match request {
150                    fposix_socket::ProviderRequest::StreamSocket { domain, proto, responder } => {
151                        let marks = *self.marks.lock().await;
152                        let mut metrics_lock = self.metrics.lock().await;
153                        let mut metrics = metrics_lock.as_mut();
154                        metrics.sockets += 1;
155                        metrics.stream.track(marks);
156                        responder.send(
157                            inner_provider
158                                .stream_socket_with_options(domain, proto, &fposix_socket::SocketCreationOptions {
159                                    marks: Some(marks.into()),
160                                    ..Default::default()
161                                })
162                                .await?,
163                        )?;
164                    }
165                    fposix_socket::ProviderRequest::StreamSocketWithOptions {
166                        domain,
167                        proto,
168                        responder,
169                        opts,
170                    } => {
171                        let marks = *self.marks.lock().await;
172                        let mut metrics_lock = self.metrics.lock().await;
173                        let mut metrics = metrics_lock.as_mut();
174                        metrics.sockets += 1;
175                        metrics.stream.track(marks);
176                        if let Some(opts_marks) = opts.marks {
177                            log::warn!(
178                                "stream socket marks supplied by creation opts {:?} \
179                                will be overriden by {:?}",
180                                opts_marks,
181                                marks
182                            );
183                        }
184                        responder.send(
185                            inner_provider
186                                .stream_socket_with_options(domain, proto, &fposix_socket::SocketCreationOptions {
187                                    marks: Some(marks.into()),
188                                    ..opts
189                                })
190                                .await?,
191                        )?;
192                    }
193                    fposix_socket::ProviderRequest::DatagramSocketDeprecated {
194                        domain,
195                        proto,
196                        responder,
197                    } => {
198                        let marks = *self.marks.lock().await;
199                        let mut metrics_lock = self.metrics.lock().await;
200                        let mut metrics = metrics_lock.as_mut();
201                        metrics.sockets += 1;
202                        metrics.synchronous_datagram.track(marks);
203                        responder.send(
204                            match inner_provider.datagram_socket_deprecated(domain, proto).await? {
205                                Ok(socket) => socket.marked(marks).await?,
206                                e => e,
207                            },
208                        )?;
209                    }
210                    fposix_socket::ProviderRequest::DatagramSocket { domain, proto, responder } => {
211                        let marks = *self.marks.lock().await;
212                        let mut metrics_lock = self.metrics.lock().await;
213                        let mut metrics = metrics_lock.as_mut();
214                        metrics.sockets += 1;
215                        let datagram_socket = inner_provider
216                            .datagram_socket_with_options(domain, proto, &fposix_socket::SocketCreationOptions {
217                                marks: Some(marks.into()),
218                                ..Default::default()
219                            })
220                            .await?
221                            .map(|response| {
222                                match response {
223                                ProviderDatagramSocketWithOptionsResponse::DatagramSocket(client_end)
224                                => {
225                                    metrics.datagram.track(marks);
226                                    ProviderDatagramSocketResponse::DatagramSocket(client_end)
227                                }
228                                ProviderDatagramSocketWithOptionsResponse::SynchronousDatagramSocket(
229                                    client_end,
230                                ) => {
231                                    metrics.synchronous_datagram.track(marks);
232                                    ProviderDatagramSocketResponse::SynchronousDatagramSocket(
233                                    client_end,
234                                    )
235                                },
236                            }
237                            });
238                        responder.send(datagram_socket)?
239                    }
240                    fposix_socket::ProviderRequest::DatagramSocketWithOptions {
241                        domain,
242                        proto,
243                        opts,
244                        responder,
245                    } => {
246                        let marks = *self.marks.lock().await;
247                        let mut metrics_lock = self.metrics.lock().await;
248                        let mut metrics = metrics_lock.as_mut();
249                        metrics.sockets += 1;
250                        if let Some(opts_marks) = opts.marks {
251                            log::warn!(
252                                "datagram marks supplied by creation opts {:?} \
253                                will be overriden by {:?}",
254                                opts_marks,
255                                marks
256                            );
257                        }
258                        let datagram_socket = inner_provider
259                                .datagram_socket_with_options(domain, proto, &fposix_socket::SocketCreationOptions {
260                                    marks: Some(marks.into()),
261                                    ..opts
262                                })
263                                .await?
264                            .map(|response| {
265                                match response {
266                                ProviderDatagramSocketWithOptionsResponse::DatagramSocket(_)
267                                => {
268                                    metrics.datagram.track(marks);
269                                }
270                                ProviderDatagramSocketWithOptionsResponse::SynchronousDatagramSocket(_) => {
271                                    metrics.synchronous_datagram.track(marks);
272                                },
273                                };
274                                response
275                            });
276                        responder.send(datagram_socket)?
277                    }
278                    fposix_socket::ProviderRequest::InterfaceIndexToName { index, responder } => {
279                        let name = inner_provider.interface_index_to_name(index).await?;
280                        responder.send(match &name {
281                            Ok(n) => Ok(n),
282                            Err(e) => Err(*e),
283                        })?;
284                    }
285                    fposix_socket::ProviderRequest::InterfaceNameToIndex { name, responder } => {
286                        responder.send(inner_provider.interface_name_to_index(&name).await?)?
287                    }
288                    fposix_socket::ProviderRequest::InterfaceNameToFlags { name, responder } => {
289                        responder.send(inner_provider.interface_name_to_flags(&name).await?)?
290                    }
291                    fposix_socket::ProviderRequest::GetInterfaceAddresses { responder } => {
292                        responder.send(&inner_provider.get_interface_addresses().await?)?
293                    }
294                }
295
296                Ok(())
297            })
298            .await
299    }
300
301    /// Run an instance of fuchsia.posix.socket.raw.Provider.
302    pub(crate) async fn run_raw(
303        &self,
304        stream: fposix_socket_raw::ProviderRequestStream,
305    ) -> Result<(), Error> {
306        let inner_provider = connect_to_protocol::<fposix_socket_raw::ProviderMarker>()
307            .context("Failed to connect to inner server")?;
308        stream
309            .map(|result| result.context("failed request"))
310            .try_for_each(|request| async {
311                match request {
312                    fposix_socket_raw::ProviderRequest::Socket { domain, proto, responder } => {
313                        let marks = *self.marks.lock().await;
314                        let mut metrics_lock = self.metrics.lock().await;
315                        let mut metrics = metrics_lock.as_mut();
316                        metrics.sockets += 1;
317                        metrics.raw.track(marks);
318                        responder.send(
319                            inner_provider
320                                .socket_with_options(
321                                    domain,
322                                    &proto,
323                                    &fposix_socket::SocketCreationOptions {
324                                        marks: Some(marks.into()),
325                                        ..Default::default()
326                                    },
327                                )
328                                .await?,
329                        )?
330                    }
331                    fposix_socket_raw::ProviderRequest::SocketWithOptions {
332                        domain,
333                        proto,
334                        opts,
335                        responder,
336                    } => {
337                        let marks = *self.marks.lock().await;
338                        let mut metrics_lock = self.metrics.lock().await;
339                        let mut metrics = metrics_lock.as_mut();
340                        metrics.sockets += 1;
341                        metrics.raw.track(marks);
342                        if let Some(opts_marks) = opts.marks {
343                            log::warn!(
344                                "raw socket marks supplied by creation opts {:?} \
345                                will be overriden by {:?}",
346                                opts_marks,
347                                marks
348                            );
349                        }
350                        responder.send(
351                            inner_provider
352                                .socket_with_options(
353                                    domain,
354                                    &proto,
355                                    &fposix_socket::SocketCreationOptions {
356                                        marks: Some(marks.into()),
357                                        ..opts
358                                    },
359                                )
360                                .await?,
361                        )?
362                    }
363                }
364
365                Ok(())
366            })
367            .await
368    }
369}