socket_proxy/
socket_provider.rs

1// Copyright 2024 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5//! Implements fuchsia.posix.socket.Provider and fuchsia.posix.socket.raw.Provider.
6
7use anyhow::{Context as _, Error};
8use fidl::endpoints::{ClientEnd, ProtocolMarker, Proxy as _};
9use fidl_fuchsia_net::{self as fnet, MarkDomain};
10use fidl_fuchsia_posix_socket::{self as fposix_socket, OptionalUint32};
11use fuchsia_component::client::connect_to_protocol;
12use fuchsia_inspect_derive::{IValue, Inspect, Unit};
13use futures::lock::Mutex;
14use futures::{Future, StreamExt as _, TryStreamExt as _};
15use std::sync::Arc;
16use {fidl_fuchsia_posix as fposix, fidl_fuchsia_posix_socket_raw as fposix_socket_raw};
17
18/// Inspect node for socket provider.
19///
20/// Manages the following inspect nodes:
21/// socket_provider:
22///   sockets = 0
23///   datagram:
24///     proxied = 0
25///     unmarked = 0
26///   raw:
27///     proxied = 0
28///     unmarked = 0
29///   stream:
30///     proxied = 0
31///     unmarked = 0
32///   synchronous_datagram:
33///     proxied = 0
34///     unmarked = 0
35#[derive(Unit, Default)]
36struct SocketProviderInspect {
37    sockets: u32,
38    stream: PerType,
39    synchronous_datagram: PerType,
40    datagram: PerType,
41    raw: PerType,
42}
43
44#[derive(Unit, Default)]
45struct PerType {
46    proxied: u32,
47    unmarked: u32,
48}
49
50impl PerType {
51    pub(crate) fn track(&mut self, marks: crate::SocketMarks) {
52        self.proxied += 1;
53        self.unmarked += u32::from(!marks.has_value());
54    }
55}
56
57trait Markable {
58    fn mark(
59        &self,
60        domain: MarkDomain,
61        mark: OptionalUint32,
62    ) -> impl Future<Output = Result<Result<(), fposix::Errno>, fidl::Error>>;
63}
64
65macro_rules! impl_markable {
66    ($($ty:ty),*) => {
67        $(
68            impl Markable for $ty {
69                fn mark(
70                    &self,
71                    domain: fnet::MarkDomain,
72                    mark: OptionalUint32,
73                ) -> impl Future<Output = Result<Result<(), fposix::Errno>, fidl::Error>> {
74                    self.set_mark(domain, &mark)
75                }
76            }
77        )*
78    };
79    ($($ty:ty),*,) => { impl_markable!($($ty),*); };
80}
81
82impl_markable!(
83    fposix_socket::StreamSocketProxy,
84    fposix_socket::SynchronousDatagramSocketProxy,
85    fposix_socket::DatagramSocketProxy,
86    fposix_socket_raw::SocketProxy,
87);
88
89trait Marked: Sized {
90    fn marked(
91        self,
92        marks: crate::SocketMarks,
93    ) -> impl Future<Output = Result<Result<Self, fposix::Errno>, Error>>;
94}
95
96impl<Marker> Marked for ClientEnd<Marker>
97where
98    Marker: ProtocolMarker,
99    Marker::Proxy: fidl::endpoints::Proxy<Protocol = Marker> + Markable,
100{
101    fn marked(
102        self,
103        marks: crate::SocketMarks,
104    ) -> impl Future<Output = Result<Result<Self, fposix::Errno>, Error>> {
105        async move {
106            let proxy = self.into_proxy();
107            Ok(
108                match Result::and(
109                    proxy.mark(MarkDomain::Mark1, marks.mark_1).await?,
110                    proxy.mark(MarkDomain::Mark2, marks.mark_2).await?,
111                ) {
112                    Ok(()) => Ok(proxy.into_client_end().map_err(|_| {
113                        anyhow::anyhow!("Failed to convert socket proxy back into client end")
114                    })?),
115
116                    Err(e) => Err(e),
117                },
118            )
119        }
120    }
121}
122
123#[derive(Inspect, Clone)]
124pub(crate) struct SocketProvider {
125    marks: Arc<Mutex<crate::SocketMarks>>,
126
127    #[inspect(forward)]
128    metrics: Arc<Mutex<IValue<SocketProviderInspect>>>,
129}
130
131impl SocketProvider {
132    pub(crate) fn new(mark: Arc<Mutex<crate::SocketMarks>>) -> Self {
133        Self { marks: mark, metrics: Default::default() }
134    }
135
136    /// Run an instance of fuchsia.posix.socket.Provider.
137    pub(crate) async fn run(
138        &self,
139        stream: fposix_socket::ProviderRequestStream,
140    ) -> Result<(), Error> {
141        let inner_provider = connect_to_protocol::<fposix_socket::ProviderMarker>()
142            .context("Failed to connect to inner server")?;
143        stream
144            .map(|result| result.context("failed request"))
145            .try_for_each(|request| async {
146                match request {
147                    fposix_socket::ProviderRequest::StreamSocket { domain, proto, responder } => {
148                        let marks = *self.marks.lock().await;
149                        let mut metrics_lock = self.metrics.lock().await;
150                        let mut metrics = metrics_lock.as_mut();
151                        metrics.sockets += 1;
152                        metrics.stream.track(marks);
153                        responder.send(
154                            inner_provider
155                                .stream_socket_with_options(domain, proto, &fposix_socket::SocketCreationOptions {
156                                    marks: Some(marks.into()),
157                                    ..Default::default()
158                                })
159                                .await?,
160                        )?;
161                    }
162                    fposix_socket::ProviderRequest::StreamSocketWithOptions {
163                        domain,
164                        proto,
165                        responder,
166                        opts,
167                    } => {
168                        let marks = *self.marks.lock().await;
169                        let mut metrics_lock = self.metrics.lock().await;
170                        let mut metrics = metrics_lock.as_mut();
171                        metrics.sockets += 1;
172                        metrics.stream.track(marks);
173                        if let Some(opts_marks) = opts.marks {
174                            log::warn!(
175                                "stream socket marks supplied by creation opts {:?} \
176                                will be overriden by {:?}",
177                                opts_marks,
178                                marks
179                            );
180                        }
181                        responder.send(
182                            inner_provider
183                                .stream_socket_with_options(domain, proto, &fposix_socket::SocketCreationOptions {
184                                    marks: Some(marks.into()),
185                                    ..opts
186                                })
187                                .await?,
188                        )?;
189                    }
190                    fposix_socket::ProviderRequest::DatagramSocketDeprecated {
191                        domain,
192                        proto,
193                        responder,
194                    } => {
195                        let marks = *self.marks.lock().await;
196                        let mut metrics_lock = self.metrics.lock().await;
197                        let mut metrics = metrics_lock.as_mut();
198                        metrics.sockets += 1;
199                        metrics.synchronous_datagram.track(marks);
200                        responder.send(
201                            match inner_provider.datagram_socket_deprecated(domain, proto).await? {
202                                Ok(socket) => socket.marked(marks).await?,
203                                e => e,
204                            },
205                        )?;
206                    }
207                    fposix_socket::ProviderRequest::DatagramSocket { domain, proto, responder } => {
208                        let marks = *self.marks.lock().await;
209                        let mut metrics_lock = self.metrics.lock().await;
210                        let mut metrics = metrics_lock.as_mut();
211                        metrics.sockets += 1;
212                        use fposix_socket::{
213                            ProviderDatagramSocketResponse, ProviderDatagramSocketWithOptionsResponse,
214                        };
215                        let response = inner_provider
216                            .datagram_socket_with_options(domain, proto, &fposix_socket::SocketCreationOptions {
217                                marks: Some(marks.into()),
218                                ..Default::default()
219                            })
220                            .await?
221                            .map(|response| {
222                                match response {
223                                ProviderDatagramSocketWithOptionsResponse::DatagramSocket(client_end)
224                                => {
225                                    ProviderDatagramSocketResponse::DatagramSocket(client_end)
226                                }
227                                ProviderDatagramSocketWithOptionsResponse::SynchronousDatagramSocket(
228                                    client_end,
229                                ) => ProviderDatagramSocketResponse::SynchronousDatagramSocket(
230                                    client_end,
231                                ),
232                            }
233                            });
234                        responder.send(response)?
235                    }
236                    fposix_socket::ProviderRequest::DatagramSocketWithOptions {
237                        domain,
238                        proto,
239                        opts,
240                        responder,
241                    } => {
242                        let marks = *self.marks.lock().await;
243                        let mut metrics_lock = self.metrics.lock().await;
244                        let mut metrics = metrics_lock.as_mut();
245                        metrics.sockets += 1;
246                        if let Some(opts_marks) = opts.marks {
247                            log::warn!(
248                                "datagram marks supplied by creation opts {:?} \
249                                will be overriden by {:?}",
250                                opts_marks,
251                                marks
252                            );
253                        }
254                        responder.send(
255                            inner_provider
256                                .datagram_socket_with_options(domain, proto, &fposix_socket::SocketCreationOptions {
257                                    marks: Some(marks.into()),
258                                    ..opts
259                                })
260                                .await?,
261                        )?
262                    }
263                    fposix_socket::ProviderRequest::InterfaceIndexToName { index, responder } => {
264                        let name = inner_provider.interface_index_to_name(index).await?;
265                        responder.send(match &name {
266                            Ok(n) => Ok(n),
267                            Err(e) => Err(*e),
268                        })?;
269                    }
270                    fposix_socket::ProviderRequest::InterfaceNameToIndex { name, responder } => {
271                        responder.send(inner_provider.interface_name_to_index(&name).await?)?
272                    }
273                    fposix_socket::ProviderRequest::InterfaceNameToFlags { name, responder } => {
274                        responder.send(inner_provider.interface_name_to_flags(&name).await?)?
275                    }
276                    fposix_socket::ProviderRequest::GetInterfaceAddresses { responder } => {
277                        responder.send(&inner_provider.get_interface_addresses().await?)?
278                    }
279                }
280
281                Ok(())
282            })
283            .await
284    }
285
286    /// Run an instance of fuchsia.posix.socket.raw.Provider.
287    pub(crate) async fn run_raw(
288        &self,
289        stream: fposix_socket_raw::ProviderRequestStream,
290    ) -> Result<(), Error> {
291        let inner_provider = connect_to_protocol::<fposix_socket_raw::ProviderMarker>()
292            .context("Failed to connect to inner server")?;
293        stream
294            .map(|result| result.context("failed request"))
295            .try_for_each(|request| async {
296                match request {
297                    fposix_socket_raw::ProviderRequest::Socket { domain, proto, responder } => {
298                        let marks = *self.marks.lock().await;
299                        let mut metrics_lock = self.metrics.lock().await;
300                        let mut metrics = metrics_lock.as_mut();
301                        metrics.sockets += 1;
302                        metrics.raw.track(marks);
303                        responder.send(
304                            inner_provider
305                                .socket_with_options(
306                                    domain,
307                                    &proto,
308                                    &fposix_socket::SocketCreationOptions {
309                                        marks: Some(marks.into()),
310                                        ..Default::default()
311                                    },
312                                )
313                                .await?,
314                        )?
315                    }
316                    fposix_socket_raw::ProviderRequest::SocketWithOptions {
317                        domain,
318                        proto,
319                        opts,
320                        responder,
321                    } => {
322                        let marks = *self.marks.lock().await;
323                        let mut metrics_lock = self.metrics.lock().await;
324                        let mut metrics = metrics_lock.as_mut();
325                        metrics.sockets += 1;
326                        metrics.raw.track(marks);
327                        if let Some(opts_marks) = opts.marks {
328                            log::warn!(
329                                "raw socket marks supplied by creation opts {:?} \
330                                will be overriden by {:?}",
331                                opts_marks,
332                                marks
333                            );
334                        }
335                        responder.send(
336                            inner_provider
337                                .socket_with_options(
338                                    domain,
339                                    &proto,
340                                    &fposix_socket::SocketCreationOptions {
341                                        marks: Some(marks.into()),
342                                        ..opts
343                                    },
344                                )
345                                .await?,
346                        )?
347                    }
348                }
349
350                Ok(())
351            })
352            .await
353    }
354}