1use crate::InterfaceId;
6use anyhow::Context as _;
7use async_utils::stream::{Tagged, WithTag as _};
8use dns_server_watcher::DnsServers;
9use fidl::endpoints::{ControlHandle as _, Responder as _};
10use log::{error, info, warn};
11use std::collections::HashMap;
12
13mod token_map;
14
15use {
16 fidl_fuchsia_net as fnet, fidl_fuchsia_net_name as fnet_name,
17 fidl_fuchsia_net_policy_properties as fnp_properties,
18};
19
20pub(crate) struct NetworkTokenContents {
21 connection_id: ConnectionId,
22 interface_id: InterfaceId,
23}
24
25#[derive(Copy, Clone, Debug, Default, PartialEq, Eq, PartialOrd, Ord, Hash)]
26pub struct ConnectionId(usize);
27
28#[derive(Copy, Clone, Debug, Default, PartialEq, Eq, PartialOrd, Ord, Hash)]
29pub struct UpdateGeneration {
30 default_network: usize,
33
34 properties: usize,
37}
38
39#[derive(Clone, Debug, Default)]
40pub struct UpdateGenerations(HashMap<ConnectionId, UpdateGeneration>);
41
42impl UpdateGenerations {
43 fn default_network(&self, id: &ConnectionId) -> Option<usize> {
44 self.0.get(id).map(|g| g.default_network)
45 }
46
47 fn set_default_network(&mut self, id: ConnectionId, generation: UpdateGeneration) {
48 self.0.entry(id).or_default().default_network = generation.default_network;
49 }
50
51 fn properties(&self, id: &ConnectionId) -> Option<usize> {
52 self.0.get(id).map(|g| g.properties)
53 }
54
55 fn set_properties(&mut self, id: ConnectionId, generation: UpdateGeneration) {
56 self.0.entry(id).or_default().properties = generation.properties;
57 }
58
59 fn remove(&mut self, id: &ConnectionId) -> Option<UpdateGeneration> {
60 self.0.remove(id)
61 }
62}
63
64#[derive(Debug)]
65pub(crate) struct NetworkPropertyResponder {
66 token: fidl::EventPair,
67 watched_properties: Vec<fnp_properties::Property>,
68 responder: fnp_properties::NetworksWatchPropertiesResponder,
69}
70
71impl NetworkPropertyResponder {
72 fn respond(
73 self,
74 response: Result<&[fnp_properties::PropertyUpdate], fnp_properties::WatchError>,
75 ) -> Result<(), fidl::Error> {
76 self.responder.send(response)
77 }
78}
79
80#[derive(Default)]
81pub struct NetpolNetworksService {
82 current_generation: UpdateGeneration,
84 generations_by_connection: UpdateGenerations,
86 default_network_responders:
88 HashMap<ConnectionId, fnp_properties::NetworksWatchDefaultResponder>,
89 tokens: token_map::TokenMap<NetworkTokenContents>,
90 property_responders: HashMap<ConnectionId, NetworkPropertyResponder>,
92 network_properties: NetworkProperties,
94}
95
96#[derive(Default, Clone)]
97struct NetworkProperties {
98 default_network: Option<InterfaceId>,
100 socket_marks: HashMap<InterfaceId, fnet::Marks>,
102 dns_servers: Vec<fnet_name::DnsServer_>,
104}
105
106impl NetworkProperties {
107 fn apply(&mut self, update: PropertyUpdate) -> UpdatesApplied {
108 let mut updates = UpdatesApplied::default();
109
110 if let Some(new_default_network) = update.default_network {
111 updates.default_network = self.handle_default_network_update(new_default_network);
112 }
113
114 if let Some((netid, marks)) = update.socket_marks {
115 updates.socket_marks_network = self.handle_socket_marks_update(netid, marks);
116 }
117
118 if let Some(dns) = update.dns {
119 updates.dns_changed = self.dns_servers != dns;
120 self.dns_servers = dns;
121 }
122
123 updates
124 }
125
126 fn handle_default_network_update(
132 &mut self,
133 new_default_network: Option<InterfaceId>,
134 ) -> Option<Option<InterfaceId>> {
135 if new_default_network == self.default_network {
137 return None;
138 }
139
140 let old_default_network = self.default_network;
141 if let (None, Some(old_default_network_id)) = (new_default_network, old_default_network) {
142 let _: Option<_> = self.socket_marks.remove(&old_default_network_id);
143 }
144 self.default_network = new_default_network;
145 return Some(old_default_network);
146 }
147
148 fn handle_socket_marks_update(
154 &mut self,
155 netid: InterfaceId,
156 marks: fnet::Marks,
157 ) -> Option<InterfaceId> {
158 if self.socket_marks.contains_key(&netid)
161 && self
162 .socket_marks
163 .get(&netid)
164 .and_then(|old_marks| Some(*old_marks == marks))
165 .unwrap_or_default()
166 {
167 return None;
168 }
169
170 let _: Option<_> = self.socket_marks.insert(netid, marks);
171 return Some(netid);
172 }
173
174 fn maybe_respond(
175 &self,
176 network: &NetworkTokenContents,
177 responder: NetworkPropertyResponder,
178 ) -> Option<NetworkPropertyResponder> {
179 let mut updates = Vec::new();
180 updates.add_socket_marks(self, network, &responder);
181 updates.add_dns(self, network, &responder);
182
183 if updates.is_empty() {
184 Some(responder)
185 } else {
186 if let Err(e) = responder.respond(Ok(&updates)) {
187 warn!("Could not send to responder: {e}");
188 }
189 None
190 }
191 }
192}
193
194trait PropertyUpdates {
195 fn add_socket_marks(
196 &mut self,
197 properties: &NetworkProperties,
198 network: &NetworkTokenContents,
199 responder: &NetworkPropertyResponder,
200 );
201 fn add_dns(
202 &mut self,
203 properties: &NetworkProperties,
204 network: &NetworkTokenContents,
205 responder: &NetworkPropertyResponder,
206 );
207}
208
209impl PropertyUpdates for Vec<fnp_properties::PropertyUpdate> {
210 fn add_socket_marks(
211 &mut self,
212 properties: &NetworkProperties,
213 network: &NetworkTokenContents,
214 responder: &NetworkPropertyResponder,
215 ) {
216 if !responder.watched_properties.contains(&fnp_properties::Property::SocketMarks) {
217 return;
218 }
219 match properties.socket_marks.get(&network.interface_id) {
220 Some(marks) => self.push(fnp_properties::PropertyUpdate::SocketMarks(marks.clone())),
221 None => {}
222 }
223 }
224
225 fn add_dns(
226 &mut self,
227 properties: &NetworkProperties,
228 network: &NetworkTokenContents,
229 responder: &NetworkPropertyResponder,
230 ) {
231 if !responder.watched_properties.contains(&fnp_properties::Property::DnsConfiguration) {
232 return;
233 }
234
235 let interface_id = network.interface_id;
236 self.push(fnp_properties::PropertyUpdate::DnsConfiguration(
237 fnp_properties::DnsConfiguration {
238 servers: Some(
239 properties
240 .dns_servers
241 .iter()
242 .filter(|d| {
243 match &d.source {
244 Some(source) => match source {
245 fnet_name::DnsServerSource::StaticSource(_) => true,
246 fnet_name::DnsServerSource::SocketProxy(
247 fnet_name::SocketProxyDnsServerSource {
248 source_interface,
249 ..
250 },
251 )
252 | fnet_name::DnsServerSource::Dhcp(
253 fnet_name::DhcpDnsServerSource { source_interface, .. },
254 )
255 | fnet_name::DnsServerSource::Ndp(
256 fnet_name::NdpDnsServerSource { source_interface, .. },
257 )
258 | fnet_name::DnsServerSource::Dhcpv6(
259 fnet_name::Dhcpv6DnsServerSource {
260 source_interface, ..
261 },
262 ) => match (interface_id, source_interface) {
263 (_, None) => true,
264 (id1, Some(id2)) => id1.get() == *id2,
265 },
266
267 _ => {
268 error!("unhandled DnsServerSource: {source:?}");
269 false
270 }
271 },
272
273 None => true,
275 }
276 })
277 .cloned()
278 .collect::<Vec<_>>(),
279 ),
280 ..Default::default()
281 },
282 ));
283 }
284}
285
286#[derive(Default, Debug)]
287pub struct PropertyUpdate {
288 default_network: Option<Option<InterfaceId>>,
289 socket_marks: Option<(InterfaceId, fnet::Marks)>,
290 dns: Option<Vec<fnet_name::DnsServer_>>,
291}
292
293#[derive(Default, Debug)]
294struct UpdatesApplied {
295 default_network: Option<Option<InterfaceId>>,
297
298 socket_marks_network: Option<InterfaceId>,
300
301 dns_changed: bool,
303}
304
305impl PropertyUpdate {
306 pub fn default_network<N: TryInto<InterfaceId>>(
307 mut self,
308 network_id: N,
309 ) -> Result<Self, N::Error> {
310 self.default_network = Some(Some(network_id.try_into()?));
311 Ok(self)
312 }
313
314 pub fn default_network_lost(mut self) -> Self {
315 self.default_network = Some(None);
316 self
317 }
318
319 pub fn socket_marks<N: TryInto<InterfaceId>, Marks: Into<fnet::Marks>>(
320 mut self,
321 network_id: N,
322 marks: Marks,
323 ) -> Result<Self, N::Error> {
324 self.socket_marks = Some((network_id.try_into()?, marks.into()));
325 Ok(self)
326 }
327
328 pub fn dns(mut self, dns_servers: &DnsServers) -> Self {
329 self.dns = Some(dns_servers.consolidated_dns_servers());
330 self
331 }
332}
333
334impl NetpolNetworksService {
335 pub async fn handle_network_attributes_request(
336 &mut self,
337 id: ConnectionId,
338 req: Result<fnp_properties::NetworksRequest, fidl::Error>,
339 ) -> Result<(), anyhow::Error> {
340 let req = req.context("network attributes request")?;
341 match req {
342 fnp_properties::NetworksRequest::WatchDefault { responder } => {
343 match self.default_network_responders.entry(id) {
344 std::collections::hash_map::Entry::Occupied(_) => {
345 warn!(
346 "Only one call to fuchsia.net.policy.properties/Networks.WatchDefault \
347 may be active per connection"
348 );
349 responder
350 .control_handle()
351 .shutdown_with_epitaph(zx::Status::CONNECTION_ABORTED)
352 }
353 std::collections::hash_map::Entry::Vacant(vacant_entry) => {
354 let interface_id = if self
355 .generations_by_connection
356 .default_network(&id)
357 .unwrap_or_default()
358 < self.current_generation.default_network
359 {
360 match self.network_properties.default_network {
361 Some(interface_id) => Some(interface_id),
362 None => None,
363 }
364 } else {
365 None
366 };
367 if let Some(interface_id) = interface_id {
368 self.generations_by_connection
369 .set_default_network(id, self.current_generation);
370 let token = self
371 .tokens
372 .insert_data(NetworkTokenContents {
373 connection_id: id,
374 interface_id,
375 })
376 .await;
377 responder.send(
378 fnp_properties::NetworksWatchDefaultResponse::Network(
379 fnp_properties::NetworkToken {
380 value: Some(token),
381 ..Default::default()
382 },
383 ),
384 )?;
385
386 if let Some(responder) = self.property_responders.remove(&id) {
387 let _: Option<_> = self.generations_by_connection.remove(&id);
388 let _: Result<(), fidl::Error> = responder.respond(Err(
389 fnp_properties::WatchError::DefaultNetworkChanged,
390 ));
391 }
392 } else {
393 let _: &mut _ = vacant_entry.insert(responder);
394 }
395 }
396 }
397 }
398 fnp_properties::NetworksRequest::WatchProperties {
399 payload: fnp_properties::NetworksWatchPropertiesRequest { network, properties, .. },
400 responder,
401 } => match (network, properties) {
402 (None, _) | (_, None) => {
403 responder.send(Err(fnp_properties::WatchError::MissingRequiredArgument))?
404 }
405 (Some(network), Some(properties)) => {
406 if properties.is_empty() {
407 responder.send(Err(fnp_properties::WatchError::NoProperties))?;
408 } else if let Some(token) = network.value {
409 match self.property_responders.entry(id) {
410 std::collections::hash_map::Entry::Occupied(_) => {
411 warn!(
412 "Only one call to \
413 fuchsia.net.policy.properties/Networks.WatchProperties may be \
414 active per connection"
415 );
416 responder
417 .control_handle()
418 .shutdown_with_epitaph(zx::Status::CONNECTION_ABORTED)
419 }
420 std::collections::hash_map::Entry::Vacant(vacant_entry) => {
421 match self.tokens.get(&token).await {
422 None => {
423 warn!("Unknown network token. ({token:?}");
424 responder.send(Err(
425 fnp_properties::WatchError::InvalidNetworkToken,
426 ))?;
427 }
428 Some(network_contents) => {
429 if network_contents.connection_id != id {
430 warn!(
431 "Cannot watch a NetworkToken that was not created \
432 by this connection."
433 );
434 responder.send(Err(
435 fnp_properties::WatchError::InvalidNetworkToken,
436 ))?;
437 } else {
438 let responder = NetworkPropertyResponder {
439 token,
440 watched_properties: properties,
441 responder,
442 };
443 if self
444 .generations_by_connection
445 .properties(&id)
446 .unwrap_or_default()
447 < self.current_generation.properties
448 {
449 self.generations_by_connection
450 .set_properties(id, self.current_generation);
451 if let Some(responder) = self
452 .network_properties
453 .maybe_respond(&network_contents, responder)
454 {
455 let _: &mut NetworkPropertyResponder =
456 vacant_entry.insert(responder);
457 }
458 } else {
459 let _: &mut NetworkPropertyResponder =
460 vacant_entry.insert(responder);
461 }
462 }
463 }
464 }
465 }
466 }
467 } else {
468 responder.send(Err(fnp_properties::WatchError::InvalidNetworkToken))?;
469 }
470 }
471 },
472 _ => {
473 warn!("Received unexpected request {req:?}");
474 }
475 }
476
477 Ok(())
478 }
479
480 async fn changed_default_network(
481 error: fnp_properties::WatchError,
482 responders: &mut HashMap<ConnectionId, NetworkPropertyResponder>,
483 generations: &mut UpdateGenerations,
484 ) {
485 let mut r = HashMap::new();
486 std::mem::swap(&mut r, responders);
487 r = r
488 .into_iter()
489 .filter_map(|(id, responder)| {
490 let _: Option<_> = generations.remove(&id);
492 let _: Result<(), fidl::Error> = responder.respond(Err(error));
493 None
494 })
495 .collect::<HashMap<_, _>>();
496 std::mem::swap(&mut r, responders);
497 }
498
499 pub(crate) async fn remove_network<ID: Into<InterfaceId>>(&mut self, interface_id: ID) {
500 let interface_id = interface_id.into();
501 info!("Removing interface {interface_id}. Reporting NETWORK_GONE to all clients.");
502 let mut responders = HashMap::new();
503 std::mem::swap(&mut self.property_responders, &mut responders);
504 for (id, responder) in responders {
505 let network = match self.tokens.get(&responder.token).await {
506 Some(network) => network,
507 None => {
508 warn!("Could not fetch network data for responder");
509 continue;
510 }
511 };
512 if network.interface_id == interface_id {
513 if let Err(e) = responder.respond(Err(fnp_properties::WatchError::NetworkGone)) {
515 warn!("Could not send to responder: {e}");
516 }
517 } else {
518 if self.property_responders.insert(id, responder).is_some() {
519 error!("Re-inserted in an existing responder slot. This should be impossible.");
520 }
521 }
522 }
523 }
524
525 pub async fn update(&mut self, update: PropertyUpdate) {
526 self.current_generation.properties += 1;
527 let updates_applied = self.network_properties.apply(update);
528 let mut property_responders = HashMap::new();
529 std::mem::swap(&mut self.property_responders, &mut property_responders);
530
531 if updates_applied.default_network.is_some() {
532 if let Some(default_network) = self.network_properties.default_network {
533 self.current_generation.default_network += 1;
534 let mut responders = HashMap::new();
535 std::mem::swap(&mut self.default_network_responders, &mut responders);
536 for (id, responder) in responders {
537 self.generations_by_connection.set_default_network(id, self.current_generation);
538
539 let token = self
540 .tokens
541 .insert_data(NetworkTokenContents {
542 connection_id: id,
543 interface_id: default_network,
544 })
545 .await;
546
547 if let Err(e) =
548 responder.send(fnp_properties::NetworksWatchDefaultResponse::Network(
549 fnp_properties::NetworkToken {
550 value: Some(token),
551 ..Default::default()
552 },
553 ))
554 {
555 warn!("Could not send to responder: {e}");
556 }
557 }
558
559 NetpolNetworksService::changed_default_network(
560 fnp_properties::WatchError::DefaultNetworkChanged,
561 &mut property_responders,
562 &mut self.generations_by_connection,
563 )
564 .await;
565 } else {
566 self.current_generation.default_network += 1;
568 let mut responders = HashMap::new();
569 std::mem::swap(&mut self.default_network_responders, &mut responders);
570 for (id, responder) in responders {
571 self.generations_by_connection.set_default_network(id, self.current_generation);
572 if let Err(e) = responder.send(
573 fnp_properties::NetworksWatchDefaultResponse::NoDefaultNetwork(
574 fnp_properties::Empty,
575 ),
576 ) {
577 warn!("Could not send to responder: {e}");
578 }
579 }
580
581 NetpolNetworksService::changed_default_network(
582 fnp_properties::WatchError::DefaultNetworkLost,
583 &mut property_responders,
584 &mut self.generations_by_connection,
585 )
586 .await;
587 }
588 }
589
590 for (id, responder) in property_responders {
591 let mut updates = Vec::new();
592 let network = match self.tokens.get(&responder.token).await {
593 Some(network) => network,
594 None => {
595 warn!("Could not fetch network data for responder");
596 continue;
597 }
598 };
599
600 if let Some(network_id) = updates_applied.socket_marks_network {
601 if network.interface_id == network_id {
602 updates.add_socket_marks(&self.network_properties, &network, &responder);
603 }
604 }
605 if updates_applied.dns_changed {
606 updates.add_dns(&self.network_properties, &network, &responder);
607 }
608
609 self.generations_by_connection.set_properties(id, self.current_generation);
610 if updates.is_empty() {
611 if self.property_responders.insert(id, responder).is_some() {
612 warn!("Re-inserted in an existing responder slot. This should be impossible.");
613 }
614 } else {
615 if let Err(e) = responder.respond(Ok(&updates)) {
616 warn!("Could not send to responder: {e}");
617 }
618 }
619 }
620 }
621}
622
623#[derive(Default)]
624pub struct NetworksRequestStreams {
625 next_id: ConnectionId,
626 request_streams:
627 futures::stream::SelectAll<Tagged<ConnectionId, fnp_properties::NetworksRequestStream>>,
628}
629
630impl NetworksRequestStreams {
631 pub fn push(&mut self, stream: fnp_properties::NetworksRequestStream) {
632 self.request_streams.push(stream.tagged(self.next_id));
633 self.next_id.0 += 1;
634 }
635}
636
637impl futures::Stream for NetworksRequestStreams {
638 type Item = (ConnectionId, Result<fnp_properties::NetworksRequest, fidl::Error>);
639
640 fn poll_next(
641 mut self: std::pin::Pin<&mut Self>,
642 cx: &mut std::task::Context<'_>,
643 ) -> std::task::Poll<Option<Self::Item>> {
644 std::pin::Pin::new(&mut self.request_streams).poll_next(cx)
645 }
646}
647
648impl futures::stream::FusedStream for NetworksRequestStreams {
649 fn is_terminated(&self) -> bool {
650 self.request_streams.is_terminated()
651 }
652}