1use crate::InterfaceId;
6use crate::telemetry::{NetworkEventMetadata, TelemetryEvent, TelemetrySender};
7use anyhow::Context as _;
8use async_utils::stream::{Tagged, WithTag as _};
9use dns_server_watcher::DnsServers;
10use fidl::endpoints::{ControlHandle as _, Responder as _};
11use log::{error, info, warn};
12use policy_properties::NetworkTokenExt as _;
13use std::collections::HashMap;
14use std::collections::hash_map::Entry;
15
16mod token_registry;
17
18use fidl_fuchsia_net as fnet;
19use fidl_fuchsia_net_name as fnet_name;
20use fidl_fuchsia_net_policy_properties as fnp_properties;
21use fidl_fuchsia_net_policy_socketproxy as fnp_socketproxy;
22use fidl_fuchsia_posix_socket as fposix_socket;
23
24#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
29pub enum NetworkId {
30 Fuchsia(InterfaceId),
31 Delegated(InterfaceId),
32}
33
34impl std::fmt::Display for NetworkId {
35 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
36 match self {
37 NetworkId::Fuchsia(interface_id) => write!(f, "fuchsia:{interface_id}"),
38 NetworkId::Delegated(interface_id) => write!(f, "delegated:{interface_id}"),
39 }
40 }
41}
42
43impl NetworkId {
44 pub fn get(&self) -> InterfaceId {
45 match self {
46 NetworkId::Fuchsia(interface_id) => *interface_id,
47 NetworkId::Delegated(interface_id) => *interface_id,
48 }
49 }
50
51 pub fn fuchsia<I: Into<InterfaceId>>(id: I) -> Self {
52 NetworkId::Fuchsia(id.into())
53 }
54
55 pub fn delegated<I: Into<InterfaceId>>(id: I) -> Self {
56 NetworkId::Delegated(id.into())
57 }
58}
59
60#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
61pub(crate) struct NetworkTokenContents {
62 network_id: NetworkId,
63 is_default: bool,
64}
65
/// Identifier assigned to a single client connection.
#[derive(Copy, Clone, Debug, Default, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct ConnectionId(usize);

/// A pair of generation counters, one per kind of watched state.
#[derive(Copy, Clone, Debug, Default, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct UpdateGeneration {
    /// Generation of the default-network state.
    default_network: usize,

    /// Generation of the network-properties state.
    properties: usize,
}

/// The generations recorded for each connection, keyed by [`ConnectionId`].
#[derive(Clone, Debug, Default)]
pub struct UpdateGenerations(HashMap<ConnectionId, UpdateGeneration>);

impl UpdateGenerations {
    /// Default-network generation recorded for `id`, or `None` if the
    /// connection has no entry.
    fn default_network(&self, id: &ConnectionId) -> Option<usize> {
        Some(self.0.get(id)?.default_network)
    }

    /// Records only the `default_network` component of `generation` for `id`,
    /// creating a zeroed entry first when the connection is unknown.
    fn set_default_network(&mut self, id: ConnectionId, generation: UpdateGeneration) {
        let recorded = self.0.entry(id).or_default();
        recorded.default_network = generation.default_network;
    }

    /// Properties generation recorded for `id`, or `None` if the connection
    /// has no entry.
    fn properties(&self, id: &ConnectionId) -> Option<usize> {
        Some(self.0.get(id)?.properties)
    }

    /// Records only the `properties` component of `generation` for `id`,
    /// creating a zeroed entry first when the connection is unknown.
    fn set_properties(&mut self, id: ConnectionId, generation: UpdateGeneration) {
        let recorded = self.0.entry(id).or_default();
        recorded.properties = generation.properties;
    }

    /// Forgets everything recorded for `id`, returning the removed entry if
    /// there was one.
    fn remove(&mut self, id: &ConnectionId) -> Option<UpdateGeneration> {
        let Self(by_connection) = self;
        by_connection.remove(id)
    }
}
104
105trait SetMark {
106 fn set_mark(&mut self, domain: fnet::MarkDomain, value: Option<u32>);
107}
108
109impl SetMark for fnet::Marks {
110 fn set_mark(&mut self, domain: fnet::MarkDomain, value: Option<u32>) {
111 match domain {
112 fnet::MarkDomain::Mark1 => self.mark_1 = value,
113 fnet::MarkDomain::Mark2 => self.mark_2 = value,
114 }
115 }
116}
117
118#[derive(Debug)]
119pub(crate) struct NetworkPropertyResponder {
120 token: fnp_properties::NetworkToken,
121 watched_properties: Vec<fnp_properties::Property>,
122 responder: fnp_properties::NetworksWatchPropertiesResponder,
123}
124
125impl NetworkPropertyResponder {
126 fn respond(
127 self,
128 response: Result<&[fnp_properties::PropertyUpdate], fnp_properties::WatchError>,
129 ) -> Result<(), fidl::Error> {
130 self.responder.send(response)
131 }
132}
133
134#[derive(Default, Clone)]
135struct NetworkProperties {
136 socket_marks: Option<fnet::Marks>,
137 #[allow(dead_code)]
139 connectivity_state: Option<fnp_socketproxy::ConnectivityState>,
140 name: Option<String>,
141 network_type: Option<fnp_socketproxy::NetworkType>,
142}
143
144impl NetworkProperties {
145 fn get_marks(&self) -> Option<&fnet::Marks> {
146 self.socket_marks.as_ref()
147 }
148}
149
150#[derive(Default, Clone)]
152struct RegisteredNetworks {
153 default_network: Option<NetworkId>,
154 networks: HashMap<NetworkId, NetworkProperties>,
155 dns_servers: Vec<fnet_name::DnsServer_>,
156}
157
158impl RegisteredNetworks {
159 fn apply(&mut self, update: PropertyUpdate) -> UpdateApplied {
160 match update {
161 PropertyUpdate::LoseDefaultNetwork => self.handle_default_network_update(None),
162 PropertyUpdate::ChangeNetwork(network_id, network_change) => match network_change {
163 NetworkUpdate::Properties(event) => self.handle_changed_network(network_id, event),
164 NetworkUpdate::Remove => UpdateApplied::NetworkRemoved(network_id),
165 NetworkUpdate::MakeDefault => self.handle_default_network_update(Some(network_id)),
166 },
167 PropertyUpdate::UpdateDns(dns_servers) => {
168 if self.dns_servers != dns_servers {
169 self.dns_servers = dns_servers;
170 UpdateApplied::DnsChanged
171 } else {
172 UpdateApplied::None
173 }
174 }
175 }
176 }
177
178 fn handle_default_network_update(
184 &mut self,
185 new_default_network: Option<NetworkId>,
186 ) -> UpdateApplied {
187 if new_default_network == self.default_network {
189 return UpdateApplied::None;
190 }
191
192 let old_default_network = self.default_network;
193 self.default_network = new_default_network;
194 return UpdateApplied::DefaultNetworkChanged(old_default_network);
195 }
196
197 fn handle_changed_network(
202 &mut self,
203 network_id: NetworkId,
204 event: NetworkPropertiesChange,
205 ) -> UpdateApplied {
206 let NetworkPropertiesChange {
207 added,
208 marks: socket_marks,
209 connectivity_state,
210 name,
211 network_type,
212 } = event;
213 let entry = self.networks.entry(network_id);
214 let result = match (added, &entry, network_id, socket_marks) {
215 (true, Entry::Occupied(_), _, _) => Err("add already added network"),
216 (false, Entry::Vacant(_), _, _) => Err("update a non-added network"),
217 (_, _, NetworkId::Fuchsia(_), Some(_)) => Err("have a fuchsia network with marks"),
218 (_, _, NetworkId::Delegated(_), None) => Err("have a delegated network without marks"),
219 (_, _, NetworkId::Fuchsia(_), None) => Ok((NetworkProperties::default(), added)),
220 (_, entry, NetworkId::Delegated(_), Some(socket_marks)) => {
221 let changed = if let Entry::Occupied(e) = entry {
222 e.get().get_marks() != Some(&socket_marks)
223 } else {
224 true
225 };
226 Ok((
227 NetworkProperties { socket_marks: Some(socket_marks), ..Default::default() },
228 changed,
229 ))
230 }
231 };
232
233 match result {
234 Ok((mut properties, changed_marks)) => {
235 properties.connectivity_state = connectivity_state;
236 properties.network_type = network_type;
237 properties.name = name.clone();
238 let _ = entry.insert_entry(properties);
239 UpdateApplied::NetworkChanged {
240 network_id,
241 added,
242 changed_marks,
243 name,
244 network_type,
245 }
246 }
247 Err(e) => {
248 error!("Cannot {e}. Update ignored.");
249 UpdateApplied::None
250 }
251 }
252 }
253
254 fn maybe_respond(
255 &self,
256 network: &NetworkTokenContents,
257 responder: NetworkPropertyResponder,
258 ) -> Option<NetworkPropertyResponder> {
259 let mut updates = Vec::new();
260 updates.add_socket_marks(self, network, &responder);
261 updates.add_dns(self, network, &responder);
262
263 if updates.is_empty() {
264 Some(responder)
265 } else {
266 if let Err(e) = responder.respond(Ok(&updates)) {
267 warn!("Could not send to responder: {e}");
268 }
269 None
270 }
271 }
272}
273
274trait PropertyUpdates {
275 fn add_socket_marks(
276 &mut self,
277 network_registry: &RegisteredNetworks,
278 network: &NetworkTokenContents,
279 responder: &NetworkPropertyResponder,
280 );
281 fn add_dns(
282 &mut self,
283 network_registry: &RegisteredNetworks,
284 network: &NetworkTokenContents,
285 responder: &NetworkPropertyResponder,
286 );
287}
288
289impl PropertyUpdates for Vec<fnp_properties::PropertyUpdate> {
290 fn add_socket_marks(
291 &mut self,
292 network_registry: &RegisteredNetworks,
293 network: &NetworkTokenContents,
294 responder: &NetworkPropertyResponder,
295 ) {
296 if !responder.watched_properties.contains(&fnp_properties::Property::SocketMarks) {
297 return;
298 }
299
300 match network_registry.networks.get(&network.network_id) {
301 Some(network) => {
302 if let Some(socket_marks) = network.get_marks() {
303 self.push(fnp_properties::PropertyUpdate::SocketMarks(socket_marks.clone()));
304 }
305 return;
306 }
307 None => {
308 error!(
309 "State is inconsistent. We attempted to add marks for a \
310 network that is not known: {:?}",
311 network.network_id
312 );
313 }
314 }
315 }
316
317 fn add_dns(
318 &mut self,
319 network_registry: &RegisteredNetworks,
320 network: &NetworkTokenContents,
321 responder: &NetworkPropertyResponder,
322 ) {
323 if !responder.watched_properties.contains(&fnp_properties::Property::DnsConfiguration) {
324 return;
325 }
326
327 let interface_id = network.network_id;
328 self.push(fnp_properties::PropertyUpdate::DnsConfiguration(
329 fnp_properties::DnsConfiguration {
330 servers: Some(
331 network_registry
332 .dns_servers
333 .iter()
334 .filter(|d| {
335 match &d.source {
336 Some(source) => match source {
337 fnet_name::DnsServerSource::StaticSource(_) => true,
338 fnet_name::DnsServerSource::SocketProxy(
339 fnet_name::SocketProxyDnsServerSource {
340 source_interface,
341 ..
342 },
343 ) => match (interface_id, source_interface) {
344 (_, None) => true,
345 (id1, Some(id2)) => {
346 Ok(id1)
347 == InterfaceId::try_from(*id2)
348 .map(|id| NetworkId::delegated(id))
349 }
350 },
351 fnet_name::DnsServerSource::Dhcp(
352 fnet_name::DhcpDnsServerSource { source_interface, .. },
353 )
354 | fnet_name::DnsServerSource::Ndp(
355 fnet_name::NdpDnsServerSource { source_interface, .. },
356 )
357 | fnet_name::DnsServerSource::Dhcpv6(
358 fnet_name::Dhcpv6DnsServerSource {
359 source_interface, ..
360 },
361 ) => match (interface_id, source_interface) {
362 (_, None) => true,
363 (id1, Some(id2)) => {
364 Ok(id1)
365 == InterfaceId::try_from(*id2)
366 .map(|id| NetworkId::fuchsia(id))
367 }
368 },
369
370 _ => {
371 error!("unhandled DnsServerSource: {source:?}");
372 false
373 }
374 },
375
376 None => true,
378 }
379 })
380 .cloned()
381 .collect::<Vec<_>>(),
382 ),
383 ..Default::default()
384 },
385 ));
386 }
387}
388
389#[derive(Debug)]
391pub struct NetworkPropertiesChange {
392 pub added: bool,
395 pub marks: Option<fnet::Marks>,
397 pub connectivity_state: Option<fnp_socketproxy::ConnectivityState>,
399 pub name: Option<String>,
401 pub network_type: Option<fnp_socketproxy::NetworkType>,
403}
404
405#[derive(Debug)]
406pub enum NetworkUpdate {
407 Properties(NetworkPropertiesChange),
409 Remove,
410 MakeDefault,
411}
412
413#[derive(Debug, PartialEq, Eq)]
414enum UpdateApplied {
415 None,
417
418 DefaultNetworkChanged(Option<NetworkId>),
420
421 DnsChanged,
423
424 NetworkChanged {
426 network_id: NetworkId,
427 added: bool,
428 changed_marks: bool,
429 name: Option<String>,
430 network_type: Option<fnp_socketproxy::NetworkType>,
431 },
432
433 NetworkRemoved(NetworkId),
435}
436
437#[derive(Debug)]
438pub enum PropertyUpdate {
439 LoseDefaultNetwork,
440 ChangeNetwork(NetworkId, NetworkUpdate),
441 UpdateDns(Vec<fnet_name::DnsServer_>),
442}
443
444impl PropertyUpdate {
445 pub fn default_network_lost() -> Self {
446 PropertyUpdate::LoseDefaultNetwork
447 }
448
449 pub fn dns(dns_servers: &DnsServers) -> Self {
450 PropertyUpdate::UpdateDns(dns_servers.consolidated_dns_servers())
453 }
454}
455
456#[derive(Default)]
457pub struct NetpolNetworksService {
458 current_generation: UpdateGeneration,
460 generations_by_connection: UpdateGenerations,
462 default_network_responders:
464 HashMap<ConnectionId, fnp_properties::NetworksWatchDefaultResponder>,
465 tokens: token_registry::TokenRegistry<NetworkTokenContents>,
466 property_responders: HashMap<ConnectionId, NetworkPropertyResponder>,
468 network_registry: RegisteredNetworks,
470 telemetry: Option<TelemetrySender>,
471}
472
473impl NetpolNetworksService {
474 pub fn set_telemetry(&mut self, telemetry: TelemetrySender) {
475 self.telemetry = Some(telemetry);
476 }
477
478 pub async fn handle_network_attributes_request(
479 &mut self,
480 id: ConnectionId,
481 req: Result<fnp_properties::NetworksRequest, fidl::Error>,
482 ) -> Result<(), anyhow::Error> {
483 let req = req.context("network attributes request")?;
484 match req {
485 fnp_properties::NetworksRequest::WatchDefault { responder } => {
486 match self.default_network_responders.entry(id) {
487 std::collections::hash_map::Entry::Occupied(_) => {
488 warn!(
489 "Only one call to fuchsia.net.policy.properties/Networks.WatchDefault \
490 may be active per connection"
491 );
492 responder
493 .control_handle()
494 .shutdown_with_epitaph(zx::Status::CONNECTION_ABORTED)
495 }
496 std::collections::hash_map::Entry::Vacant(vacant_entry) => {
497 let network_id = if self
498 .generations_by_connection
499 .default_network(&id)
500 .unwrap_or_default()
501 < self.current_generation.default_network
502 {
503 self.network_registry.default_network
504 } else {
505 None
506 };
507 if let Some(network_id) = network_id {
508 self.generations_by_connection
509 .set_default_network(id, self.current_generation);
510 let token = self
511 .tokens
512 .ensure_token(NetworkTokenContents { network_id, is_default: true })
513 .get()
514 .duplicate()
515 .context("could not duplicate token")?;
516 responder.send(
517 fnp_properties::NetworksWatchDefaultResponse::Network(token),
518 )?;
519
520 if let Some(responder) = self.property_responders.remove(&id) {
521 let _: Option<_> = self.generations_by_connection.remove(&id);
522 let _: Result<(), fidl::Error> =
523 responder.respond(Err(fnp_properties::WatchError::NetworkGone));
524 }
525 } else {
526 let _: &mut _ = vacant_entry.insert(responder);
527 }
528 }
529 }
530 }
531 fnp_properties::NetworksRequest::WatchProperties {
532 payload: fnp_properties::NetworksWatchPropertiesRequest { network, properties, .. },
533 responder,
534 } => match (network, properties) {
535 (None, _) | (_, None) => {
536 responder.send(Err(fnp_properties::WatchError::MissingRequiredArgument))?
537 }
538 (Some(network), Some(properties)) => {
539 if properties.is_empty() {
540 responder.send(Err(fnp_properties::WatchError::NoProperties))?;
541 } else {
542 match self.property_responders.entry(id) {
543 std::collections::hash_map::Entry::Occupied(_) => {
544 warn!(
545 "Only one call to \
546 fuchsia.net.policy.properties/Networks.WatchProperties may be \
547 active per connection"
548 );
549 responder
550 .control_handle()
551 .shutdown_with_epitaph(zx::Status::CONNECTION_ABORTED)
552 }
553 std::collections::hash_map::Entry::Vacant(vacant_entry) => {
554 match self.tokens.get_contents(&network) {
555 Err(e) => {
556 warn!("Unknown network token. ({network:?}: {e})");
557 responder.send(Err(
558 fnp_properties::WatchError::InvalidNetworkToken,
559 ))?;
560 }
561 Ok(network_contents) => {
562 let responder = NetworkPropertyResponder {
563 token: network,
564 watched_properties: properties,
565 responder,
566 };
567 if self
568 .generations_by_connection
569 .properties(&id)
570 .unwrap_or_default()
571 < self.current_generation.properties
572 {
573 self.generations_by_connection
574 .set_properties(id, self.current_generation);
575 if let Some(responder) = self
576 .network_registry
577 .maybe_respond(&network_contents, responder)
578 {
579 let _: &mut NetworkPropertyResponder =
580 vacant_entry.insert(responder);
581 }
582 } else {
583 let _: &mut NetworkPropertyResponder =
584 vacant_entry.insert(responder);
585 }
586 }
587 }
588 }
589 }
590 }
591 }
592 },
593 _ => {
594 warn!("Received unexpected request {req:?}");
595 }
596 }
597
598 Ok(())
599 }
600
601 pub async fn handle_delegated_networks_update(
602 &mut self,
603 update: Result<fnp_socketproxy::NetworkRegistryRequest, fidl::Error>,
604 ) -> Result<(), anyhow::Error> {
605 use fnp_socketproxy::{
606 NetworkInfo, NetworkRegistryAddError, NetworkRegistryRemoveError,
607 NetworkRegistryRequest, NetworkRegistrySetDefaultError, NetworkRegistryUpdateError,
608 };
609
610 match update {
611 Err(e) => {
612 error!(
613 "Encountered error watching for delegated network \
614 updates: {e:?}"
615 );
616 Ok(())
617 }
618 Ok(NetworkRegistryRequest::SetDefault { network_id, responder }) => responder.send(
619 (async || match network_id {
620 fposix_socket::OptionalUint32::Value(interface_id) => {
623 self.update(PropertyUpdate::ChangeNetwork(
624 NetworkId::delegated(
625 InterfaceId::try_from(interface_id)
626 .map_err(|_| NetworkRegistrySetDefaultError::NotFound)?,
627 ),
628 NetworkUpdate::MakeDefault,
629 ))
630 .await;
631 Ok(())
632 }
633 fposix_socket::OptionalUint32::Unset(_) => {
634 self.update(PropertyUpdate::default_network_lost()).await;
635 Ok(())
636 }
637 })()
638 .await,
639 ),
640 Ok(NetworkRegistryRequest::Add { network, responder }) => responder.send(
641 (async || {
642 let network_id = network
643 .network_id
644 .and_then(|id| InterfaceId::try_from(id).ok())
645 .map(|id| NetworkId::delegated(id))
646 .ok_or(NetworkRegistryAddError::MissingNetworkId)?;
647 let NetworkInfo::Starnix(info) =
648 network.info.ok_or(NetworkRegistryAddError::MissingNetworkInfo)?
649 else {
650 return Err(NetworkRegistryAddError::MissingNetworkInfo);
651 };
652
653 let mut marks = fnet::Marks::default();
654 marks.set_mark(fnet::MARK_DOMAIN_SO_MARK, info.mark);
655
656 self.update(PropertyUpdate::ChangeNetwork(
659 network_id,
660 NetworkUpdate::Properties(NetworkPropertiesChange {
661 added: true,
662 marks: Some(marks),
663 connectivity_state: network.connectivity,
664 name: network.name,
665 network_type: network.network_type,
666 }),
667 ))
668 .await;
669 Ok(())
670 })()
671 .await,
672 ),
673 Ok(NetworkRegistryRequest::Update { network, responder }) => responder.send(
674 (async || {
675 let network_id = network
676 .network_id
677 .and_then(|id| InterfaceId::try_from(id).ok())
678 .map(|id| NetworkId::delegated(id))
679 .ok_or(NetworkRegistryUpdateError::MissingNetworkId)?;
680 let NetworkInfo::Starnix(info) =
681 network.info.ok_or(NetworkRegistryUpdateError::MissingNetworkInfo)?
682 else {
683 return Err(NetworkRegistryUpdateError::MissingNetworkInfo);
684 };
685
686 let mut marks = fnet::Marks::default();
687 marks.set_mark(fnet::MARK_DOMAIN_SO_MARK, info.mark);
688 self.update(PropertyUpdate::ChangeNetwork(
689 network_id,
690 NetworkUpdate::Properties(NetworkPropertiesChange {
691 added: false,
692 marks: Some(marks),
693 connectivity_state: network.connectivity,
694 name: network.name,
695 network_type: network.network_type,
696 }),
697 ))
698 .await;
699 Ok(())
700 })()
701 .await,
702 ),
703 Ok(NetworkRegistryRequest::Remove { network_id, responder }) => responder.send(
704 (async || {
705 self.update(PropertyUpdate::ChangeNetwork(
706 NetworkId::delegated(
707 InterfaceId::try_from(network_id)
711 .map_err(|_| NetworkRegistryRemoveError::NotFound)?,
712 ),
713 NetworkUpdate::Remove,
714 ))
715 .await;
716 Ok(())
717 })()
718 .await,
719 ),
720 }
721 .context("while handling DelegatedNetwork request")
722 }
723
724 pub(crate) async fn handle_network_token_resolver_request(
725 &mut self,
726 request: Result<fnp_properties::NetworkTokenResolverRequest, fidl::Error>,
727 ) -> Result<(), anyhow::Error> {
728 use fnp_properties::NetworkTokenResolverResolveTokenError as ResolveTokenError;
729
730 let request = request.context("while handling NetworkTokenResolver request")?;
731 match request {
732 fnp_properties::NetworkTokenResolverRequest::ResolveToken { token, responder } => {
733 let maybe_contents = self.tokens.get_contents(&token).copied();
734 match maybe_contents {
735 Err(e) => {
736 warn!("Unknown network token. ({token:?}: {e})");
737 responder.send(Err(ResolveTokenError::InvalidNetworkToken))?;
738 }
739 Ok(contents) => {
740 if contents.is_default {
741 let query = NetworkTokenContents { is_default: false, ..contents };
744 if let Some(tok) = self.tokens.get_token(&query) {
745 responder.send(tok.duplicate().map_err(|e| {
746 warn!("Encountered issue duplicating generated token. {e}");
747 ResolveTokenError::InvalidNetworkToken
748 }))?;
749 } else {
750 warn!("Requested canonical version of unregistered network.");
751 responder.send(Err(ResolveTokenError::InvalidNetworkToken))?;
752 }
753 } else {
754 responder.send(Ok(token))?;
755 }
756 }
757 }
758 }
759 fidl_fuchsia_net_policy_properties::NetworkTokenResolverRequest::_UnknownMethod {
760 ordinal,
761 control_handle,
762 method_type,
763 ..
764 } => warn!(
765 "Encountered unknown method call on NetworkTokenResolver: {ordinal} \
766 {control_handle:?} {method_type:?}"
767 ),
768 }
769
770 Ok(())
771 }
772
773 async fn changed_default_network(
774 &mut self,
775 previous_default_network: Option<NetworkId>,
776 responders: &mut HashMap<ConnectionId, NetworkPropertyResponder>,
777 ) {
778 let mut r = HashMap::new();
779 std::mem::swap(&mut r, responders);
780 r = r
781 .into_iter()
782 .filter_map(|(id, responder)| {
783 match self.tokens.get_contents(&responder.token) {
784 Ok(contents) => {
785 if contents.is_default {
787 let _: Option<_> = self.generations_by_connection.remove(&id);
788 let _: Result<(), fidl::Error> =
789 responder.respond(Err(fnp_properties::WatchError::NetworkGone));
790 return None;
791 }
792 }
793 Err(zx::Status::NOT_FOUND) => {
794 warn!("Token provided to get_contents is not valid.");
795 }
796 Err(e) => {
797 warn!("Encountered unknown issue while getting contents: {e}");
798 }
799 }
800 Some((id, responder))
801 })
802 .collect::<HashMap<_, _>>();
803 std::mem::swap(&mut r, responders);
804 self.tokens.drop_if(|&c| {
805 c.is_default && previous_default_network.is_some_and(|i| i == c.network_id)
806 });
807 }
808
809 pub(crate) async fn remove_network(&mut self, network_id: NetworkId) {
810 info!("Removing interface {network_id}. Reporting NETWORK_GONE to all clients.");
811 let mut responders = HashMap::new();
812 std::mem::swap(&mut self.property_responders, &mut responders);
813 for (id, responder) in responders {
814 let network = match self.tokens.get_contents(&responder.token) {
815 Ok(network) => network,
816 Err(e) => {
817 warn!("Could not fetch network data for responder: {e}");
818 continue;
819 }
820 };
821 if network.network_id == network_id {
822 if let Err(e) = responder.respond(Err(fnp_properties::WatchError::NetworkGone)) {
824 warn!("Could not send to responder: {e}");
825 }
826 } else {
827 if self.property_responders.insert(id, responder).is_some() {
828 error!("Re-inserted in an existing responder slot. This should be impossible.");
829 }
830 }
831 }
832 }
833
834 pub async fn update(&mut self, update: PropertyUpdate) {
835 self.current_generation.properties += 1;
836 let update_applied = self.network_registry.apply(update);
837 if let UpdateApplied::None = update_applied {
838 return;
840 }
841
842 let mut property_responders = HashMap::new();
843 std::mem::swap(&mut self.property_responders, &mut property_responders);
844
845 match update_applied {
846 UpdateApplied::DefaultNetworkChanged(previous_default) => {
847 self.changed_default_network(previous_default, &mut property_responders).await;
848 match self.network_registry.default_network {
849 Some(default_network) => {
850 if let Some(telemetry) = &self.telemetry {
851 if let Some(props) =
852 self.network_registry.networks.get(&default_network)
853 {
854 telemetry.send(TelemetryEvent::DefaultNetworkChanged(
855 NetworkEventMetadata {
856 id: default_network.get().get(),
857 name: props.name.clone(),
858 transport: props
859 .network_type
860 .unwrap_or(fnp_socketproxy::NetworkType::Unknown),
861 is_fuchsia_provisioned: matches!(
862 default_network,
863 NetworkId::Fuchsia(_)
864 ),
865 },
866 ));
867 } else {
868 warn!("Could not fetch network data for default network.");
869 }
870 }
871 self.current_generation.default_network += 1;
872 let mut responders = HashMap::new();
873 std::mem::swap(&mut self.default_network_responders, &mut responders);
874 for (id, responder) in responders {
875 self.generations_by_connection
876 .set_default_network(id, self.current_generation);
877 match self
878 .tokens
879 .ensure_token(NetworkTokenContents {
880 network_id: default_network,
881 is_default: true,
882 })
883 .get()
884 .duplicate()
885 {
886 Ok(token) => {
887 if let Err(e) = responder.send(
888 fnp_properties::NetworksWatchDefaultResponse::Network(
889 token,
890 ),
891 ) {
892 warn!("Could not send to responder: {e}");
893 }
894 }
895 Err(e) => warn!("Could not duplicate token: {e}"),
896 };
897 }
898 }
899 None => {
900 if let Some(telemetry) = &self.telemetry {
901 telemetry.send(TelemetryEvent::DefaultNetworkLost);
902 }
903 self.current_generation.default_network += 1;
905 let mut responders = HashMap::new();
906 std::mem::swap(&mut self.default_network_responders, &mut responders);
907 for (id, responder) in responders {
908 self.generations_by_connection
909 .set_default_network(id, self.current_generation);
910 if let Err(e) = responder.send(
911 fnp_properties::NetworksWatchDefaultResponse::NoDefaultNetwork(
912 fnp_properties::Empty,
913 ),
914 ) {
915 warn!("Could not send to responder: {e}");
916 }
917 }
918 }
919 }
920
921 return;
923 }
924 UpdateApplied::NetworkChanged { network_id, added: true, .. } => {
925 let _ = self
926 .tokens
927 .ensure_token(NetworkTokenContents { network_id, is_default: false });
928 }
929 UpdateApplied::NetworkRemoved(network_id) => {
930 self.tokens.drop_if(|c| !c.is_default && c.network_id == network_id);
931 }
932 UpdateApplied::NetworkChanged { added: false, .. } => {
933 }
936 UpdateApplied::DnsChanged => {}
939 UpdateApplied::None => {}
940 }
941
942 for (id, responder) in property_responders {
943 let mut updates = Vec::new();
944 let network = match self.tokens.get_contents(&responder.token) {
945 Ok(network) => network,
946 Err(e) => {
947 warn!("Could not fetch network data for responder: {e}");
948 continue;
949 }
950 };
951
952 if let UpdateApplied::NetworkChanged { network_id, changed_marks: true, .. } =
953 update_applied
954 {
955 if network.network_id == network_id {
956 updates.add_socket_marks(&self.network_registry, &network, &responder);
957 }
958 }
959 if let UpdateApplied::DnsChanged = update_applied {
960 updates.add_dns(&self.network_registry, &network, &responder);
961 }
962
963 self.generations_by_connection.set_properties(id, self.current_generation);
964 if updates.is_empty() {
965 if self.property_responders.insert(id, responder).is_some() {
966 warn!("Re-inserted in an existing responder slot. This should be impossible.");
967 }
968 } else {
969 if let Err(e) = responder.respond(Ok(&updates)) {
970 warn!("Could not send to responder: {e}");
971 }
972 }
973 }
974 }
975}
976
977pub struct ConnectionTagged<Stream: futures::Stream + Unpin> {
978 next_id: ConnectionId,
979 streams: futures::stream::SelectAll<Tagged<ConnectionId, Stream>>,
980}
981
982impl<Stream: futures::Stream + Unpin> Default for ConnectionTagged<Stream> {
983 fn default() -> Self {
984 Self { next_id: Default::default(), streams: Default::default() }
985 }
986}
987
988impl<Stream: futures::Stream + Unpin> ConnectionTagged<Stream> {
989 pub fn push(&mut self, stream: Stream) {
990 self.streams.push(stream.tagged(self.next_id));
991 self.next_id.0 += 1;
992 }
993}
994
995impl<Stream: futures::Stream + Unpin> futures::Stream for ConnectionTagged<Stream> {
996 type Item = (ConnectionId, <Stream as futures::Stream>::Item);
997
998 fn poll_next(
999 mut self: std::pin::Pin<&mut Self>,
1000 cx: &mut std::task::Context<'_>,
1001 ) -> std::task::Poll<Option<Self::Item>> {
1002 std::pin::Pin::new(&mut self.streams).poll_next(cx)
1003 }
1004}
1005
1006impl<Stream: futures::Stream + Unpin> futures::stream::FusedStream for ConnectionTagged<Stream> {
1007 fn is_terminated(&self) -> bool {
1008 self.streams.is_terminated()
1009 }
1010}
1011
#[cfg(test)]
mod tests {
    use super::*;
    use crate::InterfaceId;
    use fnp_socketproxy::{ConnectivityState, NetworkType};
    use std::num::NonZeroU64;
    const ID_1: InterfaceId = InterfaceId(NonZeroU64::new(1).unwrap());
    const ID_2: InterfaceId = InterfaceId(NonZeroU64::new(2).unwrap());
    const NAME_1: &str = "testif1";
    const NAME_2: &str = "testif2";

    /// Builds the change event fed to `handle_changed_network`.
    fn change(
        added: bool,
        marks: Option<fnet::Marks>,
        connectivity_state: Option<ConnectivityState>,
        name: &str,
        network_type: NetworkType,
    ) -> NetworkPropertiesChange {
        NetworkPropertiesChange {
            added,
            marks,
            connectivity_state,
            name: Some(name.to_string()),
            network_type: Some(network_type),
        }
    }

    /// Builds the `NetworkChanged` result expected for an accepted change.
    fn network_changed(
        network_id: NetworkId,
        added: bool,
        changed_marks: bool,
        name: &str,
        network_type: NetworkType,
    ) -> UpdateApplied {
        UpdateApplied::NetworkChanged {
            network_id,
            added,
            changed_marks,
            name: Some(name.to_string()),
            network_type: Some(network_type),
        }
    }

    #[test]
    fn test_handle_changed_network_delegated() {
        let mut networks = RegisteredNetworks::default();
        let network_id = NetworkId::Delegated(ID_1);
        let marks = fnet::Marks { mark_1: Some(123), ..Default::default() };
        let new_marks = fnet::Marks { mark_1: Some(456), ..Default::default() };

        // Each step: (added, marks sent, connectivity sent, expected changed_marks).
        // 1. Adding the network counts as a marks change.
        // 2. Updating with identical marks does not.
        // 3. Updating with different marks does.
        let steps = [
            (true, &marks, ConnectivityState::FullConnectivity, true),
            (false, &marks, ConnectivityState::NoConnectivity, false),
            (false, &new_marks, ConnectivityState::FullConnectivity, true),
        ];
        for (added, sent_marks, connectivity, changed_marks) in steps {
            let event = change(
                added,
                Some(sent_marks.clone()),
                Some(connectivity),
                NAME_1,
                NetworkType::Ethernet,
            );
            assert_eq!(
                networks.handle_changed_network(network_id, event),
                network_changed(network_id, added, changed_marks, NAME_1, NetworkType::Ethernet)
            );

            let properties =
                networks.networks.get(&network_id).expect("network should be present");
            assert_eq!(properties.socket_marks, Some(sent_marks.clone()));
            assert_eq!(properties.connectivity_state, Some(connectivity));
        }
    }

    #[test]
    fn test_handle_changed_network_fuchsia() {
        let mut networks = RegisteredNetworks::default();
        let network_id = NetworkId::Fuchsia(ID_2);

        // Adding a Fuchsia network carries no marks.
        let event = change(
            true,
            None,
            Some(ConnectivityState::LocalConnectivity),
            NAME_2,
            NetworkType::Wifi,
        );
        assert_eq!(
            networks.handle_changed_network(network_id, event),
            network_changed(network_id, true, true, NAME_2, NetworkType::Wifi)
        );

        let properties = networks.networks.get(&network_id).expect("network should be present");
        assert_eq!(properties.socket_marks, None);
        assert_eq!(properties.connectivity_state, Some(ConnectivityState::LocalConnectivity));

        // Updating it replaces the stored connectivity state.
        let event = change(
            false,
            None,
            Some(ConnectivityState::FullConnectivity),
            NAME_2,
            NetworkType::Wifi,
        );
        assert_eq!(
            networks.handle_changed_network(network_id, event),
            network_changed(network_id, false, false, NAME_2, NetworkType::Wifi)
        );

        let properties = networks.networks.get(&network_id).expect("network should be present");
        assert_eq!(properties.connectivity_state, Some(ConnectivityState::FullConnectivity));
    }

    #[test]
    fn test_handle_changed_network_validation() {
        let mut networks = RegisteredNetworks::default();
        let network_id = NetworkId::Delegated(ID_1);
        let marks = fnet::Marks { mark_1: Some(123), ..Default::default() };
        let event_with = |added: bool, marks: Option<fnet::Marks>| {
            change(added, marks, None, NAME_1, NetworkType::Ethernet)
        };

        // Updating a network that was never added is rejected.
        assert_eq!(
            networks.handle_changed_network(network_id, event_with(false, Some(marks.clone()))),
            UpdateApplied::None
        );

        // Adding it succeeds.
        assert_eq!(
            networks.handle_changed_network(network_id, event_with(true, Some(marks.clone()))),
            network_changed(network_id, true, true, NAME_1, NetworkType::Ethernet)
        );

        // Adding it a second time is rejected.
        assert_eq!(
            networks.handle_changed_network(network_id, event_with(true, Some(marks.clone()))),
            UpdateApplied::None
        );

        // A Fuchsia network must not carry marks.
        let fuchsia_id = NetworkId::Fuchsia(ID_1);
        assert_eq!(
            networks.handle_changed_network(fuchsia_id, event_with(true, Some(marks.clone()))),
            UpdateApplied::None
        );

        // A delegated network must carry marks.
        let delegated_id = NetworkId::Delegated(ID_1);
        assert_eq!(
            networks.handle_changed_network(delegated_id, event_with(true, None)),
            UpdateApplied::None
        );
    }
}