Skip to main content

fidl_fuchsia_net_filter_ext/
lib.rs

1// Copyright 2023 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5//! Extensions for the fuchsia.net.filter FIDL library.
6//!
7//! Note that this library as written is not meant for inclusion in the SDK. It
8//! is only meant to be used in conjunction with a netstack that is compiled
9//! against the same API level of the `fuchsia.net.filter` FIDL library. This
10//! library opts in to compile-time and runtime breakage when the FIDL library
11//! is evolved in order to enforce that it is updated along with the FIDL
12//! library itself.
13
14#[cfg(target_os = "fuchsia")]
15pub mod sync;
16
17use std::collections::HashMap;
18use std::fmt::Debug;
19use std::num::NonZeroU16;
20use std::ops::RangeInclusive;
21
22use async_utils::fold::FoldWhile;
23use fidl::marker::SourceBreaking;
24#[cfg(not(feature = "fdomain"))]
25use fidl_fuchsia_net_interfaces_ext as fnet_interfaces_ext;
26#[cfg(feature = "fdomain")]
27use fidl_fuchsia_net_interfaces_ext_fdomain as fnet_interfaces_ext;
28#[cfg(not(feature = "fdomain"))]
29use fidl_fuchsia_net_matchers_ext as fnet_matchers_ext;
30#[cfg(feature = "fdomain")]
31use fidl_fuchsia_net_matchers_ext_fdomain as fnet_matchers_ext;
32use flex_client::ProxyHasDomain as _;
33use flex_fuchsia_ebpf as febpf;
34use flex_fuchsia_net as fnet;
35use flex_fuchsia_net_filter as fnet_filter;
36use flex_fuchsia_net_root as fnet_root;
37use futures::{Stream, StreamExt as _, TryStreamExt as _};
38use thiserror::Error;
39
40/// Conversion errors from `fnet_filter` FIDL types to the
41/// equivalents defined in this module.
42#[derive(Debug, Error, PartialEq)]
43pub enum FidlConversionError {
44    #[error("union is of an unknown variant: {0}")]
45    UnknownUnionVariant(&'static str),
46    #[error("namespace ID not provided")]
47    MissingNamespaceId,
48    #[error("namespace domain not provided")]
49    MissingNamespaceDomain,
50    #[error("routine ID not provided")]
51    MissingRoutineId,
52    #[error("routine type not provided")]
53    MissingRoutineType,
54    #[error("IP installation hook not provided")]
55    MissingIpInstallationHook,
56    #[error("NAT installation hook not provided")]
57    MissingNatInstallationHook,
58    #[error("interface matcher specified an invalid ID of 0")]
59    ZeroInterfaceId,
60    #[error("invalid address range (start must be <= end)")]
61    InvalidAddressRange,
62    #[error("address range start and end addresses are not the same IP family")]
63    AddressRangeFamilyMismatch,
64    #[error("prefix length of subnet is longer than number of bits in IP address")]
65    SubnetPrefixTooLong,
66    #[error("host bits are set in subnet network")]
67    SubnetHostBitsSet,
68    #[error("invalid port matcher range (start must be <= end)")]
69    InvalidPortMatcherRange,
70    #[error("transparent proxy action specified an invalid local port of 0")]
71    UnspecifiedTransparentProxyPort,
72    #[error("NAT action specified an invalid rewrite port of 0")]
73    UnspecifiedNatPort,
74    #[error("invalid port range (start must be <= end)")]
75    InvalidPortRange,
76    #[error("non-error result variant could not be converted to an error")]
77    NotAnError,
78}
79
80impl From<fnet_matchers_ext::PortError> for FidlConversionError {
81    fn from(value: fnet_matchers_ext::PortError) -> Self {
82        match value {
83            fnet_matchers_ext::PortError::InvalidPortRange => {
84                FidlConversionError::InvalidPortMatcherRange
85            }
86        }
87    }
88}
89
90impl From<fnet_matchers_ext::InterfaceError> for FidlConversionError {
91    fn from(value: fnet_matchers_ext::InterfaceError) -> Self {
92        match value {
93            fnet_matchers_ext::InterfaceError::ZeroId => FidlConversionError::ZeroInterfaceId,
94            fnet_matchers_ext::InterfaceError::UnknownUnionVariant => {
95                FidlConversionError::UnknownUnionVariant(type_names::INTERFACE_MATCHER)
96            }
97            fnet_matchers_ext::InterfaceError::UnknownPortClass(unknown_port_class_error) => {
98                match unknown_port_class_error {
99                    fnet_interfaces_ext::UnknownPortClassError::NetInterfaces(_) => {
100                        FidlConversionError::UnknownUnionVariant(
101                            type_names::NET_INTERFACES_PORT_CLASS,
102                        )
103                    }
104                    fnet_interfaces_ext::UnknownPortClassError::HardwareNetwork(_) => {
105                        FidlConversionError::UnknownUnionVariant(
106                            type_names::HARDWARE_NETWORK_PORT_CLASS,
107                        )
108                    }
109                }
110            }
111        }
112    }
113}
114
115impl From<fnet_matchers_ext::AddressError> for FidlConversionError {
116    fn from(value: fnet_matchers_ext::AddressError) -> Self {
117        match value {
118            fnet_matchers_ext::AddressError::AddressMatcherType(address_matcher_type_error) => {
119                address_matcher_type_error.into()
120            }
121        }
122    }
123}
124
125impl From<fnet_matchers_ext::AddressMatcherTypeError> for FidlConversionError {
126    fn from(value: fnet_matchers_ext::AddressMatcherTypeError) -> Self {
127        match value {
128            fnet_matchers_ext::AddressMatcherTypeError::Subnet(subnet_error) => subnet_error.into(),
129            fnet_matchers_ext::AddressMatcherTypeError::AddressRange(address_range_error) => {
130                address_range_error.into()
131            }
132            fnet_matchers_ext::AddressMatcherTypeError::UnknownUnionVariant => {
133                FidlConversionError::UnknownUnionVariant(type_names::ADDRESS_MATCHER_TYPE)
134            }
135        }
136    }
137}
138
139impl From<fnet_matchers_ext::AddressRangeError> for FidlConversionError {
140    fn from(value: fnet_matchers_ext::AddressRangeError) -> Self {
141        match value {
142            fnet_matchers_ext::AddressRangeError::Invalid => {
143                FidlConversionError::InvalidAddressRange
144            }
145            fnet_matchers_ext::AddressRangeError::FamilyMismatch => {
146                FidlConversionError::AddressRangeFamilyMismatch
147            }
148        }
149    }
150}
151
152impl From<fnet_matchers_ext::SubnetError> for FidlConversionError {
153    fn from(value: fnet_matchers_ext::SubnetError) -> Self {
154        match value {
155            fnet_matchers_ext::SubnetError::PrefixTooLong => {
156                FidlConversionError::SubnetPrefixTooLong
157            }
158            fnet_matchers_ext::SubnetError::HostBitsSet => FidlConversionError::SubnetHostBitsSet,
159        }
160    }
161}
162
163impl From<fnet_matchers_ext::TransportProtocolError> for FidlConversionError {
164    fn from(value: fnet_matchers_ext::TransportProtocolError) -> Self {
165        match value {
166            fnet_matchers_ext::TransportProtocolError::Port(port_matcher_error) => {
167                port_matcher_error.into()
168            }
169            fnet_matchers_ext::TransportProtocolError::UnknownUnionVariant => {
170                FidlConversionError::UnknownUnionVariant(type_names::TRANSPORT_PROTOCOL)
171            }
172        }
173    }
174}
175
176// TODO(https://fxbug.dev/317058051): remove this when the Rust FIDL bindings
177// expose constants for these.
178mod type_names {
179    pub(super) const RESOURCE_ID: &str = "fuchsia.net.filter/ResourceId";
180    pub(super) const DOMAIN: &str = "fuchsia.net.filter/Domain";
181    pub(super) const IP_INSTALLATION_HOOK: &str = "fuchsia.net.filter/IpInstallationHook";
182    pub(super) const NAT_INSTALLATION_HOOK: &str = "fuchsia.net.filter/NatInstallationHook";
183    pub(super) const ROUTINE_TYPE: &str = "fuchsia.net.filter/RoutineType";
184    pub(super) const INTERFACE_MATCHER: &str = "fuchsia.net.matchers/Interface";
185    pub(super) const ADDRESS_MATCHER_TYPE: &str = "fuchsia.net.filter/AddressMatcherType";
186    pub(super) const TRANSPORT_PROTOCOL: &str = "fuchsia.net.matchers/TransportProtocol";
187    pub(super) const ACTION: &str = "fuchsia.net.filter/Action";
188    pub(super) const MARK_ACTION: &str = "fuchsia.net.filter/MarkAction";
189    pub(super) const TRANSPARENT_PROXY: &str = "fuchsia.net.filter/TransparentProxy";
190    pub(super) const RESOURCE: &str = "fuchsia.net.filter/Resource";
191    pub(super) const EVENT: &str = "fuchsia.net.filter/Event";
192    pub(super) const CHANGE: &str = "fuchsia.net.filter/Change";
193    pub(super) const CHANGE_VALIDATION_ERROR: &str = "fuchsia.net.filter/ChangeValidationError";
194    pub(super) const CHANGE_VALIDATION_RESULT: &str = "fuchsia.net.filter/ChangeValidationResult";
195    pub(super) const COMMIT_ERROR: &str = "fuchsia.net.filter/CommitError";
196    pub(super) const COMMIT_RESULT: &str = "fuchsia.net.filter/CommitResult";
197    pub(super) const NET_INTERFACES_PORT_CLASS: &str = "fuchsia.net.interfaces/PortClass";
198    pub(super) const HARDWARE_NETWORK_PORT_CLASS: &str = "fuchsia.hardware.network/PortClass";
199    pub(super) const REJECT_TYPE: &str = "fuchsia.net.filter/RejectType";
200}
201
202/// Extension type for [`fnet_filter::NamespaceId`].
203#[derive(Debug, Clone, PartialEq, Eq, Hash, Ord, PartialOrd)]
204pub struct NamespaceId(pub String);
205
206/// Extension type for [`fnet_filter::RoutineId`].
207#[derive(Debug, Clone, PartialEq, Eq, Hash)]
208pub struct RoutineId {
209    pub namespace: NamespaceId,
210    pub name: String,
211}
212
213impl From<fnet_filter::RoutineId> for RoutineId {
214    fn from(id: fnet_filter::RoutineId) -> Self {
215        let fnet_filter::RoutineId { namespace, name } = id;
216        Self { namespace: NamespaceId(namespace), name }
217    }
218}
219
220impl From<RoutineId> for fnet_filter::RoutineId {
221    fn from(id: RoutineId) -> Self {
222        let RoutineId { namespace, name } = id;
223        let NamespaceId(namespace) = namespace;
224        Self { namespace, name }
225    }
226}
227
228/// Extension type for [`fnet_filter::RuleId`].
229#[derive(Debug, Clone, PartialEq, Eq, Hash)]
230pub struct RuleId {
231    pub routine: RoutineId,
232    pub index: u32,
233}
234
235impl From<fnet_filter::RuleId> for RuleId {
236    fn from(id: fnet_filter::RuleId) -> Self {
237        let fnet_filter::RuleId { routine, index } = id;
238        Self { routine: routine.into(), index }
239    }
240}
241
242impl From<RuleId> for fnet_filter::RuleId {
243    fn from(id: RuleId) -> Self {
244        let RuleId { routine, index } = id;
245        Self { routine: routine.into(), index }
246    }
247}
248
249/// Extension type for [`fnet_filter::ResourceId`].
250#[derive(Debug, Clone, PartialEq, Eq, Hash)]
251pub enum ResourceId {
252    Namespace(NamespaceId),
253    Routine(RoutineId),
254    Rule(RuleId),
255}
256
257impl TryFrom<fnet_filter::ResourceId> for ResourceId {
258    type Error = FidlConversionError;
259
260    fn try_from(id: fnet_filter::ResourceId) -> Result<Self, Self::Error> {
261        match id {
262            fnet_filter::ResourceId::Namespace(id) => Ok(Self::Namespace(NamespaceId(id))),
263            fnet_filter::ResourceId::Routine(id) => Ok(Self::Routine(id.into())),
264            fnet_filter::ResourceId::Rule(id) => Ok(Self::Rule(id.into())),
265            fnet_filter::ResourceId::__SourceBreaking { .. } => {
266                Err(FidlConversionError::UnknownUnionVariant(type_names::RESOURCE_ID))
267            }
268        }
269    }
270}
271
272impl From<ResourceId> for fnet_filter::ResourceId {
273    fn from(id: ResourceId) -> Self {
274        match id {
275            ResourceId::Namespace(NamespaceId(id)) => fnet_filter::ResourceId::Namespace(id),
276            ResourceId::Routine(id) => fnet_filter::ResourceId::Routine(id.into()),
277            ResourceId::Rule(id) => fnet_filter::ResourceId::Rule(id.into()),
278        }
279    }
280}
281
282/// Extension type for [`fnet_filter::Domain`].
283#[derive(Debug, Clone, PartialEq)]
284pub enum Domain {
285    Ipv4,
286    Ipv6,
287    AllIp,
288}
289
290impl From<Domain> for fnet_filter::Domain {
291    fn from(domain: Domain) -> Self {
292        match domain {
293            Domain::Ipv4 => fnet_filter::Domain::Ipv4,
294            Domain::Ipv6 => fnet_filter::Domain::Ipv6,
295            Domain::AllIp => fnet_filter::Domain::AllIp,
296        }
297    }
298}
299
300impl TryFrom<fnet_filter::Domain> for Domain {
301    type Error = FidlConversionError;
302
303    fn try_from(domain: fnet_filter::Domain) -> Result<Self, Self::Error> {
304        match domain {
305            fnet_filter::Domain::Ipv4 => Ok(Self::Ipv4),
306            fnet_filter::Domain::Ipv6 => Ok(Self::Ipv6),
307            fnet_filter::Domain::AllIp => Ok(Self::AllIp),
308            fnet_filter::Domain::__SourceBreaking { .. } => {
309                Err(FidlConversionError::UnknownUnionVariant(type_names::DOMAIN))
310            }
311        }
312    }
313}
314
315/// Extension type for [`fnet_filter::Namespace`].
316#[derive(Debug, Clone, PartialEq)]
317pub struct Namespace {
318    pub id: NamespaceId,
319    pub domain: Domain,
320}
321
322impl From<Namespace> for fnet_filter::Namespace {
323    fn from(namespace: Namespace) -> Self {
324        let Namespace { id, domain } = namespace;
325        let NamespaceId(id) = id;
326        Self { id: Some(id), domain: Some(domain.into()), __source_breaking: SourceBreaking }
327    }
328}
329
330impl TryFrom<fnet_filter::Namespace> for Namespace {
331    type Error = FidlConversionError;
332
333    fn try_from(namespace: fnet_filter::Namespace) -> Result<Self, Self::Error> {
334        let fnet_filter::Namespace { id, domain, __source_breaking } = namespace;
335        let id = NamespaceId(id.ok_or(FidlConversionError::MissingNamespaceId)?);
336        let domain = domain.ok_or(FidlConversionError::MissingNamespaceDomain)?.try_into()?;
337        Ok(Self { id, domain })
338    }
339}
340
341/// Extension type for [`fnet_filter::IpInstallationHook`].
342#[derive(Debug, Clone, Copy, PartialEq)]
343pub enum IpHook {
344    Ingress,
345    LocalIngress,
346    Forwarding,
347    LocalEgress,
348    Egress,
349}
350
351impl From<IpHook> for fnet_filter::IpInstallationHook {
352    fn from(hook: IpHook) -> Self {
353        match hook {
354            IpHook::Ingress => Self::Ingress,
355            IpHook::LocalIngress => Self::LocalIngress,
356            IpHook::Forwarding => Self::Forwarding,
357            IpHook::LocalEgress => Self::LocalEgress,
358            IpHook::Egress => Self::Egress,
359        }
360    }
361}
362
363impl TryFrom<fnet_filter::IpInstallationHook> for IpHook {
364    type Error = FidlConversionError;
365
366    fn try_from(hook: fnet_filter::IpInstallationHook) -> Result<Self, Self::Error> {
367        match hook {
368            fnet_filter::IpInstallationHook::Ingress => Ok(Self::Ingress),
369            fnet_filter::IpInstallationHook::LocalIngress => Ok(Self::LocalIngress),
370            fnet_filter::IpInstallationHook::Forwarding => Ok(Self::Forwarding),
371            fnet_filter::IpInstallationHook::LocalEgress => Ok(Self::LocalEgress),
372            fnet_filter::IpInstallationHook::Egress => Ok(Self::Egress),
373            fnet_filter::IpInstallationHook::__SourceBreaking { .. } => {
374                Err(FidlConversionError::UnknownUnionVariant(type_names::IP_INSTALLATION_HOOK))
375            }
376        }
377    }
378}
379
380/// Extension type for [`fnet_filter::NatInstallationHook`].
381#[derive(Debug, Clone, Copy, PartialEq)]
382pub enum NatHook {
383    Ingress,
384    LocalIngress,
385    LocalEgress,
386    Egress,
387}
388
389impl From<NatHook> for fnet_filter::NatInstallationHook {
390    fn from(hook: NatHook) -> Self {
391        match hook {
392            NatHook::Ingress => Self::Ingress,
393            NatHook::LocalIngress => Self::LocalIngress,
394            NatHook::LocalEgress => Self::LocalEgress,
395            NatHook::Egress => Self::Egress,
396        }
397    }
398}
399
400impl TryFrom<fnet_filter::NatInstallationHook> for NatHook {
401    type Error = FidlConversionError;
402
403    fn try_from(hook: fnet_filter::NatInstallationHook) -> Result<Self, Self::Error> {
404        match hook {
405            fnet_filter::NatInstallationHook::Ingress => Ok(Self::Ingress),
406            fnet_filter::NatInstallationHook::LocalIngress => Ok(Self::LocalIngress),
407            fnet_filter::NatInstallationHook::LocalEgress => Ok(Self::LocalEgress),
408            fnet_filter::NatInstallationHook::Egress => Ok(Self::Egress),
409            fnet_filter::NatInstallationHook::__SourceBreaking { .. } => {
410                Err(FidlConversionError::UnknownUnionVariant(type_names::NAT_INSTALLATION_HOOK))
411            }
412        }
413    }
414}
415
416/// Extension type for [`fnet_filter::InstalledIpRoutine`].
417#[derive(Debug, Clone, PartialEq)]
418pub struct InstalledIpRoutine {
419    pub hook: IpHook,
420    pub priority: i32,
421}
422
423impl From<InstalledIpRoutine> for fnet_filter::InstalledIpRoutine {
424    fn from(routine: InstalledIpRoutine) -> Self {
425        let InstalledIpRoutine { hook, priority } = routine;
426        Self {
427            hook: Some(hook.into()),
428            priority: Some(priority),
429            __source_breaking: SourceBreaking,
430        }
431    }
432}
433
434impl TryFrom<fnet_filter::InstalledIpRoutine> for InstalledIpRoutine {
435    type Error = FidlConversionError;
436
437    fn try_from(routine: fnet_filter::InstalledIpRoutine) -> Result<Self, Self::Error> {
438        let fnet_filter::InstalledIpRoutine { hook, priority, __source_breaking } = routine;
439        let hook = hook.ok_or(FidlConversionError::MissingIpInstallationHook)?;
440        let priority = priority.unwrap_or(fnet_filter::DEFAULT_ROUTINE_PRIORITY);
441        Ok(Self { hook: hook.try_into()?, priority })
442    }
443}
444
445/// Extension type for [`fnet_filter::InstalledNatRoutine`].
446#[derive(Debug, Clone, PartialEq)]
447pub struct InstalledNatRoutine {
448    pub hook: NatHook,
449    pub priority: i32,
450}
451
452impl From<InstalledNatRoutine> for fnet_filter::InstalledNatRoutine {
453    fn from(routine: InstalledNatRoutine) -> Self {
454        let InstalledNatRoutine { hook, priority } = routine;
455        Self {
456            hook: Some(hook.into()),
457            priority: Some(priority),
458            __source_breaking: SourceBreaking,
459        }
460    }
461}
462
463impl TryFrom<fnet_filter::InstalledNatRoutine> for InstalledNatRoutine {
464    type Error = FidlConversionError;
465
466    fn try_from(routine: fnet_filter::InstalledNatRoutine) -> Result<Self, Self::Error> {
467        let fnet_filter::InstalledNatRoutine { hook, priority, __source_breaking } = routine;
468        let hook = hook.ok_or(FidlConversionError::MissingNatInstallationHook)?;
469        let priority = priority.unwrap_or(fnet_filter::DEFAULT_ROUTINE_PRIORITY);
470        Ok(Self { hook: hook.try_into()?, priority })
471    }
472}
473
474/// Extension type for [`fnet_filter::RoutineType`].
475#[derive(Debug, Clone, PartialEq)]
476pub enum RoutineType {
477    Ip(Option<InstalledIpRoutine>),
478    Nat(Option<InstalledNatRoutine>),
479}
480
481impl RoutineType {
482    pub fn is_installed(&self) -> bool {
483        // The `InstalledIpRoutine` or `InstalledNatRoutine` configuration is
484        // optional, and when omitted, signifies an uninstalled routine.
485        match self {
486            Self::Ip(Some(_)) | Self::Nat(Some(_)) => true,
487            Self::Ip(None) | Self::Nat(None) => false,
488        }
489    }
490}
491
492impl From<RoutineType> for fnet_filter::RoutineType {
493    fn from(routine: RoutineType) -> Self {
494        match routine {
495            RoutineType::Ip(installation) => Self::Ip(fnet_filter::IpRoutine {
496                installation: installation.map(Into::into),
497                __source_breaking: SourceBreaking,
498            }),
499            RoutineType::Nat(installation) => Self::Nat(fnet_filter::NatRoutine {
500                installation: installation.map(Into::into),
501                __source_breaking: SourceBreaking,
502            }),
503        }
504    }
505}
506
507impl TryFrom<fnet_filter::RoutineType> for RoutineType {
508    type Error = FidlConversionError;
509
510    fn try_from(type_: fnet_filter::RoutineType) -> Result<Self, Self::Error> {
511        match type_ {
512            fnet_filter::RoutineType::Ip(fnet_filter::IpRoutine {
513                installation,
514                __source_breaking,
515            }) => Ok(RoutineType::Ip(installation.map(TryInto::try_into).transpose()?)),
516            fnet_filter::RoutineType::Nat(fnet_filter::NatRoutine {
517                installation,
518                __source_breaking,
519            }) => Ok(RoutineType::Nat(installation.map(TryInto::try_into).transpose()?)),
520            fnet_filter::RoutineType::__SourceBreaking { .. } => {
521                Err(FidlConversionError::UnknownUnionVariant(type_names::ROUTINE_TYPE))
522            }
523        }
524    }
525}
526
527/// Extension type for [`fnet_filter::Routine`].
528#[derive(Debug, Clone, PartialEq)]
529pub struct Routine {
530    pub id: RoutineId,
531    pub routine_type: RoutineType,
532}
533
534impl From<Routine> for fnet_filter::Routine {
535    fn from(routine: Routine) -> Self {
536        let Routine { id, routine_type: type_ } = routine;
537        Self { id: Some(id.into()), type_: Some(type_.into()), __source_breaking: SourceBreaking }
538    }
539}
540
541impl TryFrom<fnet_filter::Routine> for Routine {
542    type Error = FidlConversionError;
543
544    fn try_from(routine: fnet_filter::Routine) -> Result<Self, Self::Error> {
545        let fnet_filter::Routine { id, type_, __source_breaking } = routine;
546        let id = id.ok_or(FidlConversionError::MissingRoutineId)?;
547        let type_ = type_.ok_or(FidlConversionError::MissingRoutineType)?;
548        Ok(Self { id: id.into(), routine_type: type_.try_into()? })
549    }
550}
551
552/// Extension type for [`fnet_filter::Matchers`].
553#[derive(Default, Clone, PartialEq)]
554pub struct Matchers {
555    pub in_interface: Option<fnet_matchers_ext::Interface>,
556    pub out_interface: Option<fnet_matchers_ext::Interface>,
557    pub src_addr: Option<fnet_matchers_ext::Address>,
558    pub dst_addr: Option<fnet_matchers_ext::Address>,
559    pub transport_protocol: Option<fnet_matchers_ext::TransportProtocol>,
560    pub ebpf_program: Option<febpf::ProgramId>,
561}
562
563impl From<Matchers> for fnet_filter::Matchers {
564    fn from(matchers: Matchers) -> Self {
565        let Matchers {
566            in_interface,
567            out_interface,
568            src_addr,
569            dst_addr,
570            transport_protocol,
571            ebpf_program,
572        } = matchers;
573        Self {
574            in_interface: in_interface.map(Into::into),
575            out_interface: out_interface.map(Into::into),
576            src_addr: src_addr.map(Into::into),
577            dst_addr: dst_addr.map(Into::into),
578            transport_protocol: transport_protocol.map(Into::into),
579            ebpf_program: ebpf_program.map(Into::into),
580            __source_breaking: SourceBreaking,
581        }
582    }
583}
584
585impl TryFrom<fnet_filter::Matchers> for Matchers {
586    type Error = FidlConversionError;
587
588    fn try_from(matchers: fnet_filter::Matchers) -> Result<Self, Self::Error> {
589        let fnet_filter::Matchers {
590            in_interface,
591            out_interface,
592            src_addr,
593            dst_addr,
594            transport_protocol,
595            ebpf_program,
596            __source_breaking,
597        } = matchers;
598        Ok(Self {
599            in_interface: in_interface.map(TryInto::try_into).transpose()?,
600            out_interface: out_interface.map(TryInto::try_into).transpose()?,
601            src_addr: src_addr.map(TryInto::try_into).transpose()?,
602            dst_addr: dst_addr.map(TryInto::try_into).transpose()?,
603            transport_protocol: transport_protocol.map(TryInto::try_into).transpose()?,
604            ebpf_program: ebpf_program.map(Into::into),
605        })
606    }
607}
608
609impl Debug for Matchers {
610    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
611        let mut debug_struct = f.debug_struct("Matchers");
612
613        let Matchers {
614            in_interface,
615            out_interface,
616            src_addr,
617            dst_addr,
618            transport_protocol,
619            ebpf_program,
620        } = &self;
621
622        // Omit empty fields.
623        if let Some(matcher) = in_interface {
624            let _ = debug_struct.field("in_interface", matcher);
625        }
626
627        if let Some(matcher) = out_interface {
628            let _ = debug_struct.field("out_interface", matcher);
629        }
630
631        if let Some(matcher) = src_addr {
632            let _ = debug_struct.field("src_addr", matcher);
633        }
634
635        if let Some(matcher) = dst_addr {
636            let _ = debug_struct.field("dst_addr", matcher);
637        }
638
639        if let Some(matcher) = transport_protocol {
640            let _ = debug_struct.field("transport_protocol", matcher);
641        }
642
643        if let Some(matcher) = ebpf_program {
644            let _ = debug_struct.field("ebpf_program", matcher);
645        }
646
647        debug_struct.finish()
648    }
649}
650
651/// Extension type for [`fnet_filter::Action`].
652#[derive(Debug, Clone, PartialEq)]
653pub enum Action {
654    Accept,
655    Drop,
656    Jump(String),
657    Return,
658    TransparentProxy(TransparentProxy),
659    Redirect { dst_port: Option<PortRange> },
660    Masquerade { src_port: Option<PortRange> },
661    Mark { domain: fnet::MarkDomain, action: MarkAction },
662    None,
663    Reject(RejectType),
664}
665
666#[derive(Debug, Clone, PartialEq)]
667pub enum MarkAction {
668    SetMark { clearing_mask: fnet::Mark, mark: fnet::Mark },
669}
670
671/// Extension type for [`fnet_filter::TransparentProxy_`].
672#[derive(Debug, Clone, PartialEq)]
673pub enum TransparentProxy {
674    LocalAddr(fnet::IpAddress),
675    LocalPort(NonZeroU16),
676    LocalAddrAndPort(fnet::IpAddress, NonZeroU16),
677}
678
679#[derive(Debug, Clone, PartialEq)]
680pub struct PortRange(pub RangeInclusive<NonZeroU16>);
681
682impl From<PortRange> for fnet_filter::PortRange {
683    fn from(range: PortRange) -> Self {
684        let PortRange(range) = range;
685        Self { start: range.start().get(), end: range.end().get() }
686    }
687}
688
689impl TryFrom<fnet_filter::PortRange> for PortRange {
690    type Error = FidlConversionError;
691
692    fn try_from(range: fnet_filter::PortRange) -> Result<Self, Self::Error> {
693        let fnet_filter::PortRange { start, end } = range;
694        if start > end {
695            Err(FidlConversionError::InvalidPortRange)
696        } else {
697            let start = NonZeroU16::new(start).ok_or(FidlConversionError::UnspecifiedNatPort)?;
698            let end = NonZeroU16::new(end).ok_or(FidlConversionError::UnspecifiedNatPort)?;
699            Ok(Self(start..=end))
700        }
701    }
702}
703
704#[derive(Debug, Clone, Copy, PartialEq)]
705pub enum RejectType {
706    TcpReset,
707    NetUnreachable,
708    HostUnreachable,
709    ProtoUnreachable,
710    PortUnreachable,
711    RoutePolicyFail,
712    RejectRoute,
713    AdminProhibited,
714}
715
716impl From<RejectType> for fnet_filter::RejectType {
717    fn from(value: RejectType) -> Self {
718        match value {
719            RejectType::TcpReset => fnet_filter::RejectType::TcpReset,
720            RejectType::NetUnreachable => fnet_filter::RejectType::NetUnreachable,
721            RejectType::HostUnreachable => fnet_filter::RejectType::HostUnreachable,
722            RejectType::ProtoUnreachable => fnet_filter::RejectType::ProtoUnreachable,
723            RejectType::PortUnreachable => fnet_filter::RejectType::PortUnreachable,
724            RejectType::RoutePolicyFail => fnet_filter::RejectType::RoutePolicyFail,
725            RejectType::RejectRoute => fnet_filter::RejectType::RejectRoute,
726            RejectType::AdminProhibited => fnet_filter::RejectType::AdminProhibited,
727        }
728    }
729}
730
731impl TryFrom<fnet_filter::RejectType> for RejectType {
732    type Error = FidlConversionError;
733
734    fn try_from(value: fnet_filter::RejectType) -> Result<Self, Self::Error> {
735        match value {
736            fnet_filter::RejectType::TcpReset => Ok(RejectType::TcpReset),
737            fnet_filter::RejectType::NetUnreachable => Ok(RejectType::NetUnreachable),
738            fnet_filter::RejectType::HostUnreachable => Ok(RejectType::HostUnreachable),
739            fnet_filter::RejectType::ProtoUnreachable => Ok(RejectType::ProtoUnreachable),
740            fnet_filter::RejectType::PortUnreachable => Ok(RejectType::PortUnreachable),
741            fnet_filter::RejectType::RoutePolicyFail => Ok(RejectType::RoutePolicyFail),
742            fnet_filter::RejectType::RejectRoute => Ok(RejectType::RejectRoute),
743            fnet_filter::RejectType::AdminProhibited => Ok(RejectType::AdminProhibited),
744            fnet_filter::RejectType::__SourceBreaking { .. } => {
745                Err(FidlConversionError::UnknownUnionVariant(type_names::REJECT_TYPE))
746            }
747        }
748    }
749}
750
751impl From<Action> for fnet_filter::Action {
752    fn from(action: Action) -> Self {
753        match action {
754            Action::Accept => Self::Accept(fnet_filter::Empty {}),
755            Action::Drop => Self::Drop(fnet_filter::Empty {}),
756            Action::Jump(target) => Self::Jump(target),
757            Action::Return => Self::Return_(fnet_filter::Empty {}),
758            Action::TransparentProxy(proxy) => Self::TransparentProxy(match proxy {
759                TransparentProxy::LocalAddr(addr) => {
760                    fnet_filter::TransparentProxy_::LocalAddr(addr)
761                }
762                TransparentProxy::LocalPort(port) => {
763                    fnet_filter::TransparentProxy_::LocalPort(port.get())
764                }
765                TransparentProxy::LocalAddrAndPort(addr, port) => {
766                    fnet_filter::TransparentProxy_::LocalAddrAndPort(fnet_filter::SocketAddr {
767                        addr,
768                        port: port.get(),
769                    })
770                }
771            }),
772            Action::Redirect { dst_port } => Self::Redirect(fnet_filter::Redirect {
773                dst_port: dst_port.map(Into::into),
774                __source_breaking: SourceBreaking,
775            }),
776            Action::Masquerade { src_port } => Self::Masquerade(fnet_filter::Masquerade {
777                src_port: src_port.map(Into::into),
778                __source_breaking: SourceBreaking,
779            }),
780            Action::Mark { domain, action } => {
781                Self::Mark(fnet_filter::Mark { domain, action: action.into() })
782            }
783            Action::None => Self::None(fnet_filter::Empty {}),
784            Action::Reject(reject_type) => {
785                Self::Reject(fnet_filter::Reject { reject_type: reject_type.into() })
786            }
787        }
788    }
789}
790
791impl TryFrom<fnet_filter::Action> for Action {
792    type Error = FidlConversionError;
793
794    fn try_from(action: fnet_filter::Action) -> Result<Self, Self::Error> {
795        match action {
796            fnet_filter::Action::Accept(fnet_filter::Empty {}) => Ok(Self::Accept),
797            fnet_filter::Action::Drop(fnet_filter::Empty {}) => Ok(Self::Drop),
798            fnet_filter::Action::Jump(target) => Ok(Self::Jump(target)),
799            fnet_filter::Action::Return_(fnet_filter::Empty {}) => Ok(Self::Return),
800            fnet_filter::Action::TransparentProxy(proxy) => {
801                Ok(Self::TransparentProxy(match proxy {
802                    fnet_filter::TransparentProxy_::LocalAddr(addr) => {
803                        TransparentProxy::LocalAddr(addr)
804                    }
805                    fnet_filter::TransparentProxy_::LocalPort(port) => {
806                        let port = NonZeroU16::new(port)
807                            .ok_or(FidlConversionError::UnspecifiedTransparentProxyPort)?;
808                        TransparentProxy::LocalPort(port)
809                    }
810                    fnet_filter::TransparentProxy_::LocalAddrAndPort(fnet_filter::SocketAddr {
811                        addr,
812                        port,
813                    }) => {
814                        let port = NonZeroU16::new(port)
815                            .ok_or(FidlConversionError::UnspecifiedTransparentProxyPort)?;
816                        TransparentProxy::LocalAddrAndPort(addr, port)
817                    }
818                    fnet_filter::TransparentProxy_::__SourceBreaking { .. } => {
819                        return Err(FidlConversionError::UnknownUnionVariant(
820                            type_names::TRANSPARENT_PROXY,
821                        ));
822                    }
823                }))
824            }
825            fnet_filter::Action::Redirect(fnet_filter::Redirect {
826                dst_port,
827                __source_breaking,
828            }) => Ok(Self::Redirect { dst_port: dst_port.map(TryInto::try_into).transpose()? }),
829            fnet_filter::Action::Masquerade(fnet_filter::Masquerade {
830                src_port,
831                __source_breaking,
832            }) => Ok(Self::Masquerade { src_port: src_port.map(TryInto::try_into).transpose()? }),
833            fnet_filter::Action::Mark(fnet_filter::Mark { domain, action }) => {
834                Ok(Self::Mark { domain, action: action.try_into()? })
835            }
836            fnet_filter::Action::__SourceBreaking { .. } => {
837                Err(FidlConversionError::UnknownUnionVariant(type_names::ACTION))
838            }
839            fnet_filter::Action::None(fnet_filter::Empty {}) => Ok(Self::None),
840            fnet_filter::Action::Reject(fnet_filter::Reject { reject_type }) => {
841                Ok(Self::Reject(reject_type.try_into()?))
842            }
843        }
844    }
845}
846
847impl From<MarkAction> for fnet_filter::MarkAction {
848    fn from(action: MarkAction) -> Self {
849        match action {
850            MarkAction::SetMark { clearing_mask, mark } => {
851                Self::SetMark(fnet_filter::SetMark { clearing_mask, mark })
852            }
853        }
854    }
855}
856
857impl TryFrom<fnet_filter::MarkAction> for MarkAction {
858    type Error = FidlConversionError;
859    fn try_from(action: fnet_filter::MarkAction) -> Result<Self, Self::Error> {
860        match action {
861            fnet_filter::MarkAction::SetMark(fnet_filter::SetMark { clearing_mask, mark }) => {
862                Ok(Self::SetMark { clearing_mask, mark })
863            }
864            fnet_filter::MarkAction::__SourceBreaking { .. } => {
865                Err(FidlConversionError::UnknownUnionVariant(type_names::MARK_ACTION))
866            }
867        }
868    }
869}
870
871/// Extension type for [`fnet_filter::Rule`].
872#[derive(Debug, Clone, PartialEq)]
873pub struct Rule {
874    pub id: RuleId,
875    pub matchers: Matchers,
876    pub action: Action,
877}
878
879impl From<Rule> for fnet_filter::Rule {
880    fn from(rule: Rule) -> Self {
881        let Rule { id, matchers, action } = rule;
882        Self { id: id.into(), matchers: matchers.into(), action: action.into() }
883    }
884}
885
886impl TryFrom<fnet_filter::Rule> for Rule {
887    type Error = FidlConversionError;
888
889    fn try_from(rule: fnet_filter::Rule) -> Result<Self, Self::Error> {
890        let fnet_filter::Rule { id, matchers, action } = rule;
891        Ok(Self { id: id.into(), matchers: matchers.try_into()?, action: action.try_into()? })
892    }
893}
894
895/// Extension type for [`fnet_filter::Resource`].
896#[derive(Debug, Clone, PartialEq)]
897pub enum Resource {
898    Namespace(Namespace),
899    Routine(Routine),
900    Rule(Rule),
901}
902
903impl Resource {
904    pub fn id(&self) -> ResourceId {
905        match self {
906            Self::Namespace(Namespace { id, domain: _ }) => ResourceId::Namespace(id.clone()),
907            Self::Routine(Routine { id, routine_type: _ }) => ResourceId::Routine(id.clone()),
908            Self::Rule(Rule { id, matchers: _, action: _ }) => ResourceId::Rule(id.clone()),
909        }
910    }
911}
912
913impl From<Resource> for fnet_filter::Resource {
914    fn from(resource: Resource) -> Self {
915        match resource {
916            Resource::Namespace(namespace) => Self::Namespace(namespace.into()),
917            Resource::Routine(routine) => Self::Routine(routine.into()),
918            Resource::Rule(rule) => Self::Rule(rule.into()),
919        }
920    }
921}
922
923impl TryFrom<fnet_filter::Resource> for Resource {
924    type Error = FidlConversionError;
925
926    fn try_from(resource: fnet_filter::Resource) -> Result<Self, Self::Error> {
927        match resource {
928            fnet_filter::Resource::Namespace(namespace) => {
929                Ok(Self::Namespace(namespace.try_into()?))
930            }
931            fnet_filter::Resource::Routine(routine) => Ok(Self::Routine(routine.try_into()?)),
932            fnet_filter::Resource::Rule(rule) => Ok(Self::Rule(rule.try_into()?)),
933            fnet_filter::Resource::__SourceBreaking { .. } => {
934                Err(FidlConversionError::UnknownUnionVariant(type_names::RESOURCE))
935            }
936        }
937    }
938}
939
940/// Extension type for [`fnet_filter::ControllerId`].
941#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
942pub struct ControllerId(pub String);
943
944/// Extension type for [`fnet_filter::Event`].
945#[derive(Debug, Clone, PartialEq)]
946pub enum Event {
947    Existing(ControllerId, Resource),
948    Idle,
949    Added(ControllerId, Resource),
950    Removed(ControllerId, ResourceId),
951    EndOfUpdate,
952}
953
954impl From<Event> for fnet_filter::Event {
955    fn from(event: Event) -> Self {
956        match event {
957            Event::Existing(controller, resource) => {
958                let ControllerId(id) = controller;
959                Self::Existing(fnet_filter::ExistingResource {
960                    controller: id,
961                    resource: resource.into(),
962                })
963            }
964            Event::Idle => Self::Idle(fnet_filter::Empty {}),
965            Event::Added(controller, resource) => {
966                let ControllerId(id) = controller;
967                Self::Added(fnet_filter::AddedResource {
968                    controller: id,
969                    resource: resource.into(),
970                })
971            }
972            Event::Removed(controller, resource) => {
973                let ControllerId(id) = controller;
974                Self::Removed(fnet_filter::RemovedResource {
975                    controller: id,
976                    resource: resource.into(),
977                })
978            }
979            Event::EndOfUpdate => Self::EndOfUpdate(fnet_filter::Empty {}),
980        }
981    }
982}
983
984impl TryFrom<fnet_filter::Event> for Event {
985    type Error = FidlConversionError;
986
987    fn try_from(event: fnet_filter::Event) -> Result<Self, Self::Error> {
988        match event {
989            fnet_filter::Event::Existing(fnet_filter::ExistingResource {
990                controller,
991                resource,
992            }) => Ok(Self::Existing(ControllerId(controller), resource.try_into()?)),
993            fnet_filter::Event::Idle(fnet_filter::Empty {}) => Ok(Self::Idle),
994            fnet_filter::Event::Added(fnet_filter::AddedResource { controller, resource }) => {
995                Ok(Self::Added(ControllerId(controller), resource.try_into()?))
996            }
997            fnet_filter::Event::Removed(fnet_filter::RemovedResource { controller, resource }) => {
998                Ok(Self::Removed(ControllerId(controller), resource.try_into()?))
999            }
1000            fnet_filter::Event::EndOfUpdate(fnet_filter::Empty {}) => Ok(Self::EndOfUpdate),
1001            fnet_filter::Event::__SourceBreaking { .. } => {
1002                Err(FidlConversionError::UnknownUnionVariant(type_names::EVENT))
1003            }
1004        }
1005    }
1006}
1007
1008/// Filter watcher creation errors.
1009#[derive(Debug, Error)]
1010pub enum WatcherCreationError {
1011    #[error("failed to create filter watcher proxy: {0}")]
1012    CreateProxy(fidl::Error),
1013    #[error("failed to get filter watcher: {0}")]
1014    GetWatcher(fidl::Error),
1015}
1016
1017/// Filter watcher `Watch` errors.
1018#[derive(Debug, Error)]
1019pub enum WatchError {
1020    /// The call to `Watch` returned a FIDL error.
1021    #[error("the call to `Watch()` failed: {0}")]
1022    Fidl(fidl::Error),
1023    /// The event returned by `Watch` encountered a conversion error.
1024    #[error("failed to convert event returned by `Watch()`: {0}")]
1025    Conversion(FidlConversionError),
1026    /// The server returned an empty batch of events.
1027    #[error("the call to `Watch()` returned an empty batch of events")]
1028    EmptyEventBatch,
1029}
1030
1031/// Connects to the watcher protocol and converts the Hanging-Get style API into
1032/// an Event stream.
1033///
1034/// Each call to `Watch` returns a batch of events, which are flattened into a
1035/// single stream. If an error is encountered while calling `Watch` or while
1036/// converting the event, the stream is immediately terminated.
1037pub fn event_stream_from_state(
1038    state: fnet_filter::StateProxy,
1039) -> Result<impl Stream<Item = Result<Event, WatchError>>, WatcherCreationError> {
1040    let (watcher, server_end) = state.domain().create_proxy::<fnet_filter::WatcherMarker>();
1041    state
1042        .get_watcher(&fnet_filter::WatcherOptions::default(), server_end)
1043        .map_err(WatcherCreationError::GetWatcher)?;
1044
1045    let stream = futures::stream::try_unfold(watcher, |watcher| async {
1046        let events = watcher.watch().await.map_err(WatchError::Fidl)?;
1047        if events.is_empty() {
1048            return Err(WatchError::EmptyEventBatch);
1049        }
1050
1051        let event_stream = futures::stream::iter(events).map(Ok).and_then(|event| {
1052            futures::future::ready(event.try_into().map_err(WatchError::Conversion))
1053        });
1054        Ok(Some((event_stream, watcher)))
1055    })
1056    .try_flatten();
1057
1058    Ok(stream)
1059}
1060
1061/// Errors returned by [`get_existing_resources`].
1062#[derive(Debug, Error)]
1063pub enum GetExistingResourcesError {
1064    /// There was an error in the event stream.
1065    #[error("there was an error in the event stream: {0}")]
1066    ErrorInStream(WatchError),
1067    /// There was an unexpected event in the event stream. Only `existing` or
1068    /// `idle` events are expected.
1069    #[error("there was an unexpected event in the event stream: {0:?}")]
1070    UnexpectedEvent(Event),
1071    /// A duplicate existing resource was reported in the event stream.
1072    #[error("a duplicate existing resource was reported")]
1073    DuplicateResource(Resource),
1074    /// The event stream unexpectedly ended.
1075    #[error("the event stream unexpectedly ended")]
1076    StreamEnded,
1077}
1078
1079/// A trait for types holding filtering state that can be updated by change
1080/// events.
1081pub trait Update {
1082    /// Add the resource to the specified controller's state.
1083    ///
1084    /// Optionally returns a resource that has already been added to the
1085    /// controller with the same [`ResourceId`].
1086    fn add(&mut self, controller: ControllerId, resource: Resource) -> Option<Resource>;
1087
1088    /// Remove the resource from the specified controller's state.
1089    ///
1090    /// Returns the removed resource, if present.
1091    fn remove(&mut self, controller: ControllerId, resource: &ResourceId) -> Option<Resource>;
1092}
1093
1094impl Update for HashMap<ControllerId, HashMap<ResourceId, Resource>> {
1095    fn add(&mut self, controller: ControllerId, resource: Resource) -> Option<Resource> {
1096        self.entry(controller).or_default().insert(resource.id(), resource)
1097    }
1098
1099    fn remove(&mut self, controller: ControllerId, resource: &ResourceId) -> Option<Resource> {
1100        self.get_mut(&controller)?.remove(resource)
1101    }
1102}
1103
1104/// Collects all `existing` events from the stream, stopping once the `idle`
1105/// event is observed.
1106#[allow(clippy::result_large_err)] // TODO(https://fxbug.dev/401253790)
1107pub async fn get_existing_resources<C: Update + Default>(
1108    stream: impl Stream<Item = Result<Event, WatchError>>,
1109) -> Result<C, GetExistingResourcesError> {
1110    async_utils::fold::fold_while(
1111        stream,
1112        Ok(C::default()),
1113        |resources: Result<C, GetExistingResourcesError>, event| {
1114            let mut resources =
1115                resources.expect("`resources` must be `Ok`, because we stop folding on err");
1116            futures::future::ready(match event {
1117                Err(e) => FoldWhile::Done(Err(GetExistingResourcesError::ErrorInStream(e))),
1118                Ok(e) => match e {
1119                    Event::Existing(controller, resource) => {
1120                        if let Some(resource) = resources.add(controller, resource) {
1121                            FoldWhile::Done(Err(GetExistingResourcesError::DuplicateResource(
1122                                resource,
1123                            )))
1124                        } else {
1125                            FoldWhile::Continue(Ok(resources))
1126                        }
1127                    }
1128                    Event::Idle => FoldWhile::Done(Ok(resources)),
1129                    e @ (Event::Added(_, _) | Event::Removed(_, _) | Event::EndOfUpdate) => {
1130                        FoldWhile::Done(Err(GetExistingResourcesError::UnexpectedEvent(e)))
1131                    }
1132                },
1133            })
1134        },
1135    )
1136    .await
1137    .short_circuited()
1138    .map_err(|_resources| GetExistingResourcesError::StreamEnded)?
1139}
1140
1141/// Errors returned by [`wait_for_condition`].
1142#[derive(Debug, Error)]
1143pub enum WaitForConditionError {
1144    /// There was an error in the event stream.
1145    #[error("there was an error in the event stream: {0}")]
1146    ErrorInStream(WatchError),
1147    /// There was an `Added` event for an already existing resource.
1148    #[error("observed an added event for an already existing resource: {0:?}")]
1149    AddedAlreadyExisting(Resource),
1150    /// There was a `Removed` event for a non-existent resource.
1151    #[error("observed a removed event for a non-existent resource: {0:?}")]
1152    RemovedNonExistent(ResourceId),
1153    /// The event stream unexpectedly ended.
1154    #[error("the event stream unexpectedly ended")]
1155    StreamEnded,
1156}
1157
1158/// Wait for a condition on filtering state to be satisfied.
1159///
1160/// With the given `initial_state`, take events from `event_stream` and update
1161/// the state, calling `predicate` whenever the state changes. When predicates
1162/// returns `True` yield `Ok(())`.
1163#[allow(clippy::result_large_err)] // TODO(https://fxbug.dev/401253790)
1164pub async fn wait_for_condition<
1165    C: Update,
1166    S: Stream<Item = Result<Event, WatchError>>,
1167    F: Fn(&C) -> bool,
1168>(
1169    event_stream: S,
1170    initial_state: &mut C,
1171    predicate: F,
1172) -> Result<(), WaitForConditionError> {
1173    async_utils::fold::try_fold_while(
1174        event_stream.map_err(WaitForConditionError::ErrorInStream),
1175        initial_state,
1176        |resources: &mut C, event| {
1177            futures::future::ready(match event {
1178                Event::Existing(controller, resource) | Event::Added(controller, resource) => {
1179                    if let Some(resource) = resources.add(controller, resource) {
1180                        Err(WaitForConditionError::AddedAlreadyExisting(resource))
1181                    } else {
1182                        Ok(FoldWhile::Continue(resources))
1183                    }
1184                }
1185                Event::Removed(controller, resource) => resources
1186                    .remove(controller, &resource)
1187                    .map(|_| FoldWhile::Continue(resources))
1188                    .ok_or(WaitForConditionError::RemovedNonExistent(resource)),
1189                // Wait until a transactional update has been completed to call
1190                // the predicate so it's not run against partially-updated
1191                // state.
1192                Event::Idle | Event::EndOfUpdate => {
1193                    if predicate(&resources) {
1194                        Ok(FoldWhile::Done(()))
1195                    } else {
1196                        Ok(FoldWhile::Continue(resources))
1197                    }
1198                }
1199            })
1200        },
1201    )
1202    .await?
1203    .short_circuited()
1204    .map_err(|_resources: &mut C| WaitForConditionError::StreamEnded)
1205}
1206
1207/// Namespace controller creation errors.
1208#[derive(Debug, Error)]
1209pub enum ControllerCreationError {
1210    #[error("failed to create namespace controller proxy: {0}")]
1211    CreateProxy(fidl::Error),
1212    #[error("failed to open namespace controller: {0}")]
1213    OpenController(fidl::Error),
1214    #[error("server did not emit OnIdAssigned event")]
1215    NoIdAssigned,
1216    #[error("failed to observe ID assignment event: {0}")]
1217    IdAssignment(fidl::Error),
1218}
1219
1220/// Errors for individual changes pushed.
1221///
1222/// Extension type for the error variants of [`fnet_filter::ChangeValidationError`].
1223#[derive(Debug, Error, PartialEq)]
1224pub enum ChangeValidationError {
1225    #[error("change contains a resource that is missing a required field")]
1226    MissingRequiredField,
1227    #[error("rule specifies an invalid interface matcher")]
1228    InvalidInterfaceMatcher,
1229    #[error("rule specifies an invalid address matcher")]
1230    InvalidAddressMatcher,
1231    #[error("rule specifies an invalid port matcher")]
1232    InvalidPortMatcher,
1233    #[error("rule specifies an invalid transparent proxy action")]
1234    InvalidTransparentProxyAction,
1235    #[error("rule specifies an invalid NAT action")]
1236    InvalidNatAction,
1237    #[error("rule specifies an invalid port range")]
1238    InvalidPortRange,
1239}
1240
1241impl TryFrom<fnet_filter::ChangeValidationError> for ChangeValidationError {
1242    type Error = FidlConversionError;
1243
1244    fn try_from(error: fnet_filter::ChangeValidationError) -> Result<Self, Self::Error> {
1245        match error {
1246            fnet_filter::ChangeValidationError::MissingRequiredField => {
1247                Ok(Self::MissingRequiredField)
1248            }
1249            fnet_filter::ChangeValidationError::InvalidInterfaceMatcher => {
1250                Ok(Self::InvalidInterfaceMatcher)
1251            }
1252            fnet_filter::ChangeValidationError::InvalidAddressMatcher => {
1253                Ok(Self::InvalidAddressMatcher)
1254            }
1255            fnet_filter::ChangeValidationError::InvalidPortMatcher => Ok(Self::InvalidPortMatcher),
1256            fnet_filter::ChangeValidationError::InvalidTransparentProxyAction => {
1257                Ok(Self::InvalidTransparentProxyAction)
1258            }
1259            fnet_filter::ChangeValidationError::InvalidNatAction => Ok(Self::InvalidNatAction),
1260            fnet_filter::ChangeValidationError::InvalidPortRange => Ok(Self::InvalidPortRange),
1261            fnet_filter::ChangeValidationError::Ok
1262            | fnet_filter::ChangeValidationError::NotReached => {
1263                Err(FidlConversionError::NotAnError)
1264            }
1265            fnet_filter::ChangeValidationError::__SourceBreaking { unknown_ordinal: _ } => {
1266                Err(FidlConversionError::UnknownUnionVariant(type_names::CHANGE_VALIDATION_ERROR))
1267            }
1268        }
1269    }
1270}
1271
1272#[derive(Debug, Error)]
1273pub enum RegisterEbpfProgramError {
1274    #[error("failed to call FIDL method: {0}")]
1275    CallMethod(fidl::Error),
1276
1277    #[error("failed to link the program")]
1278    LinkFailed,
1279
1280    #[error("failed to initialize a map")]
1281    MapFailed,
1282
1283    #[error("the program is already registered")]
1284    AlreadyRegistered,
1285
1286    #[error("the request is missing a required field")]
1287    MissingRequiredField,
1288}
1289
1290impl From<fnet_filter::RegisterEbpfProgramError> for RegisterEbpfProgramError {
1291    fn from(error: fnet_filter::RegisterEbpfProgramError) -> Self {
1292        match error {
1293            fnet_filter::RegisterEbpfProgramError::LinkFailed => Self::LinkFailed,
1294            fnet_filter::RegisterEbpfProgramError::MapFailed => Self::MapFailed,
1295            fnet_filter::RegisterEbpfProgramError::AlreadyRegistered => Self::AlreadyRegistered,
1296            fnet_filter::RegisterEbpfProgramError::MissingRequiredField => {
1297                Self::MissingRequiredField
1298            }
1299        }
1300    }
1301}
1302
1303/// Errors for the NamespaceController.PushChanges method.
1304#[derive(Debug, Error)]
1305pub enum PushChangesError {
1306    #[error("failed to call FIDL method: {0}")]
1307    CallMethod(fidl::Error),
1308    #[error("too many changes were pushed to the server")]
1309    TooManyChanges,
1310    #[error("invalid change(s) pushed: {0:?}")]
1311    ErrorOnChange(Vec<(Change, ChangeValidationError)>),
1312    #[error("unknown FIDL type: {0}")]
1313    FidlConversion(#[from] FidlConversionError),
1314}
1315
1316/// Errors for individual changes committed.
1317///
1318/// Extension type for the error variants of [`fnet_filter::CommitError`].
1319#[derive(Debug, Error, PartialEq)]
1320pub enum ChangeCommitError {
1321    #[error("the change referred to an unknown namespace")]
1322    NamespaceNotFound,
1323    #[error("the change referred to an unknown routine")]
1324    RoutineNotFound,
1325    #[error("the change referred to an unknown rule")]
1326    RuleNotFound,
1327    #[error("the specified resource already exists")]
1328    AlreadyExists,
1329    #[error("the change includes a rule that jumps to an installed routine")]
1330    TargetRoutineIsInstalled,
1331    #[error("the change includes an eBPF matcher with an invalid program ID")]
1332    InvalidEbpfProgramId,
1333}
1334
1335impl TryFrom<fnet_filter::CommitError> for ChangeCommitError {
1336    type Error = FidlConversionError;
1337
1338    fn try_from(error: fnet_filter::CommitError) -> Result<Self, Self::Error> {
1339        match error {
1340            fnet_filter::CommitError::NamespaceNotFound => Ok(Self::NamespaceNotFound),
1341            fnet_filter::CommitError::RoutineNotFound => Ok(Self::RoutineNotFound),
1342            fnet_filter::CommitError::RuleNotFound => Ok(Self::RuleNotFound),
1343            fnet_filter::CommitError::AlreadyExists => Ok(Self::AlreadyExists),
1344            fnet_filter::CommitError::TargetRoutineIsInstalled => {
1345                Ok(Self::TargetRoutineIsInstalled)
1346            }
1347            fnet_filter::CommitError::InvalidEbpfProgramId => Ok(Self::InvalidEbpfProgramId),
1348            fnet_filter::CommitError::Ok | fnet_filter::CommitError::NotReached => {
1349                Err(FidlConversionError::NotAnError)
1350            }
1351            fnet_filter::CommitError::__SourceBreaking { unknown_ordinal: _ } => {
1352                Err(FidlConversionError::UnknownUnionVariant(type_names::COMMIT_ERROR))
1353            }
1354        }
1355    }
1356}
1357
1358/// Errors for the NamespaceController.Commit method.
1359#[derive(Debug, Error)]
1360pub enum CommitError {
1361    #[error("failed to call FIDL method: {0}")]
1362    CallMethod(fidl::Error),
1363    #[error("rule has a matcher that is unavailable in its context: {0:?}")]
1364    RuleWithInvalidMatcher(RuleId),
1365    #[error("rule has an action that is invalid for its routine: {0:?}")]
1366    RuleWithInvalidAction(RuleId),
1367    #[error("rule has a TransparentProxy action but not a valid transport protocol matcher: {0:?}")]
1368    TransparentProxyWithInvalidMatcher(RuleId),
1369    #[error(
1370        "rule has a Redirect action that specifies a destination port but not a valid transport \
1371        protocol matcher: {0:?}"
1372    )]
1373    RedirectWithInvalidMatcher(RuleId),
1374    #[error(
1375        "rule has a Masquerade action that specifies a source port but not a valid transport \
1376        protocol matcher: {0:?}"
1377    )]
1378    MasqueradeWithInvalidMatcher(RuleId),
1379    #[error("rule has a Reject action but not a valid transport protocol matcher: {0:?}")]
1380    RejectWithInvalidMatcher(RuleId),
1381    #[error("routine forms a cycle {0:?}")]
1382    CyclicalRoutineGraph(RoutineId),
1383    #[error("invalid change was pushed: {0:?}")]
1384    ErrorOnChange(Vec<(Change, ChangeCommitError)>),
1385    #[error("unknown FIDL type: {0}")]
1386    FidlConversion(#[from] FidlConversionError),
1387}
1388
1389/// Extension type for [`fnet_filter::Change`].
1390#[derive(Debug, Clone, PartialEq)]
1391pub enum Change {
1392    Create(Resource),
1393    Remove(ResourceId),
1394}
1395
1396impl From<Change> for fnet_filter::Change {
1397    fn from(change: Change) -> Self {
1398        match change {
1399            Change::Create(resource) => Self::Create(resource.into()),
1400            Change::Remove(resource) => Self::Remove(resource.into()),
1401        }
1402    }
1403}
1404
1405impl TryFrom<fnet_filter::Change> for Change {
1406    type Error = FidlConversionError;
1407
1408    fn try_from(change: fnet_filter::Change) -> Result<Self, Self::Error> {
1409        match change {
1410            fnet_filter::Change::Create(resource) => Ok(Self::Create(resource.try_into()?)),
1411            fnet_filter::Change::Remove(resource) => Ok(Self::Remove(resource.try_into()?)),
1412            fnet_filter::Change::__SourceBreaking { .. } => {
1413                Err(FidlConversionError::UnknownUnionVariant(type_names::CHANGE))
1414            }
1415        }
1416    }
1417}
1418
1419/// A controller for filtering state.
1420pub struct Controller {
1421    controller: fnet_filter::NamespaceControllerProxy,
1422    // The client provides an ID when creating a new controller, but the server
1423    // may need to assign a different ID to avoid conflicts; either way, the
1424    // server informs the client of the final `ControllerId` on creation.
1425    id: ControllerId,
1426    // Changes that have been pushed to the server but not yet committed. This
1427    // allows the `Controller` to report more informative errors by correlating
1428    // error codes with particular changes.
1429    pending_changes: Vec<Change>,
1430}
1431
1432impl Controller {
1433    pub async fn new_root(
1434        root: &fnet_root::FilterProxy,
1435        ControllerId(id): &ControllerId,
1436    ) -> Result<Self, ControllerCreationError> {
1437        let (controller, server_end) =
1438            root.domain().create_proxy::<fnet_filter::NamespaceControllerMarker>();
1439        root.open_controller(id, server_end).map_err(ControllerCreationError::OpenController)?;
1440
1441        let fnet_filter::NamespaceControllerEvent::OnIdAssigned { id } = controller
1442            .take_event_stream()
1443            .next()
1444            .await
1445            .ok_or(ControllerCreationError::NoIdAssigned)?
1446            .map_err(ControllerCreationError::IdAssignment)?;
1447        Ok(Self { controller, id: ControllerId(id), pending_changes: Vec::new() })
1448    }
1449
1450    /// Creates a new `Controller`.
1451    ///
1452    /// Note that the provided `ControllerId` may need to be modified server-
1453    /// side to avoid collisions; to obtain the final ID assigned to the
1454    /// `Controller`, use the `id` method.
1455    pub async fn new(
1456        control: &fnet_filter::ControlProxy,
1457        ControllerId(id): &ControllerId,
1458    ) -> Result<Self, ControllerCreationError> {
1459        let (controller, server_end) =
1460            control.domain().create_proxy::<fnet_filter::NamespaceControllerMarker>();
1461        control.open_controller(id, server_end).map_err(ControllerCreationError::OpenController)?;
1462
1463        let fnet_filter::NamespaceControllerEvent::OnIdAssigned { id } = controller
1464            .take_event_stream()
1465            .next()
1466            .await
1467            .ok_or(ControllerCreationError::NoIdAssigned)?
1468            .map_err(ControllerCreationError::IdAssignment)?;
1469        Ok(Self { controller, id: ControllerId(id), pending_changes: Vec::new() })
1470    }
1471
1472    pub fn id(&self) -> &ControllerId {
1473        &self.id
1474    }
1475
1476    pub async fn register_ebpf_program(
1477        &mut self,
1478        handle: febpf::ProgramHandle,
1479        program: febpf::VerifiedProgram,
1480    ) -> Result<(), RegisterEbpfProgramError> {
1481        self.controller
1482            .register_ebpf_program(handle, program)
1483            .await
1484            .map_err(RegisterEbpfProgramError::CallMethod)?
1485            .map_err(RegisterEbpfProgramError::from)
1486    }
1487
1488    pub async fn push_changes(&mut self, changes: Vec<Change>) -> Result<(), PushChangesError> {
1489        let fidl_changes = changes.iter().cloned().map(Into::into).collect::<Vec<_>>();
1490        let result = self
1491            .controller
1492            .push_changes(&fidl_changes)
1493            .await
1494            .map_err(PushChangesError::CallMethod)?;
1495        handle_change_validation_result(result, &changes)?;
1496        // Maintain a client-side copy of the pending changes we've pushed to
1497        // the server in order to provide better error messages if a commit
1498        // fails.
1499        self.pending_changes.extend(changes);
1500        Ok(())
1501    }
1502
1503    async fn commit_with_options(
1504        &mut self,
1505        options: fnet_filter::CommitOptions,
1506    ) -> Result<(), CommitError> {
1507        let committed_changes = std::mem::take(&mut self.pending_changes);
1508        let result = self.controller.commit(options).await.map_err(CommitError::CallMethod)?;
1509        handle_commit_result(result, committed_changes)
1510    }
1511
1512    pub async fn commit(&mut self) -> Result<(), CommitError> {
1513        self.commit_with_options(fnet_filter::CommitOptions::default()).await
1514    }
1515
1516    pub async fn commit_idempotent(&mut self) -> Result<(), CommitError> {
1517        self.commit_with_options(fnet_filter::CommitOptions {
1518            idempotent: Some(true),
1519            __source_breaking: SourceBreaking,
1520        })
1521        .await
1522    }
1523}
1524
1525pub(crate) fn handle_change_validation_result(
1526    change_validation_result: fnet_filter::ChangeValidationResult,
1527    changes: &Vec<Change>,
1528) -> Result<(), PushChangesError> {
1529    match change_validation_result {
1530        fnet_filter::ChangeValidationResult::Ok(fnet_filter::Empty {}) => Ok(()),
1531        fnet_filter::ChangeValidationResult::TooManyChanges(fnet_filter::Empty {}) => {
1532            Err(PushChangesError::TooManyChanges)
1533        }
1534        fnet_filter::ChangeValidationResult::ErrorOnChange(results) => {
1535            let errors: Result<_, PushChangesError> =
1536                changes.iter().zip(results).try_fold(Vec::new(), |mut errors, (change, result)| {
1537                    match result {
1538                        fnet_filter::ChangeValidationError::Ok
1539                        | fnet_filter::ChangeValidationError::NotReached => Ok(errors),
1540                        error @ (fnet_filter::ChangeValidationError::MissingRequiredField
1541                        | fnet_filter::ChangeValidationError::InvalidInterfaceMatcher
1542                        | fnet_filter::ChangeValidationError::InvalidAddressMatcher
1543                        | fnet_filter::ChangeValidationError::InvalidPortMatcher
1544                        | fnet_filter::ChangeValidationError::InvalidTransparentProxyAction
1545                        | fnet_filter::ChangeValidationError::InvalidNatAction
1546                        | fnet_filter::ChangeValidationError::InvalidPortRange) => {
1547                            let error = error
1548                                .try_into()
1549                                .expect("`Ok` and `NotReached` are handled in another arm");
1550                            errors.push((change.clone(), error));
1551                            Ok(errors)
1552                        }
1553                        fnet_filter::ChangeValidationError::__SourceBreaking { .. } => {
1554                            Err(FidlConversionError::UnknownUnionVariant(
1555                                type_names::CHANGE_VALIDATION_ERROR,
1556                            )
1557                            .into())
1558                        }
1559                    }
1560                });
1561            Err(PushChangesError::ErrorOnChange(errors?))
1562        }
1563        fnet_filter::ChangeValidationResult::__SourceBreaking { .. } => {
1564            Err(FidlConversionError::UnknownUnionVariant(type_names::CHANGE_VALIDATION_RESULT)
1565                .into())
1566        }
1567    }
1568}
1569
1570pub(crate) fn handle_commit_result(
1571    commit_result: fnet_filter::CommitResult,
1572    committed_changes: Vec<Change>,
1573) -> Result<(), CommitError> {
1574    match commit_result {
1575        fnet_filter::CommitResult::Ok(fnet_filter::Empty {}) => Ok(()),
1576        fnet_filter::CommitResult::RuleWithInvalidMatcher(rule_id) => {
1577            Err(CommitError::RuleWithInvalidMatcher(rule_id.into()))
1578        }
1579        fnet_filter::CommitResult::RuleWithInvalidAction(rule_id) => {
1580            Err(CommitError::RuleWithInvalidAction(rule_id.into()))
1581        }
1582        fnet_filter::CommitResult::TransparentProxyWithInvalidMatcher(rule_id) => {
1583            Err(CommitError::TransparentProxyWithInvalidMatcher(rule_id.into()))
1584        }
1585        fnet_filter::CommitResult::RedirectWithInvalidMatcher(rule_id) => {
1586            Err(CommitError::RedirectWithInvalidMatcher(rule_id.into()))
1587        }
1588        fnet_filter::CommitResult::MasqueradeWithInvalidMatcher(rule_id) => {
1589            Err(CommitError::MasqueradeWithInvalidMatcher(rule_id.into()))
1590        }
1591        fnet_filter::CommitResult::RejectWithInvalidMatcher(rule_id) => {
1592            Err(CommitError::RejectWithInvalidMatcher(rule_id.into()))
1593        }
1594        fnet_filter::CommitResult::CyclicalRoutineGraph(routine_id) => {
1595            Err(CommitError::CyclicalRoutineGraph(routine_id.into()))
1596        }
1597        fnet_filter::CommitResult::ErrorOnChange(results) => {
1598            let errors: Result<_, CommitError> = committed_changes
1599                .into_iter()
1600                .zip(results)
1601                .try_fold(Vec::new(), |mut errors, (change, result)| match result {
1602                    fnet_filter::CommitError::Ok | fnet_filter::CommitError::NotReached => {
1603                        Ok(errors)
1604                    }
1605                    error @ (fnet_filter::CommitError::NamespaceNotFound
1606                    | fnet_filter::CommitError::RoutineNotFound
1607                    | fnet_filter::CommitError::RuleNotFound
1608                    | fnet_filter::CommitError::AlreadyExists
1609                    | fnet_filter::CommitError::TargetRoutineIsInstalled
1610                    | fnet_filter::CommitError::InvalidEbpfProgramId) => {
1611                        let error = error
1612                            .try_into()
1613                            .expect("`Ok` and `NotReached` are handled in another arm");
1614                        errors.push((change, error));
1615                        Ok(errors)
1616                    }
1617                    fnet_filter::CommitError::__SourceBreaking { .. } => {
1618                        Err(FidlConversionError::UnknownUnionVariant(type_names::COMMIT_ERROR)
1619                            .into())
1620                    }
1621                });
1622            Err(CommitError::ErrorOnChange(errors?))
1623        }
1624        fnet_filter::CommitResult::__SourceBreaking { .. } => {
1625            Err(FidlConversionError::UnknownUnionVariant(type_names::COMMIT_RESULT).into())
1626        }
1627    }
1628}
1629
1630#[cfg(test)]
1631mod tests {
1632
1633    use assert_matches::assert_matches;
1634    use flex_fuchsia_net_matchers as fnet_matchers;
1635    use futures::channel::mpsc;
1636    use futures::{FutureExt as _, SinkExt as _};
1637    use test_case::test_case;
1638
1639    use flex_fuchsia_hardware_network as fhardware_network;
1640    use flex_fuchsia_net_interfaces as fnet_interfaces;
1641
1642    use super::*;
1643
1644    #[test_case(
1645        fnet_filter::ResourceId::Namespace(String::from("namespace")),
1646        ResourceId::Namespace(NamespaceId(String::from("namespace")));
1647        "NamespaceId"
1648    )]
1649    #[test_case(fnet_filter::Domain::Ipv4, Domain::Ipv4; "Domain")]
1650    #[test_case(
1651        fnet_filter::Namespace {
1652            id: Some(String::from("namespace")),
1653            domain: Some(fnet_filter::Domain::Ipv4),
1654            ..Default::default()
1655        },
1656        Namespace { id: NamespaceId(String::from("namespace")), domain: Domain::Ipv4 };
1657        "Namespace"
1658    )]
1659    #[test_case(fnet_filter::IpInstallationHook::Egress, IpHook::Egress; "IpHook")]
1660    #[test_case(fnet_filter::NatInstallationHook::Egress, NatHook::Egress; "NatHook")]
1661    #[test_case(
1662        fnet_filter::InstalledIpRoutine {
1663            hook: Some(fnet_filter::IpInstallationHook::Egress),
1664            priority: Some(1),
1665            ..Default::default()
1666        },
1667        InstalledIpRoutine {
1668            hook: IpHook::Egress,
1669            priority: 1,
1670        };
1671        "InstalledIpRoutine"
1672    )]
1673    #[test_case(
1674        fnet_filter::RoutineType::Ip(fnet_filter::IpRoutine {
1675            installation: Some(fnet_filter::InstalledIpRoutine {
1676                hook: Some(fnet_filter::IpInstallationHook::LocalEgress),
1677                priority: Some(1),
1678                ..Default::default()
1679            }),
1680            ..Default::default()
1681        }),
1682        RoutineType::Ip(Some(InstalledIpRoutine { hook: IpHook::LocalEgress, priority: 1 }));
1683        "RoutineType"
1684    )]
1685    #[test_case(
1686        fnet_filter::Routine {
1687            id: Some(fnet_filter::RoutineId {
1688                namespace: String::from("namespace"),
1689                name: String::from("routine"),
1690            }),
1691            type_: Some(fnet_filter::RoutineType::Nat(fnet_filter::NatRoutine::default())),
1692            ..Default::default()
1693        },
1694        Routine {
1695            id: RoutineId {
1696                namespace: NamespaceId(String::from("namespace")),
1697                name: String::from("routine"),
1698            },
1699            routine_type: RoutineType::Nat(None),
1700        };
1701        "Routine"
1702    )]
1703    #[test_case(
1704        fnet_filter::Matchers {
1705            in_interface: Some(fnet_matchers::Interface::Name(String::from("wlan"))),
1706            transport_protocol: Some(fnet_matchers::PacketTransportProtocol::Tcp(fnet_matchers::TcpPacket {
1707                src_port: None,
1708                dst_port: Some(fnet_matchers::Port { start: 22, end: 22, invert: false }),
1709                ..Default::default()
1710            })),
1711            ..Default::default()
1712        },
1713        Matchers {
1714            in_interface: Some(fnet_matchers_ext::Interface::Name(String::from("wlan"))),
1715            transport_protocol: Some(fnet_matchers_ext::TransportProtocol::Tcp {
1716                src_port: None,
1717                dst_port: Some(fnet_matchers_ext::Port::new(22, 22, false).unwrap()),
1718            }),
1719            ..Default::default()
1720        };
1721        "Matchers"
1722    )]
1723    #[test_case(
1724        fnet_filter::Action::Accept(fnet_filter::Empty {}),
1725        Action::Accept;
1726        "Action"
1727    )]
1728    #[test_case(
1729        fnet_filter::Rule {
1730            id: fnet_filter::RuleId {
1731                routine: fnet_filter::RoutineId {
1732                    namespace: String::from("namespace"),
1733                    name: String::from("routine"),
1734                },
1735                index: 1,
1736            },
1737            matchers: fnet_filter::Matchers {
1738                transport_protocol: Some(fnet_matchers::PacketTransportProtocol::Icmp(
1739                    fnet_matchers::IcmpPacket::default()
1740                )),
1741                ..Default::default()
1742            },
1743            action: fnet_filter::Action::Drop(fnet_filter::Empty {}),
1744        },
1745        Rule {
1746            id: RuleId {
1747                routine: RoutineId {
1748                    namespace: NamespaceId(String::from("namespace")),
1749                    name: String::from("routine"),
1750                },
1751                index: 1,
1752            },
1753            matchers: Matchers {
1754                transport_protocol: Some(fnet_matchers_ext::TransportProtocol::Icmp),
1755                ..Default::default()
1756            },
1757            action: Action::Drop,
1758        };
1759        "Rule"
1760    )]
1761    #[test_case(
1762        fnet_filter::Resource::Namespace(fnet_filter::Namespace {
1763            id: Some(String::from("namespace")),
1764            domain: Some(fnet_filter::Domain::Ipv4),
1765            ..Default::default()
1766        }),
1767        Resource::Namespace(Namespace {
1768            id: NamespaceId(String::from("namespace")),
1769            domain: Domain::Ipv4
1770        });
1771        "Resource"
1772    )]
1773    #[test_case(
1774        fnet_filter::Event::EndOfUpdate(fnet_filter::Empty {}),
1775        Event::EndOfUpdate;
1776        "Event"
1777    )]
1778    #[test_case(
1779        fnet_filter::Change::Remove(fnet_filter::ResourceId::Namespace(String::from("namespace"))),
1780        Change::Remove(ResourceId::Namespace(NamespaceId(String::from("namespace"))));
1781        "Change"
1782    )]
1783    fn convert_from_fidl_and_back<F, E>(fidl_type: F, local_type: E)
1784    where
1785        E: TryFrom<F> + Clone + Debug + PartialEq,
1786        <E as TryFrom<F>>::Error: Debug + PartialEq,
1787        F: From<E> + Clone + Debug + PartialEq,
1788    {
1789        assert_eq!(fidl_type.clone().try_into(), Ok(local_type.clone()));
1790        assert_eq!(<_ as Into<F>>::into(local_type), fidl_type.clone());
1791    }
1792
1793    #[test]
1794    fn resource_id_try_from_unknown_variant() {
1795        assert_eq!(
1796            ResourceId::try_from(fnet_filter::ResourceId::__SourceBreaking { unknown_ordinal: 0 }),
1797            Err(FidlConversionError::UnknownUnionVariant(type_names::RESOURCE_ID))
1798        );
1799    }
1800
1801    #[test]
1802    fn domain_try_from_unknown_variant() {
1803        assert_eq!(
1804            Domain::try_from(fnet_filter::Domain::__SourceBreaking { unknown_ordinal: 0 }),
1805            Err(FidlConversionError::UnknownUnionVariant(type_names::DOMAIN))
1806        );
1807    }
1808
1809    #[test]
1810    fn namespace_try_from_missing_properties() {
1811        assert_eq!(
1812            Namespace::try_from(fnet_filter::Namespace {
1813                id: None,
1814                domain: Some(fnet_filter::Domain::Ipv4),
1815                ..Default::default()
1816            }),
1817            Err(FidlConversionError::MissingNamespaceId)
1818        );
1819        assert_eq!(
1820            Namespace::try_from(fnet_filter::Namespace {
1821                id: Some(String::from("namespace")),
1822                domain: None,
1823                ..Default::default()
1824            }),
1825            Err(FidlConversionError::MissingNamespaceDomain)
1826        );
1827    }
1828
1829    #[test]
1830    fn ip_installation_hook_try_from_unknown_variant() {
1831        assert_eq!(
1832            IpHook::try_from(fnet_filter::IpInstallationHook::__SourceBreaking {
1833                unknown_ordinal: 0
1834            }),
1835            Err(FidlConversionError::UnknownUnionVariant(type_names::IP_INSTALLATION_HOOK))
1836        );
1837    }
1838
1839    #[test]
1840    fn nat_installation_hook_try_from_unknown_variant() {
1841        assert_eq!(
1842            NatHook::try_from(fnet_filter::NatInstallationHook::__SourceBreaking {
1843                unknown_ordinal: 0
1844            }),
1845            Err(FidlConversionError::UnknownUnionVariant(type_names::NAT_INSTALLATION_HOOK))
1846        );
1847    }
1848
1849    #[test]
1850    fn installed_ip_routine_try_from_missing_hook() {
1851        assert_eq!(
1852            InstalledIpRoutine::try_from(fnet_filter::InstalledIpRoutine {
1853                hook: None,
1854                ..Default::default()
1855            }),
1856            Err(FidlConversionError::MissingIpInstallationHook)
1857        );
1858    }
1859
1860    #[test]
1861    fn installed_nat_routine_try_from_missing_hook() {
1862        assert_eq!(
1863            InstalledNatRoutine::try_from(fnet_filter::InstalledNatRoutine {
1864                hook: None,
1865                ..Default::default()
1866            }),
1867            Err(FidlConversionError::MissingNatInstallationHook)
1868        );
1869    }
1870
1871    #[test]
1872    fn routine_type_try_from_unknown_variant() {
1873        assert_eq!(
1874            RoutineType::try_from(fnet_filter::RoutineType::__SourceBreaking {
1875                unknown_ordinal: 0
1876            }),
1877            Err(FidlConversionError::UnknownUnionVariant(type_names::ROUTINE_TYPE))
1878        );
1879    }
1880
1881    #[test]
1882    fn routine_try_from_missing_properties() {
1883        assert_eq!(
1884            Routine::try_from(fnet_filter::Routine { id: None, ..Default::default() }),
1885            Err(FidlConversionError::MissingRoutineId)
1886        );
1887        assert_eq!(
1888            Routine::try_from(fnet_filter::Routine {
1889                id: Some(fnet_filter::RoutineId {
1890                    namespace: String::from("namespace"),
1891                    name: String::from("routine"),
1892                }),
1893                type_: None,
1894                ..Default::default()
1895            }),
1896            Err(FidlConversionError::MissingRoutineType)
1897        );
1898    }
1899
1900    #[test_case(
1901        fnet_matchers_ext::PortError::InvalidPortRange =>
1902        FidlConversionError::InvalidPortMatcherRange
1903    )]
1904    #[test_case(
1905        fnet_matchers_ext::InterfaceError::ZeroId =>
1906        FidlConversionError::ZeroInterfaceId
1907    )]
1908    #[test_case(
1909        fnet_matchers_ext::InterfaceError::UnknownUnionVariant =>
1910        FidlConversionError::UnknownUnionVariant(type_names::INTERFACE_MATCHER)
1911    )]
1912    #[test_case(
1913        {
1914            let invalid_port_class = fnet_interfaces::PortClass::__SourceBreaking {
1915                unknown_ordinal: 0
1916            };
1917            let error = fnet_interfaces_ext::PortClass::try_from(
1918                invalid_port_class
1919            ).unwrap_err();
1920            fnet_matchers_ext::InterfaceError::UnknownPortClass(error)
1921        } =>
1922        FidlConversionError::UnknownUnionVariant(type_names::NET_INTERFACES_PORT_CLASS);
1923        "UnknownPortClass=>UnknownUnionVariant"
1924    )]
1925    #[test_case(
1926        {
1927            let invalid_port_class = fhardware_network::PortClass::__SourceBreaking {
1928                unknown_ordinal: 0
1929            };
1930            let error = fnet_interfaces_ext::PortClass::try_from(
1931                invalid_port_class
1932            ).unwrap_err();
1933            fnet_matchers_ext::InterfaceError::UnknownPortClass(
1934                fnet_interfaces_ext::UnknownPortClassError::HardwareNetwork(error))
1935        } =>
1936        FidlConversionError::UnknownUnionVariant(type_names::HARDWARE_NETWORK_PORT_CLASS);
1937        "UnknownPortClass(HardwareNetwork)=>UnknownUnionVariant"
1938    )]
1939    #[test_case(
1940        fnet_matchers_ext::SubnetError::PrefixTooLong =>
1941        FidlConversionError::SubnetPrefixTooLong
1942    )]
1943    #[test_case(
1944        fnet_matchers_ext::SubnetError::HostBitsSet =>
1945        FidlConversionError::SubnetHostBitsSet
1946    )]
1947    #[test_case(
1948        fnet_matchers_ext::AddressRangeError::Invalid =>
1949        FidlConversionError::InvalidAddressRange
1950    )]
1951    #[test_case(
1952        fnet_matchers_ext::AddressRangeError::FamilyMismatch =>
1953        FidlConversionError::AddressRangeFamilyMismatch
1954    )]
1955    #[test_case(
1956        fnet_matchers_ext::AddressMatcherTypeError::Subnet(
1957            fnet_matchers_ext::SubnetError::PrefixTooLong) =>
1958        FidlConversionError::SubnetPrefixTooLong
1959    )]
1960    #[test_case(
1961        fnet_matchers_ext::AddressMatcherTypeError::Subnet(
1962            fnet_matchers_ext::SubnetError::HostBitsSet) =>
1963        FidlConversionError::SubnetHostBitsSet
1964    )]
1965    #[test_case(
1966        fnet_matchers_ext::AddressMatcherTypeError::AddressRange(
1967            fnet_matchers_ext::AddressRangeError::Invalid) =>
1968        FidlConversionError::InvalidAddressRange
1969    )]
1970    #[test_case(
1971        fnet_matchers_ext::AddressMatcherTypeError::AddressRange(
1972            fnet_matchers_ext::AddressRangeError::FamilyMismatch) =>
1973        FidlConversionError::AddressRangeFamilyMismatch
1974    )]
1975    #[test_case(
1976        fnet_matchers_ext::AddressMatcherTypeError::UnknownUnionVariant =>
1977        FidlConversionError::UnknownUnionVariant(type_names::ADDRESS_MATCHER_TYPE)
1978    )]
1979    #[test_case(
1980        fnet_matchers_ext::AddressError::AddressMatcherType(
1981            fnet_matchers_ext::AddressMatcherTypeError::Subnet(
1982                fnet_matchers_ext::SubnetError::PrefixTooLong)) =>
1983        FidlConversionError::SubnetPrefixTooLong
1984    )]
1985    #[test_case(
1986        fnet_matchers_ext::AddressError::AddressMatcherType(
1987            fnet_matchers_ext::AddressMatcherTypeError::Subnet(
1988                fnet_matchers_ext::SubnetError::HostBitsSet)) =>
1989        FidlConversionError::SubnetHostBitsSet
1990    )]
1991    #[test_case(
1992        fnet_matchers_ext::AddressError::AddressMatcherType(
1993            fnet_matchers_ext::AddressMatcherTypeError::AddressRange(
1994                fnet_matchers_ext::AddressRangeError::Invalid)) =>
1995        FidlConversionError::InvalidAddressRange
1996    )]
1997    #[test_case(
1998        fnet_matchers_ext::AddressError::AddressMatcherType(
1999            fnet_matchers_ext::AddressMatcherTypeError::AddressRange(
2000                fnet_matchers_ext::AddressRangeError::FamilyMismatch)) =>
2001        FidlConversionError::AddressRangeFamilyMismatch
2002    )]
2003    #[test_case(
2004        fnet_matchers_ext::AddressError::AddressMatcherType(
2005            fnet_matchers_ext::AddressMatcherTypeError::UnknownUnionVariant) =>
2006            FidlConversionError::UnknownUnionVariant(type_names::ADDRESS_MATCHER_TYPE)
2007    )]
2008    #[test_case(
2009        fnet_matchers_ext::TransportProtocolError::Port(
2010            fnet_matchers_ext::PortError::InvalidPortRange) =>
2011        FidlConversionError::InvalidPortMatcherRange
2012    )]
2013    #[test_case(
2014        fnet_matchers_ext::TransportProtocolError::UnknownUnionVariant =>
2015            FidlConversionError::UnknownUnionVariant(type_names::TRANSPORT_PROTOCOL)
2016    )]
2017    fn fidl_error_from_matcher_error<E: Into<FidlConversionError>>(
2018        error: E,
2019    ) -> FidlConversionError {
2020        error.into()
2021    }
2022
2023    #[test]
2024    fn action_try_from_unknown_variant() {
2025        assert_eq!(
2026            Action::try_from(fnet_filter::Action::__SourceBreaking { unknown_ordinal: 0 }),
2027            Err(FidlConversionError::UnknownUnionVariant(type_names::ACTION))
2028        );
2029    }
2030
2031    #[test]
2032    fn resource_try_from_unknown_variant() {
2033        assert_eq!(
2034            Resource::try_from(fnet_filter::Resource::__SourceBreaking { unknown_ordinal: 0 }),
2035            Err(FidlConversionError::UnknownUnionVariant(type_names::RESOURCE))
2036        );
2037    }
2038
2039    #[test]
2040    fn event_try_from_unknown_variant() {
2041        assert_eq!(
2042            Event::try_from(fnet_filter::Event::__SourceBreaking { unknown_ordinal: 0 }),
2043            Err(FidlConversionError::UnknownUnionVariant(type_names::EVENT))
2044        );
2045    }
2046
2047    #[test]
2048    fn change_try_from_unknown_variant() {
2049        assert_eq!(
2050            Change::try_from(fnet_filter::Change::__SourceBreaking { unknown_ordinal: 0 }),
2051            Err(FidlConversionError::UnknownUnionVariant(type_names::CHANGE))
2052        );
2053    }
2054
2055    fn test_controller_a() -> ControllerId {
2056        ControllerId(String::from("test-controller-a"))
2057    }
2058
2059    fn test_controller_b() -> ControllerId {
2060        ControllerId(String::from("test-controller-b"))
2061    }
2062
2063    pub(crate) fn test_resource_id() -> ResourceId {
2064        ResourceId::Namespace(NamespaceId(String::from("test-namespace")))
2065    }
2066
2067    pub(crate) fn test_resource() -> Resource {
2068        Resource::Namespace(Namespace {
2069            id: NamespaceId(String::from("test-namespace")),
2070            domain: Domain::AllIp,
2071        })
2072    }
2073
2074    // We can't easily create an invalid resource, so we just pretend and fake
2075    // the server response in tests.
2076    pub(crate) fn pretend_invalid_resource() -> Resource {
2077        Resource::Namespace(Namespace {
2078            id: NamespaceId(String::from("pretend-invalid-namespace")),
2079            domain: Domain::AllIp,
2080        })
2081    }
2082
2083    pub(crate) fn unknown_resource_id() -> ResourceId {
2084        ResourceId::Namespace(NamespaceId(String::from("does-not-exist")))
2085    }
2086
2087    #[fuchsia_async::run_singlethreaded(test)]
2088    async fn event_stream_from_state_conversion_error() {
2089        let client = flex_local::local_client_empty();
2090        let (proxy, mut request_stream) =
2091            client.create_proxy_and_stream::<fnet_filter::StateMarker>();
2092        let stream = event_stream_from_state(proxy).expect("get event stream");
2093        futures::pin_mut!(stream);
2094
2095        let send_invalid_event = async {
2096            let fnet_filter::StateRequest::GetWatcher { options: _, request, control_handle: _ } =
2097                request_stream
2098                    .next()
2099                    .await
2100                    .expect("client should call state")
2101                    .expect("request should not error");
2102            let fnet_filter::WatcherRequest::Watch { responder } = request
2103                .into_stream()
2104                .next()
2105                .await
2106                .expect("client should call watch")
2107                .expect("request should not error");
2108            responder
2109                .send(&[fnet_filter::Event::Added(fnet_filter::AddedResource {
2110                    controller: String::from("controller"),
2111                    resource: fnet_filter::Resource::Namespace(fnet_filter::Namespace {
2112                        id: None,
2113                        domain: None,
2114                        ..Default::default()
2115                    }),
2116                })])
2117                .expect("send batch with invalid event");
2118        };
2119        let ((), result) = futures::future::join(send_invalid_event, stream.next()).await;
2120        assert_matches!(
2121            result,
2122            Some(Err(WatchError::Conversion(FidlConversionError::MissingNamespaceId)))
2123        );
2124    }
2125
2126    #[fuchsia_async::run_singlethreaded(test)]
2127    async fn event_stream_from_state_empty_event_batch() {
2128        let client = flex_local::local_client_empty();
2129        let (proxy, mut request_stream) =
2130            client.create_proxy_and_stream::<fnet_filter::StateMarker>();
2131        let stream = event_stream_from_state(proxy).expect("get event stream");
2132        futures::pin_mut!(stream);
2133
2134        let send_empty_batch = async {
2135            let fnet_filter::StateRequest::GetWatcher { options: _, request, control_handle: _ } =
2136                request_stream
2137                    .next()
2138                    .await
2139                    .expect("client should call state")
2140                    .expect("request should not error");
2141            let fnet_filter::WatcherRequest::Watch { responder } = request
2142                .into_stream()
2143                .next()
2144                .await
2145                .expect("client should call watch")
2146                .expect("request should not error");
2147            responder.send(&[]).expect("send empty batch");
2148        };
2149        let ((), result) = futures::future::join(send_empty_batch, stream.next()).await;
2150        assert_matches!(result, Some(Err(WatchError::EmptyEventBatch)));
2151    }
2152
2153    #[fuchsia_async::run_singlethreaded(test)]
2154    async fn get_existing_resources_success() {
2155        let event_stream = futures::stream::iter([
2156            Ok(Event::Existing(test_controller_a(), test_resource())),
2157            Ok(Event::Existing(test_controller_b(), test_resource())),
2158            Ok(Event::Idle),
2159            Ok(Event::Removed(test_controller_a(), test_resource_id())),
2160        ]);
2161        futures::pin_mut!(event_stream);
2162
2163        let existing = get_existing_resources::<HashMap<_, _>>(event_stream.by_ref())
2164            .await
2165            .expect("get existing resources");
2166        assert_eq!(
2167            existing,
2168            HashMap::from([
2169                (test_controller_a(), HashMap::from([(test_resource_id(), test_resource())])),
2170                (test_controller_b(), HashMap::from([(test_resource_id(), test_resource())])),
2171            ])
2172        );
2173
2174        let trailing_events = event_stream.collect::<Vec<_>>().await;
2175        assert_matches!(
2176            &trailing_events[..],
2177            [Ok(Event::Removed(controller, resource))] if controller == &test_controller_a() &&
2178                                                           resource == &test_resource_id()
2179        );
2180    }
2181
2182    #[fuchsia_async::run_singlethreaded(test)]
2183    async fn get_existing_resources_error_in_stream() {
2184        let event_stream =
2185            futures::stream::once(futures::future::ready(Err(WatchError::EmptyEventBatch)));
2186        futures::pin_mut!(event_stream);
2187        assert_matches!(
2188            get_existing_resources::<HashMap<_, _>>(event_stream).await,
2189            Err(GetExistingResourcesError::ErrorInStream(WatchError::EmptyEventBatch))
2190        )
2191    }
2192
2193    #[fuchsia_async::run_singlethreaded(test)]
2194    async fn get_existing_resources_unexpected_event() {
2195        let event_stream = futures::stream::once(futures::future::ready(Ok(Event::EndOfUpdate)));
2196        futures::pin_mut!(event_stream);
2197        assert_matches!(
2198            get_existing_resources::<HashMap<_, _>>(event_stream).await,
2199            Err(GetExistingResourcesError::UnexpectedEvent(Event::EndOfUpdate))
2200        )
2201    }
2202
2203    #[fuchsia_async::run_singlethreaded(test)]
2204    async fn get_existing_resources_duplicate_resource() {
2205        let event_stream = futures::stream::iter([
2206            Ok(Event::Existing(test_controller_a(), test_resource())),
2207            Ok(Event::Existing(test_controller_a(), test_resource())),
2208        ]);
2209        futures::pin_mut!(event_stream);
2210        assert_matches!(
2211            get_existing_resources::<HashMap<_, _>>(event_stream).await,
2212            Err(GetExistingResourcesError::DuplicateResource(resource))
2213                if resource == test_resource()
2214        )
2215    }
2216
2217    #[fuchsia_async::run_singlethreaded(test)]
2218    async fn get_existing_resources_stream_ended() {
2219        let event_stream = futures::stream::once(futures::future::ready(Ok(Event::Existing(
2220            test_controller_a(),
2221            test_resource(),
2222        ))));
2223        futures::pin_mut!(event_stream);
2224        assert_matches!(
2225            get_existing_resources::<HashMap<_, _>>(event_stream).await,
2226            Err(GetExistingResourcesError::StreamEnded)
2227        )
2228    }
2229
2230    #[fuchsia_async::run_singlethreaded(test)]
2231    async fn wait_for_condition_add_remove() {
2232        let mut state = HashMap::new();
2233
2234        // Verify that checking for the presence of a resource blocks until the
2235        // resource is added.
2236        let has_resource = |resources: &HashMap<_, HashMap<_, _>>| {
2237            resources.get(&test_controller_a()).map_or(false, |controller| {
2238                controller
2239                    .get(&test_resource_id())
2240                    .map_or(false, |resource| resource == &test_resource())
2241            })
2242        };
2243        assert_matches!(
2244            wait_for_condition(futures::stream::pending(), &mut state, has_resource).now_or_never(),
2245            None
2246        );
2247        assert!(state.is_empty());
2248        assert_matches!(
2249            wait_for_condition(
2250                futures::stream::iter([
2251                    Ok(Event::Added(test_controller_b(), test_resource())),
2252                    Ok(Event::EndOfUpdate),
2253                    Ok(Event::Added(test_controller_a(), test_resource())),
2254                    Ok(Event::EndOfUpdate),
2255                ]),
2256                &mut state,
2257                has_resource
2258            )
2259            .now_or_never(),
2260            Some(Ok(()))
2261        );
2262        assert_eq!(
2263            state,
2264            HashMap::from([
2265                (test_controller_a(), HashMap::from([(test_resource_id(), test_resource())])),
2266                (test_controller_b(), HashMap::from([(test_resource_id(), test_resource())])),
2267            ])
2268        );
2269
2270        // Re-add the resource and observe an error.
2271        assert_matches!(
2272            wait_for_condition(
2273                futures::stream::iter([
2274                    Ok(Event::Added(test_controller_a(), test_resource())),
2275                    Ok(Event::EndOfUpdate),
2276                ]),
2277                &mut state,
2278                has_resource
2279            )
2280            .now_or_never(),
2281            Some(Err(WaitForConditionError::AddedAlreadyExisting(r))) if r == test_resource()
2282        );
2283        assert_eq!(
2284            state,
2285            HashMap::from([
2286                (test_controller_a(), HashMap::from([(test_resource_id(), test_resource())])),
2287                (test_controller_b(), HashMap::from([(test_resource_id(), test_resource())])),
2288            ])
2289        );
2290
2291        // Verify that checking for the absence of a resource blocks until the
2292        // resource is removed.
2293        let does_not_have_resource = |resources: &HashMap<_, HashMap<_, _>>| {
2294            resources.get(&test_controller_a()).map_or(false, |controller| controller.is_empty())
2295        };
2296        assert_matches!(
2297            wait_for_condition(futures::stream::pending(), &mut state, does_not_have_resource)
2298                .now_or_never(),
2299            None
2300        );
2301        assert_eq!(
2302            state,
2303            HashMap::from([
2304                (test_controller_a(), HashMap::from([(test_resource_id(), test_resource())])),
2305                (test_controller_b(), HashMap::from([(test_resource_id(), test_resource())])),
2306            ])
2307        );
2308        assert_matches!(
2309            wait_for_condition(
2310                futures::stream::iter([
2311                    Ok(Event::Removed(test_controller_b(), test_resource_id())),
2312                    Ok(Event::EndOfUpdate),
2313                    Ok(Event::Removed(test_controller_a(), test_resource_id())),
2314                    Ok(Event::EndOfUpdate),
2315                ]),
2316                &mut state,
2317                does_not_have_resource
2318            )
2319            .now_or_never(),
2320            Some(Ok(()))
2321        );
2322        assert_eq!(
2323            state,
2324            HashMap::from([
2325                (test_controller_a(), HashMap::new()),
2326                (test_controller_b(), HashMap::new()),
2327            ])
2328        );
2329
2330        // Remove a non-existent resource and observe an error.
2331        assert_matches!(
2332            wait_for_condition(
2333                futures::stream::iter([
2334                    Ok(Event::Removed(test_controller_a(), test_resource_id())),
2335                    Ok(Event::EndOfUpdate),
2336                ]),
2337                &mut state,
2338                does_not_have_resource
2339            ).now_or_never(),
2340            Some(Err(WaitForConditionError::RemovedNonExistent(r))) if r == test_resource_id()
2341        );
2342        assert_eq!(
2343            state,
2344            HashMap::from([
2345                (test_controller_a(), HashMap::new()),
2346                (test_controller_b(), HashMap::new()),
2347            ])
2348        );
2349    }
2350
2351    #[test]
2352    fn predicate_not_tested_until_update_complete() {
2353        let mut state = HashMap::new();
2354        let (mut tx, rx) = mpsc::unbounded();
2355
2356        let wait = wait_for_condition(rx, &mut state, |state| !state.is_empty()).fuse();
2357        futures::pin_mut!(wait);
2358
2359        // Sending an `Added` event should *not* allow the wait operation to
2360        // complete, because the predicate should only be tested once the full
2361        // update has been observed.
2362        let mut exec = fuchsia_async::TestExecutor::new();
2363        exec.run_singlethreaded(async {
2364            tx.send(Ok(Event::Added(test_controller_a(), test_resource())))
2365                .await
2366                .expect("receiver should not be closed");
2367            assert_matches!((&mut wait).now_or_never(), None);
2368        });
2369
2370        exec.run_singlethreaded(async {
2371            tx.send(Ok(Event::EndOfUpdate)).await.expect("receiver should not be closed");
2372            wait.await.expect("condition should be satisfied once update is complete");
2373        });
2374    }
2375
2376    #[fuchsia_async::run_singlethreaded(test)]
2377    async fn wait_for_condition_error_in_stream() {
2378        let mut state = HashMap::new();
2379        let event_stream =
2380            futures::stream::once(futures::future::ready(Err(WatchError::EmptyEventBatch)));
2381        assert_matches!(
2382            wait_for_condition(event_stream, &mut state, |_| true).await,
2383            Err(WaitForConditionError::ErrorInStream(WatchError::EmptyEventBatch))
2384        );
2385        assert!(state.is_empty());
2386    }
2387
2388    #[fuchsia_async::run_singlethreaded(test)]
2389    async fn wait_for_condition_stream_ended() {
2390        let mut state = HashMap::new();
2391        let event_stream = futures::stream::empty();
2392        assert_matches!(
2393            wait_for_condition(event_stream, &mut state, |_| true).await,
2394            Err(WaitForConditionError::StreamEnded)
2395        );
2396        assert!(state.is_empty());
2397    }
2398
2399    pub(crate) async fn handle_open_controller(
2400        mut request_stream: fnet_filter::ControlRequestStream,
2401    ) -> fnet_filter::NamespaceControllerRequestStream {
2402        let (id, request, _control_handle) = request_stream
2403            .next()
2404            .await
2405            .expect("client should open controller")
2406            .expect("request should not error")
2407            .into_open_controller()
2408            .expect("client should open controller");
2409        let (stream, control_handle) = request.into_stream_and_control_handle();
2410        control_handle.send_on_id_assigned(&id).expect("send assigned ID");
2411
2412        stream
2413    }
2414
2415    pub(crate) async fn handle_push_changes(
2416        stream: &mut fnet_filter::NamespaceControllerRequestStream,
2417        push_changes_result: fnet_filter::ChangeValidationResult,
2418    ) {
2419        let (_changes, responder) = stream
2420            .next()
2421            .await
2422            .expect("client should push changes")
2423            .expect("request should not error")
2424            .into_push_changes()
2425            .expect("client should push changes");
2426        responder.send(push_changes_result).expect("send empty batch");
2427    }
2428
2429    pub(crate) async fn handle_commit(
2430        stream: &mut fnet_filter::NamespaceControllerRequestStream,
2431        commit_result: fnet_filter::CommitResult,
2432    ) {
2433        let (_options, responder) = stream
2434            .next()
2435            .await
2436            .expect("client should commit")
2437            .expect("request should not error")
2438            .into_commit()
2439            .expect("client should commit");
2440        responder.send(commit_result).expect("send commit result");
2441    }
2442
2443    #[fuchsia_async::run_singlethreaded(test)]
2444    async fn controller_push_changes_reports_invalid_change() {
2445        let client = flex_local::local_client_empty();
2446        let (control, request_stream) =
2447            client.create_proxy_and_stream::<fnet_filter::ControlMarker>();
2448        let push_invalid_change = async {
2449            let mut controller = Controller::new(&control, &ControllerId(String::from("test")))
2450                .await
2451                .expect("create controller");
2452            let result = controller
2453                .push_changes(vec![
2454                    Change::Create(test_resource()),
2455                    // We fake the server response to say this is invalid even
2456                    // though it really isn't.
2457                    Change::Create(pretend_invalid_resource()),
2458                    Change::Remove(test_resource_id()),
2459                ])
2460                .await;
2461            assert_matches!(
2462                result,
2463                Err(PushChangesError::ErrorOnChange(errors)) if errors == vec![(
2464                    Change::Create(pretend_invalid_resource()),
2465                    ChangeValidationError::InvalidPortMatcher
2466                )]
2467            );
2468        };
2469
2470        let handle_controller = async {
2471            let mut stream = handle_open_controller(request_stream).await;
2472            handle_push_changes(
2473                &mut stream,
2474                fnet_filter::ChangeValidationResult::ErrorOnChange(vec![
2475                    fnet_filter::ChangeValidationError::Ok,
2476                    fnet_filter::ChangeValidationError::InvalidPortMatcher,
2477                    fnet_filter::ChangeValidationError::NotReached,
2478                ]),
2479            )
2480            .await;
2481        };
2482
2483        let ((), ()) = futures::future::join(push_invalid_change, handle_controller).await;
2484    }
2485
2486    #[fuchsia_async::run_singlethreaded(test)]
2487    async fn controller_commit_reports_invalid_change() {
2488        let client = flex_local::local_client_empty();
2489        let (control, request_stream) =
2490            client.create_proxy_and_stream::<fnet_filter::ControlMarker>();
2491        let commit_invalid_change = async {
2492            let mut controller = Controller::new(&control, &ControllerId(String::from("test")))
2493                .await
2494                .expect("create controller");
2495            controller
2496                .push_changes(vec![
2497                    Change::Create(test_resource()),
2498                    Change::Remove(unknown_resource_id()),
2499                    Change::Remove(test_resource_id()),
2500                ])
2501                .await
2502                .expect("push changes");
2503            let result = controller.commit().await;
2504            assert_matches!(
2505                result,
2506                Err(CommitError::ErrorOnChange(errors)) if errors == vec![(
2507                    Change::Remove(unknown_resource_id()),
2508                    ChangeCommitError::NamespaceNotFound,
2509                )]
2510            );
2511        };
2512        let handle_controller = async {
2513            let mut stream = handle_open_controller(request_stream).await;
2514            handle_push_changes(
2515                &mut stream,
2516                fnet_filter::ChangeValidationResult::Ok(fnet_filter::Empty {}),
2517            )
2518            .await;
2519            handle_commit(
2520                &mut stream,
2521                fnet_filter::CommitResult::ErrorOnChange(vec![
2522                    fnet_filter::CommitError::Ok,
2523                    fnet_filter::CommitError::NamespaceNotFound,
2524                    fnet_filter::CommitError::Ok,
2525                ]),
2526            )
2527            .await;
2528        };
2529        let ((), ()) = futures::future::join(commit_invalid_change, handle_controller).await;
2530    }
2531}