netstack3_filter/
conntrack.rs

1// Copyright 2024 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5mod tcp;
6
7use alloc::collections::HashMap;
8use alloc::fmt::Debug;
9use alloc::sync::{Arc, Weak};
10use alloc::vec::Vec;
11use assert_matches::assert_matches;
12use core::any::Any;
13use core::fmt::Display;
14use core::hash::Hash;
15use core::time::Duration;
16
17use derivative::Derivative;
18use net_types::ip::{GenericOverIp, Ip, IpVersionMarker};
19use packet_formats::ip::{IpExt, IpProto, Ipv4Proto, Ipv6Proto};
20
21use crate::context::{FilterBindingsContext, FilterBindingsTypes};
22use crate::logic::FilterTimerId;
23use crate::packets::TransportPacketData;
24use netstack3_base::sync::Mutex;
25use netstack3_base::{CoreTimerContext, Inspectable, Inspector, Instant, TimerContext};
26
/// The time from the end of one GC cycle to the beginning of the next.
///
/// This is the delay used each time the conntrack GC timer is (re)scheduled.
const GC_INTERVAL: Duration = Duration::from_secs(10);

/// The time since the last seen packet after which an established UDP
/// connection will be considered expired and is eligible for garbage
/// collection (two minutes).
///
/// This was taken from RFC 4787 REQ-5.
const CONNECTION_EXPIRY_TIME_UDP: Duration = Duration::from_secs(120);

/// The time since the last seen packet after which a generic connection will be
/// considered expired and is eligible for garbage collection.
const CONNECTION_EXPIRY_OTHER: Duration = Duration::from_secs(30);

/// The maximum number of entries in the conntrack table.
///
/// Once the table holds at least this many entries,
/// [`Table::finalize_connection`] has to evict an expired or not-yet-established
/// connection before it can insert a new one, and fails with
/// [`FinalizeConnectionError::TableFull`] if no such connection exists.
///
/// NOTE: This is subtly different from the number of connections in the table
/// because self-connected sockets only have a single entry instead of the
/// normal two.
pub(crate) const MAXIMUM_ENTRIES: usize = 100_000;
47
/// Implements a connection tracking subsystem.
///
/// The `E` parameter is for external data that is stored in the [`Connection`]
/// struct and can be extracted with the [`Connection::external_data()`]
/// function.
pub struct Table<I: IpExt, E, BT: FilterBindingsTypes> {
    /// All of the table's mutable state (the tuple map, the GC timer and the
    /// size-limit counters), guarded by a single lock.
    inner: Mutex<TableInner<I, E, BT>>,
}
56
/// The lock-protected state of a [`Table`].
struct TableInner<I: IpExt, E, BT: FilterBindingsTypes> {
    /// A connection is inserted into the map twice: once for the original
    /// tuple, and once for the reply tuple.
    ///
    /// The exception is a self-connected flow, whose original and reply tuples
    /// are equal and which therefore occupies a single entry.
    table: HashMap<Tuple<I>, Arc<ConnectionShared<I, E, BT>>>,
    /// A timer for triggering garbage collection events.
    gc_timer: BT::Timer,
    /// The number of times the table size limit was hit.
    table_limit_hits: u32,
    /// Of the times the table limit was hit, the number of times we had to drop
    /// a packet because we couldn't make space in the table.
    table_limit_drops: u32,
}
69
70impl<I: IpExt, E, BT: FilterBindingsTypes> Table<I, E, BT> {
71    /// Returns whether the table contains a connection for the specified tuple.
72    ///
73    /// This is for NAT to determine whether a generated tuple will clash with
74    /// one already in the map. While it might seem inefficient, to require
75    /// locking in a loop, taking an uncontested lock is going to be
76    /// significantly faster than the RNG used to allocate NAT parameters.
77    pub fn contains_tuple(&self, tuple: &Tuple<I>) -> bool {
78        self.inner.lock().table.contains_key(tuple)
79    }
80
81    /// Returns a [`Connection`] for the flow indexed by `tuple`, if one exists.
82    pub(crate) fn get_shared_connection(
83        &self,
84        tuple: &Tuple<I>,
85    ) -> Option<Arc<ConnectionShared<I, E, BT>>> {
86        let guard = self.inner.lock();
87        let conn = guard.table.get(&tuple)?;
88        Some(conn.clone())
89    }
90
91    /// Returns a [`Connection`] for the flow indexed by `tuple`, if one exists.
92    pub fn get_connection(&self, tuple: &Tuple<I>) -> Option<Connection<I, E, BT>> {
93        let guard = self.inner.lock();
94        let conn = guard.table.get(&tuple)?;
95        Some(Connection::Shared(conn.clone()))
96    }
97
98    /// Returns the number of entries in the table.
99    ///
100    /// NOTE: This is usually twice the number of connections, but self-connected sockets will only
101    /// have a single entry.
102    #[cfg(feature = "testutils")]
103    pub fn num_entries(&self) -> usize {
104        self.inner.lock().table.len()
105    }
106
107    /// Removes the [`Connection`] for the flow indexed by `tuple`, if one exists,
108    /// and returns it to the caller.
109    #[cfg(feature = "testutils")]
110    pub fn remove_connection(&mut self, tuple: &Tuple<I>) -> Option<Connection<I, E, BT>> {
111        let mut guard = self.inner.lock();
112
113        // Remove the entry indexed by the tuple.
114        let conn = guard.table.remove(&tuple)?;
115        let (original, reply) = (&conn.inner.original_tuple, &conn.inner.reply_tuple);
116
117        // If this is not a self-connected flow, we need to remove the other tuple on
118        // which the connection is indexed.
119        if original != reply {
120            if tuple == original {
121                assert!(guard.table.remove(reply).is_some());
122            } else {
123                assert!(guard.table.remove(original).is_some());
124            }
125        }
126
127        Some(Connection::Shared(conn))
128    }
129}
130
131fn schedule_gc<BC>(bindings_ctx: &mut BC, timer: &mut BC::Timer)
132where
133    BC: TimerContext,
134{
135    let _ = bindings_ctx.schedule_timer(GC_INTERVAL, timer);
136}
137
138impl<I: IpExt, E, BC: FilterBindingsContext> Table<I, E, BC> {
139    pub(crate) fn new<CC: CoreTimerContext<FilterTimerId<I>, BC>>(bindings_ctx: &mut BC) -> Self {
140        Self {
141            inner: Mutex::new(TableInner {
142                table: HashMap::new(),
143                gc_timer: CC::new_timer(
144                    bindings_ctx,
145                    FilterTimerId::ConntrackGc(IpVersionMarker::<I>::new()),
146                ),
147                table_limit_hits: 0,
148                table_limit_drops: 0,
149            }),
150        }
151    }
152}
153
154impl<
155        I: IpExt,
156        E: Default + Send + Sync + PartialEq + CompatibleWith + 'static,
157        BC: FilterBindingsContext,
158    > Table<I, E, BC>
159{
160    /// Attempts to insert the `Connection` into the table.
161    ///
162    /// To be called once a packet for the connection has passed all filtering.
163    /// The boolean return value represents whether the connection was newly
164    /// added to the connection tracking state.
165    ///
166    /// This is on [`Table`] instead of [`Connection`] because conntrack needs
167    /// to be able to manipulate its internal map.
168    pub(crate) fn finalize_connection(
169        &self,
170        bindings_ctx: &mut BC,
171        connection: Connection<I, E, BC>,
172    ) -> Result<(bool, Option<Arc<ConnectionShared<I, E, BC>>>), FinalizeConnectionError> {
173        let exclusive = match connection {
174            Connection::Exclusive(c) => c,
175            // Given that make_shared is private, the only way for us to receive
176            // a shared connection is if it was already present in the map. This
177            // is far and away the most common case under normal operation.
178            Connection::Shared(inner) => return Ok((false, Some(inner))),
179        };
180
181        if exclusive.do_not_insert {
182            return Ok((false, None));
183        }
184
185        let mut guard = self.inner.lock();
186
187        if guard.table.len() >= MAXIMUM_ENTRIES {
188            guard.table_limit_hits = guard.table_limit_hits.saturating_add(1);
189
190            struct Info<'a, I: IpExt, BT: FilterBindingsTypes> {
191                original_tuple: &'a Tuple<I>,
192                reply_tuple: &'a Tuple<I>,
193                lifecycle: EstablishmentLifecycle,
194                last_seen: BT::Instant,
195            }
196
197            let mut info: Option<Info<'_, I, BC>> = None;
198
199            let now = bindings_ctx.now();
200            // Find a non-established connection to evict.
201            //
202            // 1. If a connection is expired, immediately choose it.
203            // 2. Otherwise, pick the connection that is "least established".
204            //    - SeenOriginal is less established than SeenReply
205            //    - A connection is less established than another with the same
206            //      establishment lifecycle if it saw a packet less recently.
207            //
208            // If all connections are established, then we can't free any space
209            // and report the error to the caller.
210            for (_, conn) in &guard.table {
211                let state = conn.state.lock();
212                if state.is_expired(now) {
213                    info = Some(Info {
214                        original_tuple: &conn.inner.original_tuple,
215                        reply_tuple: &conn.inner.reply_tuple,
216                        lifecycle: state.establishment_lifecycle,
217                        last_seen: state.last_packet_time,
218                    });
219                    break;
220                }
221
222                match state.establishment_lifecycle {
223                    EstablishmentLifecycle::SeenOriginal | EstablishmentLifecycle::SeenReply => {
224                        match &info {
225                            None => {
226                                info = Some(Info {
227                                    original_tuple: &conn.inner.original_tuple,
228                                    reply_tuple: &conn.inner.reply_tuple,
229                                    lifecycle: state.establishment_lifecycle,
230                                    last_seen: state.last_packet_time,
231                                })
232                            }
233                            Some(existing) => {
234                                if state.establishment_lifecycle < existing.lifecycle
235                                    || (state.establishment_lifecycle == existing.lifecycle
236                                        && state.last_packet_time < existing.last_seen)
237                                {
238                                    info = Some(Info {
239                                        original_tuple: &conn.inner.original_tuple,
240                                        reply_tuple: &conn.inner.reply_tuple,
241                                        lifecycle: state.establishment_lifecycle,
242                                        last_seen: state.last_packet_time,
243                                    })
244                                }
245                            }
246                        }
247                    }
248                    EstablishmentLifecycle::Established => {}
249                }
250            }
251
252            if let Some(Info { original_tuple, reply_tuple, .. }) = info {
253                let original_tuple = original_tuple.clone();
254                let reply_tuple = reply_tuple.clone();
255
256                assert!(guard.table.remove(&original_tuple).is_some());
257                if original_tuple != reply_tuple {
258                    assert!(guard.table.remove(&reply_tuple).is_some());
259                }
260            } else {
261                guard.table_limit_drops = guard.table_limit_drops.saturating_add(1);
262                return Err(FinalizeConnectionError::TableFull);
263            }
264        }
265
266        // The expected case here is that there isn't a conflict.
267        //
268        // Normally, we'd want to use the entry API to reduce the number of map
269        // lookups, but this setup allows us to completely avoid any heap
270        // allocations until we're sure that the insertion will succeed. This
271        // wastes a little CPU in the common case to avoid pathological behavior
272        // in degenerate cases.
273        if guard.table.contains_key(&exclusive.inner.original_tuple)
274            || guard.table.contains_key(&exclusive.inner.reply_tuple)
275        {
276            // NOTE: It's possible for the first two packets (or more) in the
277            // same flow to create ExclusiveConnections. Typically packets for
278            // the same flow are handled sequentically, so each subsequent
279            // packet should see the connection created by the first one.
280            // However, it is possible (e.g. if these two packets arrive on
281            // different interfaces) for them to race.
282            //
283            // In this case, subsequent packets would be reported as conflicts.
284            // To avoid this race condition, we check whether the conflicting
285            // connection in the table is actually the same as the connection
286            // that we are attempting to finalize; if so, we can simply adopt
287            // the already-finalized connection.
288            let conn = if let Some(conn) = guard.table.get(&exclusive.inner.original_tuple) {
289                conn
290            } else {
291                guard
292                    .table
293                    .get(&exclusive.inner.reply_tuple)
294                    .expect("checked that tuple is in table and table is locked")
295            };
296            if conn.compatible_with(&exclusive) {
297                return Ok((false, Some(conn.clone())));
298            }
299
300            // TODO(https://fxbug.dev/372549231): add a counter for this error.
301            Err(FinalizeConnectionError::Conflict)
302        } else {
303            let shared = exclusive.make_shared();
304            let clone = Arc::clone(&shared);
305
306            let res = guard.table.insert(shared.inner.original_tuple.clone(), shared.clone());
307            debug_assert!(res.is_none());
308
309            if shared.inner.reply_tuple != shared.inner.original_tuple {
310                let res = guard.table.insert(shared.inner.reply_tuple.clone(), shared);
311                debug_assert!(res.is_none());
312            }
313
314            // For the most part, this will only schedule the timer once, when
315            // the first packet hits the netstack. However, since the GC timer
316            // is only rescheduled during GC when the table has entries, it's
317            // possible that this will be called again if the table ever becomes
318            // empty.
319            if bindings_ctx.scheduled_instant(&mut guard.gc_timer).is_none() {
320                schedule_gc(bindings_ctx, &mut guard.gc_timer);
321            }
322
323            Ok((true, Some(clone)))
324        }
325    }
326}
327
328impl<I: IpExt, E: Default, BC: FilterBindingsContext> Table<I, E, BC> {
329    /// Returns a [`Connection`] for the packet's flow. If a connection does not
330    /// currently exist, a new one is created.
331    ///
332    /// At the same time, process the packet for the connection, updating
333    /// internal connection state.
334    ///
335    /// After processing is complete, you must call
336    /// [`finalize_connection`](Table::finalize_connection) with this
337    /// connection.
338    pub(crate) fn get_connection_for_packet_and_update(
339        &self,
340        bindings_ctx: &BC,
341        packet: PacketMetadata<I>,
342    ) -> Result<Option<(Connection<I, E, BC>, ConnectionDirection)>, GetConnectionError<I, E, BC>>
343    {
344        let tuple = packet.tuple();
345        let mut connection = match self.inner.lock().table.get(&tuple) {
346            Some(connection) => Connection::Shared(connection.clone()),
347            None => match ConnectionExclusive::from_deconstructed_packet(bindings_ctx, &packet) {
348                None => return Ok(None),
349                Some(c) => Connection::Exclusive(c),
350            },
351        };
352
353        let direction = connection
354            .direction(&tuple)
355            .expect("tuple must match connection as we just looked up connection by tuple");
356
357        match connection.update(bindings_ctx, &packet, direction) {
358            Ok(ConnectionUpdateAction::NoAction) => Ok(Some((connection, direction))),
359            Ok(ConnectionUpdateAction::RemoveEntry) => match connection {
360                Connection::Exclusive(mut conn) => {
361                    conn.do_not_insert = true;
362                    Ok(Some((Connection::Exclusive(conn), direction)))
363                }
364                Connection::Shared(conn) => {
365                    // RACE: It's possible that GC already removed the
366                    // connection from the table, since we released the table
367                    // lock while updating the connection.
368                    let mut guard = self.inner.lock();
369                    let _ = guard.table.remove(&conn.inner.original_tuple);
370                    let _ = guard.table.remove(&conn.inner.reply_tuple);
371
372                    Ok(Some((Connection::Shared(conn), direction)))
373                }
374            },
375            Err(ConnectionUpdateError::InvalidPacket) => {
376                Err(GetConnectionError::InvalidPacket(connection, direction))
377            }
378        }
379    }
380
381    pub(crate) fn perform_gc(&self, bindings_ctx: &mut BC) {
382        let now = bindings_ctx.now();
383        let mut guard = self.inner.lock();
384
385        // Sadly, we can't easily remove entries from the map in-place for two
386        // reasons:
387        // - HashMap::retain() will look at each connection twice, since it will
388        // be inserted under both tuples. If a packet updates last_packet_time
389        // between these two checks, we might remove one tuple of the connection
390        // but not the other, leaving a single tuple in the table, which breaks
391        // a core invariant.
392        // - You can't modify a std::HashMap while iterating over it.
393        let to_remove: Vec<_> = guard
394            .table
395            .iter()
396            .filter_map(|(tuple, conn)| {
397                if *tuple == conn.inner.original_tuple && conn.is_expired(now) {
398                    Some((conn.inner.original_tuple.clone(), conn.inner.reply_tuple.clone()))
399                } else {
400                    None
401                }
402            })
403            .collect();
404
405        for (original_tuple, reply_tuple) in to_remove {
406            assert!(guard.table.remove(&original_tuple).is_some());
407            if reply_tuple != original_tuple {
408                assert!(guard.table.remove(&reply_tuple).is_some());
409            }
410        }
411
412        // The table is only expected to be empty in exceptional cases, or
413        // during tests. The test case especially important, because some tests
414        // will wait for core to quiesce by waiting for timers to stop firing.
415        // By only rescheduling when there are still entries in the table, we
416        // ensure that we won't enter an infinite timer firing/scheduling loop.
417        if !guard.table.is_empty() {
418            schedule_gc(bindings_ctx, &mut guard.gc_timer);
419        }
420    }
421}
422
423impl<I: IpExt, E: Inspectable, BT: FilterBindingsTypes> Inspectable for Table<I, E, BT> {
424    fn record<Inspector: netstack3_base::Inspector>(&self, inspector: &mut Inspector) {
425        let guard = self.inner.lock();
426
427        inspector.record_usize("num_entries", guard.table.len());
428        inspector.record_uint("table_limit_hits", guard.table_limit_hits);
429        inspector.record_uint("table_limit_drops", guard.table_limit_drops);
430
431        inspector.record_child("connections", |inspector| {
432            guard
433                .table
434                .iter()
435                .filter_map(|(tuple, connection)| {
436                    if *tuple == connection.inner.original_tuple {
437                        Some(connection)
438                    } else {
439                        None
440                    }
441                })
442                .for_each(|connection| {
443                    inspector.record_unnamed_child(|inspector| {
444                        inspector.delegate_inspectable(connection.as_ref())
445                    });
446                });
447        });
448    }
449}
450
/// A tuple for a flow in a single direction.
///
/// Tuples are the keys of the conntrack table: a connection is looked up by
/// the tuple of a packet, in either its original or its reply direction.
#[derive(Debug, Clone, PartialEq, Eq, Hash, GenericOverIp)]
#[generic_over_ip(I, Ip)]
pub struct Tuple<I: IpExt> {
    /// The IP protocol number of the flow.
    pub protocol: TransportProtocol,
    /// The source IP address of the flow.
    pub src_addr: I::Addr,
    /// The destination IP address of the flow.
    pub dst_addr: I::Addr,
    /// The transport-layer source port or ID of the flow.
    pub src_port_or_id: u16,
    /// The transport-layer destination port or ID of the flow.
    pub dst_port_or_id: u16,
}
466
467impl<I: IpExt> Tuple<I> {
468    fn new(
469        src_addr: I::Addr,
470        dst_addr: I::Addr,
471        src_port_or_id: u16,
472        dst_port_or_id: u16,
473        protocol: TransportProtocol,
474    ) -> Self {
475        Self { protocol, src_addr, dst_addr, src_port_or_id, dst_port_or_id }
476    }
477
478    /// Returns the inverted version of the tuple.
479    ///
480    /// This means the src and dst addresses are swapped. For TCP and UDP, the
481    /// ports are reversed, but for ICMP, where the ports stand in for other
482    /// information, things are more complicated.
483    pub(crate) fn invert(self) -> Tuple<I> {
484        // TODO(https://fxbug.dev/328064082): Support tracking different ICMP
485        // request/response types.
486        Self {
487            protocol: self.protocol,
488            src_addr: self.dst_addr,
489            dst_addr: self.src_addr,
490            src_port_or_id: self.dst_port_or_id,
491            dst_port_or_id: self.src_port_or_id,
492        }
493    }
494}
495
496impl<I: IpExt> Inspectable for Tuple<I> {
497    fn record<Inspector: netstack3_base::Inspector>(&self, inspector: &mut Inspector) {
498        inspector.record_debug("protocol", self.protocol);
499        inspector.record_ip_addr("src_addr", self.src_addr);
500        inspector.record_ip_addr("dst_addr", self.dst_addr);
501        inspector.record_usize("src_port_or_id", self.src_port_or_id);
502        inspector.record_usize("dst_port_or_id", self.dst_port_or_id);
503    }
504}
505
/// The direction of a packet when compared to a given connection.
///
/// Computed by [`Connection::direction`], which compares the packet's tuple
/// against the connection's reply tuple first and its original tuple second.
/// As a consequence, packets of a self-connected flow (whose two tuples are
/// equal) are always classified as [`ConnectionDirection::Reply`].
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum ConnectionDirection {
    /// The packet is traveling in the same direction as the first packet seen
    /// for the [`Connection`].
    Original,

    /// The packet is traveling in the opposite direction from the first packet
    /// seen for the [`Connection`].
    Reply,
}
517
/// An error returned from [`Table::finalize_connection`].
///
/// In both cases the connection being finalized was not added to the table.
#[derive(Debug)]
pub(crate) enum FinalizeConnectionError {
    /// There is a conflicting connection already tracked by conntrack. The
    /// to-be-finalized connection was not inserted into the table.
    Conflict,

    /// The table has reached the hard size cap and no room could be made.
    ///
    /// Room can only be made by evicting a connection that is expired or not
    /// yet established.
    TableFull,
}
528
/// Type to track additional processing required after updating a connection.
///
/// This is the success value of [`Connection::update`]; the follow-up work is
/// carried out by [`Table::get_connection_for_packet_and_update`].
#[derive(Debug, PartialEq, Eq)]
enum ConnectionUpdateAction {
    /// Processing completed and no further action necessary.
    NoAction,

    /// The entry for this connection should be removed from the conntrack table.
    RemoveEntry,
}
538
/// An error returned from [`Connection::update`].
///
/// [`Table::get_connection_for_packet_and_update`] reports this to its caller
/// as [`GetConnectionError::InvalidPacket`].
#[derive(Debug, PartialEq, Eq)]
enum ConnectionUpdateError {
    /// The packet was invalid. The caller may decide whether to drop this
    /// packet or not.
    InvalidPacket,
}
546
/// An error returned from [`Table::get_connection_for_packet_and_update`].
#[derive(Derivative)]
#[derivative(Debug(bound = "E: Debug"))]
pub(crate) enum GetConnectionError<I: IpExt, E, BT: FilterBindingsTypes> {
    /// The packet was invalid. The caller may decide whether to drop it or not.
    ///
    /// Carries the connection the packet was matched against and the direction
    /// of the packet relative to that connection.
    InvalidPacket(Connection<I, E, BT>, ConnectionDirection),
}
554
/// A `Connection` contains all of the information about a single connection
/// tracked by conntrack.
///
/// Only [`Connection::Shared`] connections are stored in the [`Table`]; an
/// exclusive connection is converted into a shared one when it is inserted by
/// [`Table::finalize_connection`].
#[derive(Derivative)]
#[derivative(Debug(bound = "E: Debug"))]
pub enum Connection<I: IpExt, E, BT: FilterBindingsTypes> {
    /// A connection that is directly owned by the packet that originated the
    /// connection and no others. All fields are modifiable.
    Exclusive(ConnectionExclusive<I, E, BT>),

    /// This is an existing connection, and there are possibly many other
    /// packets that are concurrently modifying it.
    Shared(Arc<ConnectionShared<I, E, BT>>),
}
568
/// An error when attempting to retrieve the underlying conntrack entry from a
/// weak handle to it.
///
/// Returned by [`WeakConnection::into_inner`].
#[derive(Debug)]
pub enum WeakConnectionError {
    /// The entry was removed from the table after the weak handle was created.
    EntryRemoved,
    /// The entry does not match the type that is expected (due to an IP version
    /// mismatch, for example).
    InvalidEntry,
}
579
/// A type-erased weak handle to a connection tracking entry.
///
/// We use type erasure here to get rid of the parameterization on IP version;
/// this handle is meant to be able to transit the device layer and at that
/// point things are not parameterized on IP version (IPv4 and IPv6 packets both
/// end up in the same device queue, for example).
///
/// When this is received for incoming packets, [`WeakConnection::into_inner`]
/// can be used to downcast to the expected concrete [`Connection`] type.
///
/// The wrapped value is a weak reference to the shared conntrack entry, erased
/// to `dyn Any` so that it can be downcast again later.
#[derive(Debug, Clone)]
pub struct WeakConnection(pub(crate) Weak<dyn Any + Send + Sync>);
591
592impl WeakConnection {
593    /// Creates a new type-erased weak handle to the provided conntrack entry,
594    /// provided it is a shared entry.
595    pub fn new<I: IpExt, BT: FilterBindingsTypes + 'static, E: Send + Sync + 'static>(
596        conn: &Connection<I, E, BT>,
597    ) -> Option<Self> {
598        let shared = match conn {
599            Connection::Exclusive(_) => return None,
600            Connection::Shared(shared) => shared,
601        };
602        let weak = Arc::downgrade(shared);
603        Some(Self(weak))
604    }
605
606    /// Attempts to upgrade the provided weak handle to the conntrack entry and
607    /// downcast it to the specified concrete [`Connection`] type.
608    ///
609    /// Fails if either the weak handle cannot be upgraded (because the conntrack
610    /// entry has since been removed), or the type-erased handle cannot be downcast
611    /// to the concrete type (because the packet was modified after the creation of
612    /// this handle such that it no longer matches, e.g. the IP version of the
613    /// connection).
614    pub fn into_inner<I: IpExt, BT: FilterBindingsTypes + 'static, E: Send + Sync + 'static>(
615        self,
616    ) -> Result<Connection<I, E, BT>, WeakConnectionError> {
617        let Self(inner) = self;
618        let shared = inner
619            .upgrade()
620            .ok_or(WeakConnectionError::EntryRemoved)?
621            .downcast()
622            .map_err(|_err: Arc<_>| WeakConnectionError::InvalidEntry)?;
623        Ok(Connection::Shared(shared))
624    }
625}
626
627impl<I: IpExt, E, BT: FilterBindingsTypes> Connection<I, E, BT> {
628    /// Returns the tuple of the original direction of this connection.
629    pub fn original_tuple(&self) -> &Tuple<I> {
630        match self {
631            Connection::Exclusive(c) => &c.inner.original_tuple,
632            Connection::Shared(c) => &c.inner.original_tuple,
633        }
634    }
635
636    /// Returns the tuple of the reply direction of this connection.
637    pub(crate) fn reply_tuple(&self) -> &Tuple<I> {
638        match self {
639            Connection::Exclusive(c) => &c.inner.reply_tuple,
640            Connection::Shared(c) => &c.inner.reply_tuple,
641        }
642    }
643
644    /// Returns a reference to the [`Connection::external_data`] field.
645    pub fn external_data(&self) -> &E {
646        match self {
647            Connection::Exclusive(c) => &c.inner.external_data,
648            Connection::Shared(c) => &c.inner.external_data,
649        }
650    }
651
652    /// Returns the direction the tuple represents with respect to the
653    /// connection.
654    pub(crate) fn direction(&self, tuple: &Tuple<I>) -> Option<ConnectionDirection> {
655        let (original, reply) = match self {
656            Connection::Exclusive(c) => (&c.inner.original_tuple, &c.inner.reply_tuple),
657            Connection::Shared(c) => (&c.inner.original_tuple, &c.inner.reply_tuple),
658        };
659
660        // The ordering here is sadly mildly load-bearing. For self-connected
661        // sockets, the first comparison will be true, so having the original
662        // tuple first would mean that the connection is never marked
663        // established.
664        //
665        // This ordering means that all self-connected connections will be
666        // marked as established immediately upon receiving the first packet.
667        if tuple == reply {
668            Some(ConnectionDirection::Reply)
669        } else if tuple == original {
670            Some(ConnectionDirection::Original)
671        } else {
672            None
673        }
674    }
675
676    /// Returns a copy of the internal connection state
677    #[allow(dead_code)]
678    pub(crate) fn state(&self) -> ConnectionState<BT> {
679        match self {
680            Connection::Exclusive(c) => c.state.clone(),
681            Connection::Shared(c) => c.state.lock().clone(),
682        }
683    }
684}
685
686impl<I: IpExt, E, BC: FilterBindingsContext> Connection<I, E, BC> {
687    fn update(
688        &mut self,
689        bindings_ctx: &BC,
690        packet: &PacketMetadata<I>,
691        direction: ConnectionDirection,
692    ) -> Result<ConnectionUpdateAction, ConnectionUpdateError> {
693        match packet {
694            PacketMetadata::Full { transport_data, .. } => {
695                let now = bindings_ctx.now();
696                match self {
697                    Connection::Exclusive(c) => c.state.update(direction, &transport_data, now),
698                    Connection::Shared(c) => c.state.lock().update(direction, &transport_data, now),
699                }
700            }
701            PacketMetadata::IcmpError(_) => Ok(ConnectionUpdateAction::NoAction),
702        }
703    }
704}
705
/// Fields common to both [`ConnectionExclusive`] and [`ConnectionShared`].
///
/// A connection is indexed in the [`Table`] under both `original_tuple` and
/// `reply_tuple` (a single entry if the two are equal).
#[derive(Derivative)]
#[derivative(Debug(bound = "E: Debug"), PartialEq(bound = "E: PartialEq"))]
pub struct ConnectionCommon<I: IpExt, E> {
    /// The 5-tuple for the connection in the original direction. This is
    /// arbitrary, and is just the direction where a packet was first seen.
    pub(crate) original_tuple: Tuple<I>,

    /// The 5-tuple for the connection in the reply direction. This is what's
    /// used for packet rewriting for NAT.
    pub(crate) reply_tuple: Tuple<I>,

    /// Extra information that is not needed by the conntrack module itself. In
    /// the case of NAT, we expect this to contain things such as the kind of
    /// rewriting that will occur (e.g. SNAT vs DNAT).
    pub(crate) external_data: E,
}
723
724impl<I: IpExt, E: Inspectable> Inspectable for ConnectionCommon<I, E> {
725    fn record<Inspector: netstack3_base::Inspector>(&self, inspector: &mut Inspector) {
726        inspector.record_child("original_tuple", |inspector| {
727            inspector.delegate_inspectable(&self.original_tuple);
728        });
729
730        inspector.record_child("reply_tuple", |inspector| {
731            inspector.delegate_inspectable(&self.reply_tuple);
732        });
733
734        // We record external_data as an inspectable because that allows us to
735        // prevent accidentally leaking data, which could happen if we just used
736        // the Debug impl.
737        inspector.record_child("external_data", |inspector| {
738            inspector.delegate_inspectable(&self.external_data);
739        });
740    }
741}
742
/// State that depends on the transport protocol of a tracked connection.
#[derive(Debug, Clone)]
enum ProtocolState {
    /// TCP connections carry a dedicated [`tcp::Connection`] tracker.
    Tcp(tcp::Connection),
    /// UDP connections need no protocol-specific state.
    Udp,
    /// Every remaining protocol, ICMP included; no protocol-specific state.
    Other,
}
749
750impl ProtocolState {
751    fn update(
752        &mut self,
753        dir: ConnectionDirection,
754        transport_data: &TransportPacketData,
755    ) -> Result<ConnectionUpdateAction, ConnectionUpdateError> {
756        match self {
757            ProtocolState::Tcp(tcp_conn) => {
758                let (segment, payload_len) = assert_matches!(
759                    transport_data,
760                    TransportPacketData::Tcp { segment, payload_len, .. } => (segment, payload_len)
761                );
762                tcp_conn.update(&segment, *payload_len, dir)
763            }
764            ProtocolState::Udp | ProtocolState::Other => Ok(ConnectionUpdateAction::NoAction),
765        }
766    }
767}
768
769/// The lifecycle of the connection getting to being established.
770///
771/// To mimic Linux behavior, we require seeing three packets in order to mark a
772/// connection established.
773/// 1. Original
774/// 2. Reply
775/// 3. Original
776///
777/// The first packet is implicit in the creation of the connection when the
778/// first packet is seen.
779#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord)]
780enum EstablishmentLifecycle {
781    SeenOriginal,
782    SeenReply,
783    Established,
784}
785
786impl EstablishmentLifecycle {
787    fn update(self, dir: ConnectionDirection) -> Self {
788        match self {
789            EstablishmentLifecycle::SeenOriginal => match dir {
790                ConnectionDirection::Original => self,
791                ConnectionDirection::Reply => EstablishmentLifecycle::SeenReply,
792            },
793            EstablishmentLifecycle::SeenReply => match dir {
794                ConnectionDirection::Original => EstablishmentLifecycle::Established,
795                ConnectionDirection::Reply => self,
796            },
797            EstablishmentLifecycle::Established => self,
798        }
799    }
800}
801
/// Dynamic per-connection state.
///
/// [`ConnectionExclusive`] holds this directly, while [`ConnectionShared`]
/// keeps it behind a `Mutex`.
#[derive(Derivative)]
#[derivative(Clone(bound = ""), Debug(bound = ""))]
pub(crate) struct ConnectionState<BT: FilterBindingsTypes> {
    /// The time the last packet was seen for this connection (in either of the
    /// original or reply directions).
    ///
    /// Updates never move this backwards.
    last_packet_time: BT::Instant,

    /// Where in the generic establishment lifecycle the current connection is.
    establishment_lifecycle: EstablishmentLifecycle,

    /// State that is specific to a given protocol (e.g. TCP or UDP).
    protocol_state: ProtocolState,
}
816
817impl<BT: FilterBindingsTypes> ConnectionState<BT> {
818    fn update(
819        &mut self,
820        dir: ConnectionDirection,
821        transport_data: &TransportPacketData,
822        now: BT::Instant,
823    ) -> Result<ConnectionUpdateAction, ConnectionUpdateError> {
824        if self.last_packet_time < now {
825            self.last_packet_time = now;
826        }
827
828        self.establishment_lifecycle = self.establishment_lifecycle.update(dir);
829
830        self.protocol_state.update(dir, transport_data)
831    }
832
833    fn is_expired(&self, now: BT::Instant) -> bool {
834        let duration = now.saturating_duration_since(self.last_packet_time);
835
836        let expiry_duration = match &self.protocol_state {
837            ProtocolState::Tcp(tcp_conn) => tcp_conn.expiry_duration(self.establishment_lifecycle),
838            ProtocolState::Udp => CONNECTION_EXPIRY_TIME_UDP,
839            // ICMP ends up here. The ICMP messages we track are simple
840            // request/response protocols, so we always expect to get a response
841            // quickly (within 2 RTT). Any followup messages (e.g. if making
842            // periodic ECHO requests) should reuse this existing connection.
843            ProtocolState::Other => CONNECTION_EXPIRY_OTHER,
844        };
845
846        duration >= expiry_duration
847    }
848}
849
850impl<BT: FilterBindingsTypes> Inspectable for ConnectionState<BT> {
851    fn record<Inspector: netstack3_base::Inspector>(&self, inspector: &mut Inspector) {
852        inspector.record_bool(
853            "established",
854            match self.establishment_lifecycle {
855                EstablishmentLifecycle::SeenOriginal | EstablishmentLifecycle::SeenReply => false,
856                EstablishmentLifecycle::Established => true,
857            },
858        );
859        inspector.record_inspectable_value("last_packet_time", &self.last_packet_time);
860    }
861}
862
/// A conntrack connection with single ownership.
///
/// Because of this, many fields may be updated without synchronization. There
/// is no chance of messing with other packets for this connection or ending up
/// out-of-sync with the table (e.g. by changing the tuples once the connection
/// has been inserted).
#[derive(Derivative)]
#[derivative(Debug(bound = "E: Debug"))]
pub struct ConnectionExclusive<I: IpExt, E, BT: FilterBindingsTypes> {
    /// The tuples and external data of the connection.
    pub(crate) inner: ConnectionCommon<I, E>,
    /// The dynamic state; held directly, with no lock around it.
    pub(crate) state: ConnectionState<BT>,

    /// When true, do not insert the connection into the conntrack table.
    ///
    /// This allows the stack to still operate against the connection (e.g. for
    /// NAT), while guaranteeing that it won't make it into the table.
    do_not_insert: bool,
}
881
882impl<I: IpExt, E, BT: FilterBindingsTypes> ConnectionExclusive<I, E, BT> {
883    /// Turn this exclusive connection into a shared one. This is required in
884    /// order to insert into the [`Table`] table.
885    fn make_shared(self) -> Arc<ConnectionShared<I, E, BT>> {
886        Arc::new(ConnectionShared { inner: self.inner, state: Mutex::new(self.state) })
887    }
888
889    pub(crate) fn reply_tuple(&self) -> &Tuple<I> {
890        &self.inner.reply_tuple
891    }
892
893    pub(crate) fn rewrite_reply_dst_addr(&mut self, addr: I::Addr) {
894        self.inner.reply_tuple.dst_addr = addr;
895    }
896
897    pub(crate) fn rewrite_reply_src_addr(&mut self, addr: I::Addr) {
898        self.inner.reply_tuple.src_addr = addr;
899    }
900
901    pub(crate) fn rewrite_reply_src_port_or_id(&mut self, port_or_id: u16) {
902        self.inner.reply_tuple.src_port_or_id = port_or_id;
903        match self.inner.reply_tuple.protocol {
904            TransportProtocol::Icmp => {
905                // ICMP uses a single ID and conntrack keeps track of it in both
906                // ID fields. This makes it easier to keep a single logic to
907                // flip the direction. Hence we need to update the rest of the
908                // tuple.
909                //
910                // TODO(https://fxbug.dev/328064082): Probably needs revisiting
911                // as part of better support for ICMP request/response.
912                self.inner.reply_tuple.dst_port_or_id = port_or_id;
913            }
914            TransportProtocol::Tcp | TransportProtocol::Udp | TransportProtocol::Other(_) => {}
915        }
916    }
917
918    pub(crate) fn rewrite_reply_dst_port_or_id(&mut self, port_or_id: u16) {
919        self.inner.reply_tuple.dst_port_or_id = port_or_id;
920        match self.inner.reply_tuple.protocol {
921            TransportProtocol::Icmp => {
922                // ICMP uses a single ID and conntrack keeps track of it in both
923                // ID fields. This makes it easier to keep a single logic to
924                // flip the direction. Hence we need to update the rest of the
925                // tuple.
926                //
927                // TODO(https://fxbug.dev/328064082): Probably needs revisiting
928                // as part of better support for ICMP request/response.
929                self.inner.reply_tuple.src_port_or_id = port_or_id;
930            }
931            TransportProtocol::Tcp | TransportProtocol::Udp | TransportProtocol::Other(_) => {}
932        }
933    }
934}
935
936impl<I: IpExt, E: Default, BC: FilterBindingsContext> ConnectionExclusive<I, E, BC> {
937    pub(crate) fn from_deconstructed_packet(
938        bindings_ctx: &BC,
939        packet_metadata: &PacketMetadata<I>,
940    ) -> Option<Self> {
941        let (tuple, transport_data) = match packet_metadata {
942            PacketMetadata::Full { tuple, transport_data } => (tuple, transport_data),
943            PacketMetadata::IcmpError(_) => return None,
944        };
945
946        let reply_tuple = tuple.clone().invert();
947        let self_connected = reply_tuple == *tuple;
948
949        Some(Self {
950            inner: ConnectionCommon {
951                original_tuple: tuple.clone(),
952                reply_tuple,
953                external_data: E::default(),
954            },
955            state: ConnectionState {
956                last_packet_time: bindings_ctx.now(),
957                establishment_lifecycle: EstablishmentLifecycle::SeenOriginal,
958                protocol_state: match tuple.protocol {
959                    TransportProtocol::Tcp => {
960                        let (segment, payload_len) = transport_data
961                            .tcp_segment_and_len()
962                            .expect("protocol was TCP, so transport data should have TCP info");
963
964                        ProtocolState::Tcp(tcp::Connection::new(
965                            segment,
966                            payload_len,
967                            self_connected,
968                        )?)
969                    }
970                    TransportProtocol::Udp => ProtocolState::Udp,
971                    TransportProtocol::Icmp | TransportProtocol::Other(_) => ProtocolState::Other,
972                },
973            },
974            do_not_insert: false,
975        })
976    }
977}
978
/// A conntrack connection with shared ownership.
///
/// All fields are private, because other packets, and the conntrack table
/// itself, will be depending on them not to change. Fields must be accessed
/// through the associated methods.
#[derive(Derivative)]
#[derivative(Debug(bound = "E: Debug"))]
pub struct ConnectionShared<I: IpExt, E, BT: FilterBindingsTypes> {
    /// The tuples and external data of the connection.
    inner: ConnectionCommon<I, E>,
    /// The dynamic state, behind a lock.
    state: Mutex<ConnectionState<BT>>,
}
990
991/// The IP-agnostic transport protocol of a packet.
992#[allow(missing_docs)]
993#[derive(Copy, Clone, PartialEq, Eq, Hash, GenericOverIp)]
994#[generic_over_ip()]
995pub enum TransportProtocol {
996    Tcp,
997    Udp,
998    Icmp,
999    Other(u8),
1000}
1001
1002impl From<Ipv4Proto> for TransportProtocol {
1003    fn from(value: Ipv4Proto) -> Self {
1004        match value {
1005            Ipv4Proto::Proto(IpProto::Tcp) => TransportProtocol::Tcp,
1006            Ipv4Proto::Proto(IpProto::Udp) => TransportProtocol::Udp,
1007            Ipv4Proto::Icmp => TransportProtocol::Icmp,
1008            v => TransportProtocol::Other(v.into()),
1009        }
1010    }
1011}
1012
1013impl From<Ipv6Proto> for TransportProtocol {
1014    fn from(value: Ipv6Proto) -> Self {
1015        match value {
1016            Ipv6Proto::Proto(IpProto::Tcp) => TransportProtocol::Tcp,
1017            Ipv6Proto::Proto(IpProto::Udp) => TransportProtocol::Udp,
1018            Ipv6Proto::Icmpv6 => TransportProtocol::Icmp,
1019            v => TransportProtocol::Other(v.into()),
1020        }
1021    }
1022}
1023
1024impl From<IpProto> for TransportProtocol {
1025    fn from(value: IpProto) -> Self {
1026        match value {
1027            IpProto::Tcp => TransportProtocol::Tcp,
1028            IpProto::Udp => TransportProtocol::Udp,
1029            v @ IpProto::Reserved => TransportProtocol::Other(v.into()),
1030        }
1031    }
1032}
1033
1034impl Display for TransportProtocol {
1035    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
1036        match self {
1037            TransportProtocol::Tcp => write!(f, "TCP"),
1038            TransportProtocol::Udp => write!(f, "UDP"),
1039            TransportProtocol::Icmp => write!(f, "ICMP"),
1040            TransportProtocol::Other(n) => write!(f, "Other({n})"),
1041        }
1042    }
1043}
1044
1045impl Debug for TransportProtocol {
1046    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
1047        Display::fmt(&self, f)
1048    }
1049}
1050
1051impl<I: IpExt, E, BT: FilterBindingsTypes> ConnectionShared<I, E, BT> {
1052    fn is_expired(&self, now: BT::Instant) -> bool {
1053        self.state.lock().is_expired(now)
1054    }
1055}
1056
1057impl<I: IpExt, E: CompatibleWith, BT: FilterBindingsTypes> ConnectionShared<I, E, BT> {
1058    /// Returns whether the provided exclusive connection is compatible with this
1059    /// one, to the extent that a shared reference to this tracked connection could
1060    /// be adopted in place of the exclusive connection.
1061    pub(crate) fn compatible_with(&self, conn: &ConnectionExclusive<I, E, BT>) -> bool {
1062        self.inner.original_tuple == conn.inner.original_tuple
1063            && self.inner.reply_tuple == conn.inner.reply_tuple
1064            && self.inner.external_data.compatible_with(&conn.inner.external_data)
1065    }
1066}
1067
1068impl<I: IpExt, E: Inspectable, BT: FilterBindingsTypes> Inspectable for ConnectionShared<I, E, BT> {
1069    fn record<Inspector: netstack3_base::Inspector>(&self, inspector: &mut Inspector) {
1070        inspector.delegate_inspectable(&self.inner);
1071        inspector.delegate_inspectable(&*self.state.lock());
1072    }
1073}
1074
/// Allows a caller to check whether a given connection tracking entry (or some
/// configuration owned by that entry) is compatible with another.
///
/// [`ConnectionShared::compatible_with`] calls this on the external data of
/// the two connections being compared.
pub trait CompatibleWith {
    /// Returns whether the provided entity is compatible with this entity in the
    /// context of connection tracking.
    fn compatible_with(&self, other: &Self) -> bool;
}
1082
/// A struct containing relevant fields extracted from the IP and transport
/// headers that means we only have to touch the incoming packet once. Also acts
/// as a witness type that the tuple and transport data have the same transport
/// protocol.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PacketMetadata<I: IpExt> {
    /// A regular packet: its tuple together with the transport-layer data the
    /// tuple was built from.
    Full { tuple: Tuple<I>, transport_data: TransportPacketData },
    /// An ICMP error. The tuple is stored exactly as given to
    /// `new_from_icmp_error`; `tuple()` returns its inverse.
    IcmpError(Tuple<I>),
}
1092
1093impl<I: IpExt> PacketMetadata<I> {
1094    pub(crate) fn new(
1095        src_addr: I::Addr,
1096        dst_addr: I::Addr,
1097        protocol: TransportProtocol,
1098        transport_data: TransportPacketData,
1099    ) -> Self {
1100        match protocol {
1101            TransportProtocol::Tcp => {
1102                assert_matches!(transport_data, TransportPacketData::Tcp { .. })
1103            }
1104            TransportProtocol::Udp | TransportProtocol::Icmp | TransportProtocol::Other(_) => {
1105                assert_matches!(transport_data, TransportPacketData::Generic { .. })
1106            }
1107        }
1108
1109        Self::Full {
1110            tuple: Tuple::new(
1111                src_addr,
1112                dst_addr,
1113                transport_data.src_port(),
1114                transport_data.dst_port(),
1115                protocol,
1116            ),
1117            transport_data,
1118        }
1119    }
1120
1121    pub(crate) fn new_from_icmp_error(
1122        src_addr: I::Addr,
1123        dst_addr: I::Addr,
1124        src_port: u16,
1125        dst_port: u16,
1126        protocol: TransportProtocol,
1127    ) -> Self {
1128        Self::IcmpError(Tuple::new(src_addr, dst_addr, src_port, dst_port, protocol))
1129    }
1130
1131    pub(crate) fn tuple(&self) -> Tuple<I> {
1132        match self {
1133            PacketMetadata::Full { tuple, .. } => tuple.clone(),
1134            // We need to invert the tuple for ICMP errors because they aren't
1135            // necessarily ones that exist in the map due to being post-NAT.
1136            //
1137            // For example, if we have a connection A -> R -> B with Masquerade
1138            // NAT on R rewriting packets from A, we'll end up with the
1139            // following tuples for a connection originating at A:
1140            //
1141            // Original {
1142            //   src_ip: A
1143            //   dst_ip: B
1144            // }
1145            //
1146            // Reply {
1147            //   src_ip: B
1148            //   dst_ip: R
1149            // }
1150            //
1151            // However, if there's an ICMP error packet from B in response to A,
1152            // the tuple of the original packet in the ICMP error's payload will
1153            // look like R -> B. In the opposite direction, the ICMP error from
1154            // A in response to B will have an inner payload with a tuple that
1155            // looks like B -> A.
1156            //
1157            // If we invert the error tuple, then we get the correct connection
1158            // from the map, and the direction also corresponds to the direction
1159            // of the outer packet, which is what we need for NAT to work
1160            // correctly.
1161            PacketMetadata::IcmpError(tuple) => tuple.clone().invert(),
1162        }
1163    }
1164}
1165
#[cfg(test)]
pub(crate) mod testutils {
    use crate::packets::testutil::internal::{FakeIpPacket, FakeUdpPacket, TestIpExt};

    /// Create a pair of UDP packets that are inverses of one another.
    ///
    /// `index` selects the ports: its low 16 bits become the source port and
    /// its high 16 bits the destination port of the first packet. The second
    /// packet has addresses and ports swapped. Every `u32` value therefore
    /// yields a distinct port pair.
    pub(crate) fn make_test_udp_packets<I: TestIpExt>(
        index: u32,
    ) -> (FakeIpPacket<I, FakeUdpPacket>, FakeIpPacket<I, FakeUdpPacket>) {
        // Split the index into its two 16-bit halves. Neither cast can lose
        // information: the first operand is masked down to 16 bits and the
        // second is what remains after shifting the low 16 bits out. Dividing
        // by `u16::MAX` instead would make the quotient overflow a `u16` for
        // the largest indices and reuse ports already handed out.
        let src_port = (index & u32::from(u16::MAX)) as u16;
        let dst_port = (index >> u16::BITS) as u16;

        let packet = FakeIpPacket::<I, _> {
            src_ip: I::SRC_IP,
            dst_ip: I::DST_IP,
            body: FakeUdpPacket { src_port, dst_port },
        };
        let reply_packet = FakeIpPacket::<I, _> {
            src_ip: I::DST_IP,
            dst_ip: I::SRC_IP,
            body: FakeUdpPacket { src_port: dst_port, dst_port: src_port },
        };

        (packet, reply_packet)
    }
}
1194
1195#[cfg(test)]
1196mod tests {
1197    use assert_matches::assert_matches;
1198    use ip_test_macro::ip_test;
1199    use netstack3_base::testutil::FakeTimerCtxExt;
1200    use netstack3_base::{Control, IntoCoreTimerCtx, SegmentHeader, SeqNum, UnscaledWindowSize};
1201    use test_case::test_case;
1202
1203    use super::testutils::make_test_udp_packets;
1204    use super::*;
1205    use crate::context::testutil::{FakeBindingsCtx, FakeCtx};
1206    use crate::packets::testutil::internal::ArbitraryValue;
1207    use crate::packets::IpPacket;
1208    use crate::state::IpRoutines;
1209    use crate::testutil::TestIpExt;
1210
1211    impl CompatibleWith for () {
1212        fn compatible_with(&self, (): &()) -> bool {
1213            true
1214        }
1215    }
1216
    // Exhaustively checks `EstablishmentLifecycle::update`: every combination
    // of starting stage and packet direction is listed with the stage it must
    // produce.
    #[test_case(
        EstablishmentLifecycle::SeenOriginal,
        ConnectionDirection::Original
          => EstablishmentLifecycle::SeenOriginal
    )]
    #[test_case(
        EstablishmentLifecycle::SeenOriginal,
        ConnectionDirection::Reply
          => EstablishmentLifecycle::SeenReply
    )]
    #[test_case(
        EstablishmentLifecycle::SeenReply,
        ConnectionDirection::Original
          => EstablishmentLifecycle::Established
    )]
    #[test_case(
        EstablishmentLifecycle::SeenReply,
        ConnectionDirection::Reply
          => EstablishmentLifecycle::SeenReply
    )]
    #[test_case(
        EstablishmentLifecycle::Established,
        ConnectionDirection::Original
          => EstablishmentLifecycle::Established
    )]
    #[test_case(
        EstablishmentLifecycle::Established,
        ConnectionDirection::Reply
          => EstablishmentLifecycle::Established
    )]
    fn establishment_lifecycle_test(
        lifecycle: EstablishmentLifecycle,
        dir: ConnectionDirection,
    ) -> EstablishmentLifecycle {
        // The returned value is compared against the `=>` side of each case.
        lifecycle.update(dir)
    }
1253
1254    #[ip_test(I)]
1255    #[test_case(TransportProtocol::Udp)]
1256    #[test_case(TransportProtocol::Tcp)]
1257    fn tuple_invert_udp_tcp<I: IpExt + TestIpExt>(protocol: TransportProtocol) {
1258        let orig_tuple = Tuple::<I> {
1259            protocol: protocol,
1260            src_addr: I::SRC_IP,
1261            dst_addr: I::DST_IP,
1262            src_port_or_id: I::SRC_PORT,
1263            dst_port_or_id: I::DST_PORT,
1264        };
1265
1266        let expected = Tuple::<I> {
1267            protocol: protocol,
1268            src_addr: I::DST_IP,
1269            dst_addr: I::SRC_IP,
1270            src_port_or_id: I::DST_PORT,
1271            dst_port_or_id: I::SRC_PORT,
1272        };
1273
1274        let inverted = orig_tuple.invert();
1275
1276        assert_eq!(inverted, expected);
1277    }
1278
1279    #[ip_test(I)]
1280    fn tuple_from_tcp_packet<I: IpExt + TestIpExt>() {
1281        let expected = Tuple::<I> {
1282            protocol: TransportProtocol::Tcp,
1283            src_addr: I::SRC_IP,
1284            dst_addr: I::DST_IP,
1285            src_port_or_id: I::SRC_PORT,
1286            dst_port_or_id: I::DST_PORT,
1287        };
1288
1289        let packet = PacketMetadata::<I>::new(
1290            I::SRC_IP,
1291            I::DST_IP,
1292            TransportProtocol::Tcp,
1293            TransportPacketData::Tcp {
1294                src_port: I::SRC_PORT,
1295                dst_port: I::DST_PORT,
1296                segment: SegmentHeader::arbitrary_value(),
1297                payload_len: 4,
1298            },
1299        );
1300
1301        assert_eq!(packet.tuple(), expected);
1302    }
1303
1304    #[ip_test(I)]
1305    fn connection_from_tuple<I: IpExt + TestIpExt>() {
1306        let bindings_ctx = FakeBindingsCtx::<I>::new();
1307
1308        let packet = PacketMetadata::<I>::new(
1309            I::SRC_IP,
1310            I::DST_IP,
1311            TransportProtocol::Udp,
1312            TransportPacketData::Generic { src_port: I::SRC_PORT, dst_port: I::DST_PORT },
1313        );
1314        let original_tuple = packet.tuple();
1315        let reply_tuple = packet.tuple().invert();
1316
1317        let connection =
1318            ConnectionExclusive::<_, (), _>::from_deconstructed_packet(&bindings_ctx, &packet)
1319                .unwrap();
1320
1321        assert_eq!(&connection.inner.original_tuple, &original_tuple);
1322        assert_eq!(&connection.inner.reply_tuple, &reply_tuple);
1323    }
1324
1325    #[ip_test(I)]
1326    fn connection_make_shared_has_same_underlying_info<I: IpExt + TestIpExt>() {
1327        let bindings_ctx = FakeBindingsCtx::<I>::new();
1328
1329        let packet = PacketMetadata::<I>::new(
1330            I::SRC_IP,
1331            I::DST_IP,
1332            TransportProtocol::Udp,
1333            TransportPacketData::Generic { src_port: I::SRC_PORT, dst_port: I::DST_PORT },
1334        );
1335        let original_tuple = packet.tuple().clone();
1336        let reply_tuple = original_tuple.clone().invert();
1337
1338        let mut connection =
1339            ConnectionExclusive::from_deconstructed_packet(&bindings_ctx, &packet).unwrap();
1340        connection.inner.external_data = 1234;
1341        let shared = connection.make_shared();
1342
1343        assert_eq!(shared.inner.original_tuple, original_tuple);
1344        assert_eq!(shared.inner.reply_tuple, reply_tuple);
1345        assert_eq!(shared.inner.external_data, 1234);
1346    }
1347
    // Selects which flavor of `Connection` a parameterized test exercises.
    enum ConnectionKind {
        // Wrap the connection as `Connection::Exclusive`.
        Exclusive,
        // Share the connection first and wrap it as `Connection::Shared`.
        Shared,
    }
1352
1353    #[ip_test(I)]
1354    #[test_case(ConnectionKind::Exclusive)]
1355    #[test_case(ConnectionKind::Shared)]
1356    fn connection_getters<I: IpExt + TestIpExt>(connection_kind: ConnectionKind) {
1357        let bindings_ctx = FakeBindingsCtx::<I>::new();
1358
1359        let packet = PacketMetadata::<I>::new(
1360            I::SRC_IP,
1361            I::DST_IP,
1362            TransportProtocol::Udp,
1363            TransportPacketData::Generic { src_port: I::SRC_PORT, dst_port: I::DST_PORT },
1364        );
1365        let original_tuple = packet.tuple().clone();
1366        let reply_tuple = original_tuple.clone().invert();
1367
1368        let mut connection =
1369            ConnectionExclusive::from_deconstructed_packet(&bindings_ctx, &packet).unwrap();
1370        connection.inner.external_data = 1234;
1371
1372        let connection = match connection_kind {
1373            ConnectionKind::Exclusive => Connection::Exclusive(connection),
1374            ConnectionKind::Shared => Connection::Shared(connection.make_shared()),
1375        };
1376
1377        assert_eq!(connection.original_tuple(), &original_tuple);
1378        assert_eq!(connection.reply_tuple(), &reply_tuple);
1379        assert_eq!(connection.external_data(), &1234);
1380    }
1381
1382    #[ip_test(I)]
1383    #[test_case(ConnectionKind::Exclusive)]
1384    #[test_case(ConnectionKind::Shared)]
1385    fn connection_direction<I: IpExt + TestIpExt>(connection_kind: ConnectionKind) {
1386        let bindings_ctx = FakeBindingsCtx::<I>::new();
1387
1388        let packet = PacketMetadata::<I>::new(
1389            I::SRC_IP,
1390            I::DST_IP,
1391            TransportProtocol::Udp,
1392            TransportPacketData::Generic { src_port: I::SRC_PORT, dst_port: I::DST_PORT },
1393        );
1394        let original_tuple = packet.tuple().clone();
1395        let reply_tuple = original_tuple.clone().invert();
1396
1397        let mut other_tuple = original_tuple.clone();
1398        other_tuple.src_port_or_id += 1;
1399
1400        let connection: ConnectionExclusive<_, (), _> =
1401            ConnectionExclusive::from_deconstructed_packet(&bindings_ctx, &packet).unwrap();
1402        let connection = match connection_kind {
1403            ConnectionKind::Exclusive => Connection::Exclusive(connection),
1404            ConnectionKind::Shared => Connection::Shared(connection.make_shared()),
1405        };
1406
1407        assert_matches!(connection.direction(&original_tuple), Some(ConnectionDirection::Original));
1408        assert_matches!(connection.direction(&reply_tuple), Some(ConnectionDirection::Reply));
1409        assert_matches!(connection.direction(&other_tuple), None);
1410    }
1411
1412    #[ip_test(I)]
1413    #[test_case(ConnectionKind::Exclusive)]
1414    #[test_case(ConnectionKind::Shared)]
1415    fn connection_update<I: IpExt + TestIpExt>(connection_kind: ConnectionKind) {
1416        let mut bindings_ctx = FakeBindingsCtx::<I>::new();
1417        bindings_ctx.sleep(Duration::from_secs(1));
1418
1419        let packet = PacketMetadata::<I>::new(
1420            I::SRC_IP,
1421            I::DST_IP,
1422            TransportProtocol::Udp,
1423            TransportPacketData::Generic { src_port: I::SRC_PORT, dst_port: I::DST_PORT },
1424        );
1425
1426        let reply_packet = PacketMetadata::<I>::new(
1427            I::DST_IP,
1428            I::SRC_IP,
1429            TransportProtocol::Udp,
1430            TransportPacketData::Generic { src_port: I::DST_PORT, dst_port: I::SRC_PORT },
1431        );
1432
1433        let connection =
1434            ConnectionExclusive::<_, (), _>::from_deconstructed_packet(&bindings_ctx, &packet)
1435                .unwrap();
1436        let mut connection = match connection_kind {
1437            ConnectionKind::Exclusive => Connection::Exclusive(connection),
1438            ConnectionKind::Shared => Connection::Shared(connection.make_shared()),
1439        };
1440
1441        assert_matches!(
1442            connection.update(&bindings_ctx, &packet, ConnectionDirection::Original),
1443            Ok(ConnectionUpdateAction::NoAction)
1444        );
1445        let state = connection.state();
1446        assert_matches!(state.establishment_lifecycle, EstablishmentLifecycle::SeenOriginal);
1447        assert_eq!(state.last_packet_time.offset, Duration::from_secs(1));
1448
1449        // Tuple in reply direction should set established to true and obviously
1450        // update last packet time.
1451        bindings_ctx.sleep(Duration::from_secs(1));
1452        assert_matches!(
1453            connection.update(&bindings_ctx, &reply_packet, ConnectionDirection::Reply),
1454            Ok(ConnectionUpdateAction::NoAction)
1455        );
1456        let state = connection.state();
1457        assert_matches!(state.establishment_lifecycle, EstablishmentLifecycle::SeenReply);
1458        assert_eq!(state.last_packet_time.offset, Duration::from_secs(2));
1459    }
1460
1461    #[ip_test(I)]
1462    #[test_case(ConnectionKind::Exclusive)]
1463    #[test_case(ConnectionKind::Shared)]
1464    fn skip_connection_update_for_icmp_error<I: IpExt + TestIpExt>(
1465        connection_kind: ConnectionKind,
1466    ) {
1467        let mut bindings_ctx = FakeBindingsCtx::<I>::new();
1468        bindings_ctx.sleep(Duration::from_secs(1));
1469
1470        let packet = PacketMetadata::<I>::new(
1471            I::SRC_IP,
1472            I::DST_IP,
1473            TransportProtocol::Udp,
1474            TransportPacketData::Generic { src_port: I::SRC_PORT, dst_port: I::DST_PORT },
1475        );
1476
1477        let reply_packet = PacketMetadata::<I>::new_from_icmp_error(
1478            I::DST_IP,
1479            I::SRC_IP,
1480            I::DST_PORT,
1481            I::SRC_PORT,
1482            TransportProtocol::Udp,
1483        );
1484
1485        let connection =
1486            ConnectionExclusive::<_, (), _>::from_deconstructed_packet(&bindings_ctx, &packet)
1487                .unwrap();
1488        let mut connection = match connection_kind {
1489            ConnectionKind::Exclusive => Connection::Exclusive(connection),
1490            ConnectionKind::Shared => Connection::Shared(connection.make_shared()),
1491        };
1492
1493        assert_matches!(
1494            connection.update(&bindings_ctx, &packet, ConnectionDirection::Original),
1495            Ok(ConnectionUpdateAction::NoAction)
1496        );
1497        let state = connection.state();
1498        assert_matches!(state.establishment_lifecycle, EstablishmentLifecycle::SeenOriginal);
1499        assert_eq!(state.last_packet_time.offset, Duration::from_secs(1));
1500
1501        // The tuple in the reply direction was actually inside an ICMP error,
1502        // so it shouldn't update anything.
1503        bindings_ctx.sleep(Duration::from_secs(1));
1504        assert_matches!(
1505            connection.update(&bindings_ctx, &reply_packet, ConnectionDirection::Reply),
1506            Ok(ConnectionUpdateAction::NoAction)
1507        );
1508        let state = connection.state();
1509        assert_matches!(state.establishment_lifecycle, EstablishmentLifecycle::SeenOriginal);
1510        assert_eq!(state.last_packet_time.offset, Duration::from_secs(1));
1511    }
1512
1513    #[ip_test(I)]
1514    fn skip_connection_creation_for_icmp_error<I: IpExt + TestIpExt>() {
1515        let mut bindings_ctx = FakeBindingsCtx::new();
1516        bindings_ctx.sleep(Duration::from_secs(1));
1517        let table = Table::<_, (), _>::new::<IntoCoreTimerCtx>(&mut bindings_ctx);
1518
1519        let packet = PacketMetadata::<I>::new_from_icmp_error(
1520            I::DST_IP,
1521            I::SRC_IP,
1522            I::DST_PORT,
1523            I::SRC_PORT,
1524            TransportProtocol::Udp,
1525        );
1526
1527        // Because `packet` is an ICMP error, we shouldn't create and return a
1528        // connection for it.
1529        assert!(table
1530            .get_connection_for_packet_and_update(&bindings_ctx, packet.clone())
1531            .expect("packet should be valid")
1532            .is_none());
1533        assert!(!table.contains_tuple(&packet.tuple()));
1534    }
1535
1536    #[ip_test(I)]
1537    fn table_get_exclusive_connection_and_finalize_shared<I: IpExt + TestIpExt>() {
1538        let mut bindings_ctx = FakeBindingsCtx::new();
1539        bindings_ctx.sleep(Duration::from_secs(1));
1540        let table = Table::<_, (), _>::new::<IntoCoreTimerCtx>(&mut bindings_ctx);
1541
1542        let packet = PacketMetadata::<I>::new(
1543            I::SRC_IP,
1544            I::DST_IP,
1545            TransportProtocol::Udp,
1546            TransportPacketData::Generic { src_port: I::SRC_PORT, dst_port: I::DST_PORT },
1547        );
1548
1549        let reply_packet = PacketMetadata::<I>::new(
1550            I::DST_IP,
1551            I::SRC_IP,
1552            TransportProtocol::Udp,
1553            TransportPacketData::Generic { src_port: I::DST_PORT, dst_port: I::SRC_PORT },
1554        );
1555
1556        let original_tuple = packet.tuple().clone();
1557        let reply_tuple = reply_packet.tuple().clone();
1558
1559        let (conn, dir) = table
1560            .get_connection_for_packet_and_update(&bindings_ctx, packet.clone())
1561            .expect("packet should be valid")
1562            .expect("connection should be present");
1563        let state = conn.state();
1564        assert_matches!(state.establishment_lifecycle, EstablishmentLifecycle::SeenOriginal);
1565        assert_eq!(state.last_packet_time.offset, Duration::from_secs(1));
1566
1567        // Since the connection isn't present in the map, we should get a
1568        // freshly-allocated exclusive connection and the map should not have
1569        // been touched.
1570        assert_matches!(conn, Connection::Exclusive(_));
1571        assert_eq!(dir, ConnectionDirection::Original);
1572        assert!(!table.contains_tuple(&original_tuple));
1573        assert!(!table.contains_tuple(&reply_tuple));
1574
1575        // Once we finalize the connection, it should be present in the map.
1576        assert_matches!(table.finalize_connection(&mut bindings_ctx, conn), Ok((true, Some(_))));
1577        assert!(table.contains_tuple(&original_tuple));
1578        assert!(table.contains_tuple(&reply_tuple));
1579
1580        // We should now get a shared connection back for packets in either
1581        // direction now that the connection is present in the table.
1582        bindings_ctx.sleep(Duration::from_secs(1));
1583        let (conn, dir) = table
1584            .get_connection_for_packet_and_update(&bindings_ctx, packet.clone())
1585            .expect("packet should be valid")
1586            .expect("connection should be present");
1587        assert_eq!(dir, ConnectionDirection::Original);
1588        let state = conn.state();
1589        assert_matches!(state.establishment_lifecycle, EstablishmentLifecycle::SeenOriginal);
1590        assert_eq!(state.last_packet_time.offset, Duration::from_secs(2));
1591        let conn = assert_matches!(conn, Connection::Shared(conn) => conn);
1592
1593        bindings_ctx.sleep(Duration::from_secs(1));
1594        let (reply_conn, dir) = table
1595            .get_connection_for_packet_and_update(&bindings_ctx, reply_packet)
1596            .expect("packet should be valid")
1597            .expect("connection should be present");
1598        assert_eq!(dir, ConnectionDirection::Reply);
1599        let state = reply_conn.state();
1600        assert_matches!(state.establishment_lifecycle, EstablishmentLifecycle::SeenReply);
1601        assert_eq!(state.last_packet_time.offset, Duration::from_secs(3));
1602        let reply_conn = assert_matches!(reply_conn, Connection::Shared(conn) => conn);
1603
1604        // We should be getting the same connection in both directions.
1605        assert!(Arc::ptr_eq(&conn, &reply_conn));
1606
1607        // Inserting the connection a second time shouldn't change the map.
1608        let (conn, _dir) = table
1609            .get_connection_for_packet_and_update(&bindings_ctx, packet)
1610            .expect("packet should be valid")
1611            .unwrap();
1612        assert_matches!(table.finalize_connection(&mut bindings_ctx, conn), Ok((false, Some(_))));
1613        assert!(table.contains_tuple(&original_tuple));
1614        assert!(table.contains_tuple(&reply_tuple));
1615    }
1616
1617    #[ip_test(I)]
1618    fn table_conflict<I: IpExt + TestIpExt>() {
1619        let mut bindings_ctx = FakeBindingsCtx::new();
1620        let table = Table::<_, (), _>::new::<IntoCoreTimerCtx>(&mut bindings_ctx);
1621
1622        let original_packet = PacketMetadata::<I>::new(
1623            I::SRC_IP,
1624            I::DST_IP,
1625            TransportProtocol::Udp,
1626            TransportPacketData::Generic { src_port: I::SRC_PORT, dst_port: I::DST_PORT },
1627        );
1628
1629        let nated_original_packet = PacketMetadata::<I>::new(
1630            I::SRC_IP,
1631            I::DST_IP,
1632            TransportProtocol::Udp,
1633            TransportPacketData::Generic { src_port: I::SRC_PORT + 1, dst_port: I::DST_PORT + 1 },
1634        );
1635
1636        let conn1 = Connection::Exclusive(
1637            ConnectionExclusive::<_, (), _>::from_deconstructed_packet(
1638                &bindings_ctx,
1639                &original_packet,
1640            )
1641            .unwrap(),
1642        );
1643
1644        // Fake NAT that ends up allocating the same reply tuple as an existing
1645        // connection.
1646        let mut conn2 = ConnectionExclusive::<_, (), _>::from_deconstructed_packet(
1647            &bindings_ctx,
1648            &original_packet,
1649        )
1650        .unwrap();
1651        conn2.inner.original_tuple = nated_original_packet.tuple().clone();
1652        let conn2 = Connection::Exclusive(conn2);
1653
1654        // Fake NAT that ends up allocating the same original tuple as an
1655        // existing connection.
1656        let mut conn3 = ConnectionExclusive::<_, (), _>::from_deconstructed_packet(
1657            &bindings_ctx,
1658            &original_packet,
1659        )
1660        .unwrap();
1661        conn3.inner.reply_tuple = nated_original_packet.tuple().clone().invert();
1662        let conn3 = Connection::Exclusive(conn3);
1663
1664        assert_matches!(table.finalize_connection(&mut bindings_ctx, conn1), Ok((true, Some(_))));
1665        assert_matches!(
1666            table.finalize_connection(&mut bindings_ctx, conn2),
1667            Err(FinalizeConnectionError::Conflict)
1668        );
1669        assert_matches!(
1670            table.finalize_connection(&mut bindings_ctx, conn3),
1671            Err(FinalizeConnectionError::Conflict)
1672        );
1673    }
1674
1675    #[ip_test(I)]
1676    fn table_conflict_identical_connection<
1677        I: IpExt + crate::packets::testutil::internal::TestIpExt,
1678    >() {
1679        let mut bindings_ctx = FakeBindingsCtx::new();
1680        let table = Table::<_, (), _>::new::<IntoCoreTimerCtx>(&mut bindings_ctx);
1681
1682        let original_packet = PacketMetadata::<I>::new(
1683            I::SRC_IP,
1684            I::DST_IP,
1685            TransportProtocol::Udp,
1686            TransportPacketData::Generic { src_port: I::SRC_PORT, dst_port: I::DST_PORT },
1687        );
1688
1689        // Simulate a race where two packets in the same flow both end up
1690        // creating identical exclusive connections.
1691
1692        let conn = Connection::Exclusive(
1693            ConnectionExclusive::<_, (), _>::from_deconstructed_packet(
1694                &bindings_ctx,
1695                &original_packet,
1696            )
1697            .unwrap(),
1698        );
1699        let finalized = assert_matches!(
1700            table.finalize_connection(&mut bindings_ctx, conn),
1701            Ok((true, Some(conn))) => conn
1702        );
1703
1704        let conn = Connection::Exclusive(
1705            ConnectionExclusive::<_, (), _>::from_deconstructed_packet(
1706                &bindings_ctx,
1707                &original_packet,
1708            )
1709            .unwrap(),
1710        );
1711        let conn = assert_matches!(
1712            table.finalize_connection(&mut bindings_ctx, conn),
1713            Ok((false, Some(conn))) => conn
1714        );
1715        assert!(Arc::ptr_eq(&finalized, &conn));
1716    }
1717
1718    #[derive(Copy, Clone)]
1719    enum GcTrigger {
1720        /// Call [`perform_gc`] function directly, avoiding any timer logic.
1721        Direct,
1722        /// Trigger a timer expiry, which indirectly calls into [`perform_gc`].
1723        Timer,
1724    }
1725
1726    #[ip_test(I)]
1727    #[test_case(GcTrigger::Direct)]
1728    #[test_case(GcTrigger::Timer)]
1729    fn garbage_collection<I: TestIpExt>(gc_trigger: GcTrigger) {
1730        fn perform_gc<I: TestIpExt>(
1731            core_ctx: &mut FakeCtx<I>,
1732            bindings_ctx: &mut FakeBindingsCtx<I>,
1733            gc_trigger: GcTrigger,
1734        ) {
1735            match gc_trigger {
1736                GcTrigger::Direct => core_ctx.conntrack().perform_gc(bindings_ctx),
1737                GcTrigger::Timer => {
1738                    for timer in bindings_ctx
1739                        .trigger_timers_until_instant(bindings_ctx.timer_ctx.instant.time, core_ctx)
1740                    {
1741                        assert_matches!(timer, FilterTimerId::ConntrackGc(_));
1742                    }
1743                }
1744            }
1745        }
1746
1747        let mut bindings_ctx = FakeBindingsCtx::new();
1748        let mut core_ctx = FakeCtx::with_ip_routines(&mut bindings_ctx, IpRoutines::default());
1749
1750        let first_packet = PacketMetadata::<I>::new(
1751            I::SRC_IP,
1752            I::DST_IP,
1753            TransportProtocol::Udp,
1754            TransportPacketData::Generic { src_port: I::SRC_PORT, dst_port: I::DST_PORT },
1755        );
1756
1757        let second_packet = PacketMetadata::<I>::new(
1758            I::SRC_IP,
1759            I::DST_IP,
1760            TransportProtocol::Udp,
1761            TransportPacketData::Generic { src_port: I::SRC_PORT + 1, dst_port: I::DST_PORT },
1762        );
1763        let second_packet_reply = PacketMetadata::<I>::new(
1764            I::DST_IP,
1765            I::SRC_IP,
1766            TransportProtocol::Udp,
1767            TransportPacketData::Generic { src_port: I::DST_PORT, dst_port: I::SRC_PORT + 1 },
1768        );
1769
1770        let first_tuple = first_packet.tuple().clone();
1771        let first_tuple_reply = first_tuple.clone().invert();
1772        let second_tuple = second_packet.tuple().clone();
1773        let second_tuple_reply = second_packet_reply.tuple().clone();
1774
1775        // T=0: Packets for two connections come in.
1776        let (conn, _dir) = core_ctx
1777            .conntrack()
1778            .get_connection_for_packet_and_update(&bindings_ctx, first_packet)
1779            .expect("packet should be valid")
1780            .expect("packet should be trackable");
1781        assert_matches!(
1782            core_ctx
1783                .conntrack()
1784                .finalize_connection(&mut bindings_ctx, conn)
1785                .expect("connection finalize should succeed"),
1786            (true, Some(_))
1787        );
1788        let (conn, _dir) = core_ctx
1789            .conntrack()
1790            .get_connection_for_packet_and_update(&bindings_ctx, second_packet)
1791            .expect("packet should be valid")
1792            .expect("packet should be trackable");
1793        assert_matches!(
1794            core_ctx
1795                .conntrack()
1796                .finalize_connection(&mut bindings_ctx, conn)
1797                .expect("connection finalize should succeed"),
1798            (true, Some(_))
1799        );
1800        assert!(core_ctx.conntrack().contains_tuple(&first_tuple));
1801        assert!(core_ctx.conntrack().contains_tuple(&second_tuple));
1802        assert_eq!(core_ctx.conntrack().inner.lock().table.len(), 4);
1803
1804        // T=GC_INTERVAL: Triggering a GC does not clean up any connections,
1805        // because no connections are stale yet.
1806        bindings_ctx.sleep(GC_INTERVAL);
1807        perform_gc(&mut core_ctx, &mut bindings_ctx, gc_trigger);
1808        assert_eq!(core_ctx.conntrack().contains_tuple(&first_tuple), true);
1809        assert_eq!(core_ctx.conntrack().contains_tuple(&first_tuple_reply), true);
1810        assert_eq!(core_ctx.conntrack().contains_tuple(&second_tuple), true);
1811        assert_eq!(core_ctx.conntrack().contains_tuple(&second_tuple_reply), true);
1812        assert_eq!(core_ctx.conntrack().inner.lock().table.len(), 4);
1813
1814        // T=GC_INTERVAL a packet for just the second connection comes in.
1815        let (conn, _dir) = core_ctx
1816            .conntrack()
1817            .get_connection_for_packet_and_update(&bindings_ctx, second_packet_reply)
1818            .expect("packet should be valid")
1819            .expect("packet should be trackable");
1820        assert_matches!(conn.state().establishment_lifecycle, EstablishmentLifecycle::SeenReply);
1821        assert_matches!(
1822            core_ctx
1823                .conntrack()
1824                .finalize_connection(&mut bindings_ctx, conn)
1825                .expect("connection finalize should succeed"),
1826            (false, Some(_))
1827        );
1828        assert_eq!(core_ctx.conntrack().contains_tuple(&first_tuple), true);
1829        assert_eq!(core_ctx.conntrack().contains_tuple(&first_tuple_reply), true);
1830        assert_eq!(core_ctx.conntrack().contains_tuple(&second_tuple), true);
1831        assert_eq!(core_ctx.conntrack().contains_tuple(&second_tuple_reply), true);
1832        assert_eq!(core_ctx.conntrack().inner.lock().table.len(), 4);
1833
1834        // The state in the table at this point is:
1835        // Connection 1:
1836        //   - Last packet seen at T=0
1837        //   - Expires after T=CONNECTION_EXPIRY_TIME_UDP
1838        // Connection 2:
1839        //   - Last packet seen at T=GC_INTERVAL
1840        //   - Expires after CONNECTION_EXPIRY_TIME_UDP + GC_INTERVAL
1841
1842        // T=2*GC_INTERVAL: Triggering a GC does not clean up any connections.
1843        bindings_ctx.sleep(GC_INTERVAL);
1844        perform_gc(&mut core_ctx, &mut bindings_ctx, gc_trigger);
1845        assert_eq!(core_ctx.conntrack().contains_tuple(&first_tuple), true);
1846        assert_eq!(core_ctx.conntrack().contains_tuple(&first_tuple_reply), true);
1847        assert_eq!(core_ctx.conntrack().contains_tuple(&second_tuple), true);
1848        assert_eq!(core_ctx.conntrack().contains_tuple(&second_tuple_reply), true);
1849        assert_eq!(core_ctx.conntrack().inner.lock().table.len(), 4);
1850
1851        // Time advances to expiry for the first packet
1852        // (T=CONNECTION_EXPIRY_TIME_UDP) trigger gc and note that the first
1853        // connection was cleaned up
1854        bindings_ctx.sleep(CONNECTION_EXPIRY_TIME_UDP - 2 * GC_INTERVAL);
1855        perform_gc(&mut core_ctx, &mut bindings_ctx, gc_trigger);
1856        assert_eq!(core_ctx.conntrack().contains_tuple(&first_tuple), false);
1857        assert_eq!(core_ctx.conntrack().contains_tuple(&first_tuple_reply), false);
1858        assert_eq!(core_ctx.conntrack().contains_tuple(&second_tuple), true);
1859        assert_eq!(core_ctx.conntrack().contains_tuple(&second_tuple_reply), true);
1860        assert_eq!(core_ctx.conntrack().inner.lock().table.len(), 2);
1861
1862        // Advance time past the expiry time for the second connection
1863        // (T=CONNECTION_EXPIRY_TIME_UDP + GC_INTERVAL) and see that it is
1864        // cleaned up.
1865        bindings_ctx.sleep(GC_INTERVAL);
1866        perform_gc(&mut core_ctx, &mut bindings_ctx, gc_trigger);
1867        assert_eq!(core_ctx.conntrack().contains_tuple(&first_tuple), false);
1868        assert_eq!(core_ctx.conntrack().contains_tuple(&first_tuple_reply), false);
1869        assert_eq!(core_ctx.conntrack().contains_tuple(&second_tuple), false);
1870        assert_eq!(core_ctx.conntrack().contains_tuple(&second_tuple_reply), false);
1871        assert!(core_ctx.conntrack().inner.lock().table.is_empty());
1872    }
1873
1874    fn fill_table<I, E, BC>(
1875        bindings_ctx: &mut BC,
1876        table: &Table<I, E, BC>,
1877        entries: impl Iterator<Item = u32>,
1878        establishment_lifecycle: EstablishmentLifecycle,
1879    ) where
1880        I: IpExt + TestIpExt,
1881        E: Debug + Default + Send + Sync + PartialEq + CompatibleWith + 'static,
1882        BC: FilterBindingsContext,
1883    {
1884        for i in entries {
1885            let (packet, reply_packet) = make_test_udp_packets(i);
1886            let packet = packet.conntrack_packet().unwrap();
1887            let reply_packet = reply_packet.conntrack_packet().unwrap();
1888
1889            let (conn, _dir) = table
1890                .get_connection_for_packet_and_update(&bindings_ctx, packet.clone())
1891                .expect("packet should be valid")
1892                .expect("packet should be trackable");
1893            assert_matches!(
1894                table
1895                    .finalize_connection(bindings_ctx, conn)
1896                    .expect("connection finalize should succeed"),
1897                (true, Some(_))
1898            );
1899
1900            if establishment_lifecycle >= EstablishmentLifecycle::SeenReply {
1901                let (conn, _dir) = table
1902                    .get_connection_for_packet_and_update(&bindings_ctx, reply_packet.clone())
1903                    .expect("packet should be valid")
1904                    .expect("packet should be trackable");
1905                assert_matches!(
1906                    table
1907                        .finalize_connection(bindings_ctx, conn)
1908                        .expect("connection finalize should succeed"),
1909                    (false, Some(_))
1910                );
1911
1912                if establishment_lifecycle >= EstablishmentLifecycle::Established {
1913                    let (conn, _dir) = table
1914                        .get_connection_for_packet_and_update(&bindings_ctx, packet)
1915                        .expect("packet should be valid")
1916                        .expect("packet should be trackable");
1917                    assert_matches!(
1918                        table
1919                            .finalize_connection(bindings_ctx, conn)
1920                            .expect("connection finalize should succeed"),
1921                        (false, Some(_))
1922                    );
1923                }
1924            }
1925        }
1926    }
1927
1928    #[ip_test(I)]
1929    #[test_case(EstablishmentLifecycle::SeenOriginal; "existing connections unestablished")]
1930    #[test_case(EstablishmentLifecycle::SeenReply; "existing connections partially established")]
1931    #[test_case(EstablishmentLifecycle::Established; "existing connections established")]
1932    fn table_size_limit_evict_less_established<I: IpExt + TestIpExt>(
1933        existing_lifecycle: EstablishmentLifecycle,
1934    ) {
1935        let mut bindings_ctx = FakeBindingsCtx::<I>::new();
1936        bindings_ctx.sleep(Duration::from_secs(1));
1937        let table = Table::<_, (), _>::new::<IntoCoreTimerCtx>(&mut bindings_ctx);
1938
1939        fill_table(
1940            &mut bindings_ctx,
1941            &table,
1942            0..(MAXIMUM_ENTRIES / 2).try_into().unwrap(),
1943            existing_lifecycle,
1944        );
1945
1946        // The table should be full whether or not the connections are
1947        // established since finalize_connection always inserts the connection
1948        // under the original and reply tuples.
1949        assert_eq!(table.inner.lock().table.len(), MAXIMUM_ENTRIES);
1950
1951        let (packet, _) = make_test_udp_packets((MAXIMUM_ENTRIES / 2).try_into().unwrap());
1952        let packet = packet.conntrack_packet().unwrap();
1953        let (conn, _dir) = table
1954            .get_connection_for_packet_and_update(&bindings_ctx, packet)
1955            .expect("packet should be valid")
1956            .expect("packet should be trackable");
1957        if existing_lifecycle == EstablishmentLifecycle::Established {
1958            // Inserting a new connection should fail because it would grow the
1959            // table.
1960            assert_matches!(
1961                table.finalize_connection(&mut bindings_ctx, conn),
1962                Err(FinalizeConnectionError::TableFull)
1963            );
1964
1965            // Inserting an existing connection again should succeed because
1966            // it's not growing the table.
1967            let (packet, _) = make_test_udp_packets((MAXIMUM_ENTRIES / 2 - 1).try_into().unwrap());
1968            let packet = packet.conntrack_packet().unwrap();
1969            let (conn, _dir) = table
1970                .get_connection_for_packet_and_update(&bindings_ctx, packet)
1971                .expect("packet should be valid")
1972                .expect("packet should be trackable");
1973            assert_matches!(
1974                table
1975                    .finalize_connection(&mut bindings_ctx, conn)
1976                    .expect("connection finalize should succeed"),
1977                (false, Some(_))
1978            );
1979        } else {
1980            assert_matches!(
1981                table
1982                    .finalize_connection(&mut bindings_ctx, conn)
1983                    .expect("connection finalize should succeed"),
1984                (true, Some(_))
1985            );
1986        }
1987    }
1988
1989    #[ip_test(I)]
1990    fn table_size_limit_evict_expired<I: IpExt + TestIpExt>() {
1991        let mut bindings_ctx = FakeBindingsCtx::<I>::new();
1992        let table = Table::<_, (), _>::new::<IntoCoreTimerCtx>(&mut bindings_ctx);
1993
1994        // Add one connection that expires a second sooner than the others.
1995        let evicted_tuple = {
1996            let (packet, _) = make_test_udp_packets(0);
1997            let packet = packet.conntrack_packet().unwrap();
1998            packet.tuple().clone()
1999        };
2000        fill_table(&mut bindings_ctx, &table, 0..=0, EstablishmentLifecycle::Established);
2001        bindings_ctx.sleep(Duration::from_secs(1));
2002        fill_table(
2003            &mut bindings_ctx,
2004            &table,
2005            1..(MAXIMUM_ENTRIES / 2).try_into().unwrap(),
2006            EstablishmentLifecycle::Established,
2007        );
2008
2009        assert_eq!(table.inner.lock().table.len(), MAXIMUM_ENTRIES);
2010        assert!(table.contains_tuple(&evicted_tuple));
2011
2012        let (packet, _) = make_test_udp_packets((MAXIMUM_ENTRIES / 2).try_into().unwrap());
2013        let packet = packet.conntrack_packet().unwrap();
2014        // The table is full, and no connections can be evicted (they're all
2015        // established and unexpired), so we can't insert a new connection.
2016        let (conn, _dir) = table
2017            .get_connection_for_packet_and_update(&bindings_ctx, packet.clone())
2018            .expect("packet should be valid")
2019            .expect("packet should be trackable");
2020        assert_matches!(
2021            table.finalize_connection(&mut bindings_ctx, conn),
2022            Err(FinalizeConnectionError::TableFull)
2023        );
2024
2025        // Now the first connection can be evicted because it's expired, and we
2026        // see that we're able to insert a new connection.
2027        bindings_ctx.sleep(CONNECTION_EXPIRY_TIME_UDP - Duration::from_secs(1));
2028        let (conn, _dir) = table
2029            .get_connection_for_packet_and_update(&bindings_ctx, packet)
2030            .expect("packet should be valid")
2031            .expect("packet should be trackable");
2032        assert_matches!(table.finalize_connection(&mut bindings_ctx, conn), Ok(_));
2033        assert!(!table.contains_tuple(&evicted_tuple));
2034    }
2035
2036    #[ip_test(I)]
2037    fn table_size_limit_less_established<I: IpExt + TestIpExt>() {
2038        let mut bindings_ctx = FakeBindingsCtx::<I>::new();
2039        let table = Table::<_, (), _>::new::<IntoCoreTimerCtx>(&mut bindings_ctx);
2040
2041        let evicted_tuple = {
2042            let (packet, _) = make_test_udp_packets(0);
2043            let packet = packet.conntrack_packet().unwrap();
2044            packet.tuple().clone()
2045        };
2046        // Add one connection that expires a second sooner than the others.
2047        fill_table(&mut bindings_ctx, &table, 0..=0, EstablishmentLifecycle::SeenOriginal);
2048        bindings_ctx.sleep(Duration::from_secs(1));
2049        fill_table(&mut bindings_ctx, &table, 1..=1, EstablishmentLifecycle::SeenOriginal);
2050        fill_table(
2051            &mut bindings_ctx,
2052            &table,
2053            2..(MAXIMUM_ENTRIES / 2).try_into().unwrap(),
2054            EstablishmentLifecycle::SeenReply,
2055        );
2056
2057        assert_eq!(table.inner.lock().table.len(), MAXIMUM_ENTRIES);
2058        assert!(table.contains_tuple(&evicted_tuple));
2059
2060        // We can insert since all connections in the table are eligible for
2061        // eviction, but we want to be sure that the least established
2062        // connection was the one that's actually evicted.
2063        let (packet, _) = make_test_udp_packets((MAXIMUM_ENTRIES / 2).try_into().unwrap());
2064        let packet = packet.conntrack_packet().unwrap();
2065        let (conn, _dir) = table
2066            .get_connection_for_packet_and_update(&bindings_ctx, packet)
2067            .expect("packet should be valid")
2068            .expect("packet should be trackable");
2069        assert_matches!(table.finalize_connection(&mut bindings_ctx, conn), Ok(_));
2070        assert!(!table.contains_tuple(&evicted_tuple));
2071    }
2072
2073    #[cfg(target_os = "fuchsia")]
2074    #[ip_test(I)]
2075    fn inspect<I: IpExt + TestIpExt>() {
2076        use alloc::boxed::Box;
2077        use alloc::string::ToString;
2078        use diagnostics_assertions::assert_data_tree;
2079        use diagnostics_traits::FuchsiaInspector;
2080        use fuchsia_inspect::Inspector;
2081
2082        let mut bindings_ctx = FakeBindingsCtx::<I>::new();
2083        bindings_ctx.sleep(Duration::from_secs(1));
2084        let table = Table::<_, (), _>::new::<IntoCoreTimerCtx>(&mut bindings_ctx);
2085
2086        {
2087            let inspector = Inspector::new(Default::default());
2088            let mut bindings_inspector = FuchsiaInspector::<()>::new(inspector.root());
2089            bindings_inspector.delegate_inspectable(&table);
2090
2091            assert_data_tree!(inspector, "root": {
2092                "table_limit_drops": 0u64,
2093                "table_limit_hits": 0u64,
2094                "num_entries": 0u64,
2095                "connections": {},
2096            });
2097        }
2098
2099        // Insert the first connection into the table in an unestablished state.
2100        // This will later be evicted when the table fills up.
2101        let (packet, _) = make_test_udp_packets::<I>(0);
2102        let packet = packet.conntrack_packet().unwrap();
2103        let (conn, _dir) = table
2104            .get_connection_for_packet_and_update(&bindings_ctx, packet)
2105            .expect("packet should be valid")
2106            .expect("packet should be trackable");
2107        assert_matches!(conn.state().establishment_lifecycle, EstablishmentLifecycle::SeenOriginal);
2108        assert_matches!(
2109            table
2110                .finalize_connection(&mut bindings_ctx, conn)
2111                .expect("connection finalize should succeed"),
2112            (true, Some(_))
2113        );
2114
2115        {
2116            let inspector = Inspector::new(Default::default());
2117            let mut bindings_inspector = FuchsiaInspector::<()>::new(inspector.root());
2118            bindings_inspector.delegate_inspectable(&table);
2119
2120            assert_data_tree!(inspector, "root": {
2121                "table_limit_drops": 0u64,
2122                "table_limit_hits": 0u64,
2123                "num_entries": 2u64,
2124                "connections": {
2125                    "0": {
2126                        "original_tuple": {
2127                            "protocol": "UDP",
2128                            "src_addr": I::SRC_IP.to_string(),
2129                            "dst_addr": I::DST_IP.to_string(),
2130                            "src_port_or_id": 0u64,
2131                            "dst_port_or_id": 0u64,
2132                        },
2133                        "reply_tuple": {
2134                            "protocol": "UDP",
2135                            "src_addr": I::DST_IP.to_string(),
2136                            "dst_addr": I::SRC_IP.to_string(),
2137                            "src_port_or_id": 0u64,
2138                            "dst_port_or_id": 0u64,
2139                        },
2140                        "external_data": {},
2141                        "established": false,
2142                        "last_packet_time": 1_000_000_000u64,
2143                    }
2144                },
2145            });
2146        }
2147
2148        // Fill the table up the rest of the way.
2149        fill_table(
2150            &mut bindings_ctx,
2151            &table,
2152            1..(MAXIMUM_ENTRIES / 2).try_into().unwrap(),
2153            EstablishmentLifecycle::Established,
2154        );
2155
2156        assert_eq!(table.inner.lock().table.len(), MAXIMUM_ENTRIES);
2157
2158        // This first one should succeed because it can evict the
2159        // non-established connection.
2160        let (packet, reply_packet) =
2161            make_test_udp_packets((MAXIMUM_ENTRIES / 2).try_into().unwrap());
2162        let packet = packet.conntrack_packet().unwrap();
2163        let reply_packet = reply_packet.conntrack_packet().unwrap();
2164        let (conn, _dir) = table
2165            .get_connection_for_packet_and_update(&bindings_ctx, packet.clone())
2166            .expect("packet should be valid")
2167            .expect("packet should be trackable");
2168        assert_matches!(
2169            table
2170                .finalize_connection(&mut bindings_ctx, conn)
2171                .expect("connection finalize should succeed"),
2172            (true, Some(_))
2173        );
2174        let (conn, _dir) = table
2175            .get_connection_for_packet_and_update(&bindings_ctx, reply_packet)
2176            .expect("packet should be valid")
2177            .expect("packet should be trackable");
2178        assert_matches!(
2179            table
2180                .finalize_connection(&mut bindings_ctx, conn)
2181                .expect("connection finalize should succeed"),
2182            (false, Some(_))
2183        );
2184        let (conn, _dir) = table
2185            .get_connection_for_packet_and_update(&bindings_ctx, packet)
2186            .expect("packet should be valid")
2187            .expect("packet should be trackable");
2188        assert_matches!(
2189            table
2190                .finalize_connection(&mut bindings_ctx, conn)
2191                .expect("connection finalize should succeed"),
2192            (false, Some(_))
2193        );
2194
2195        // This next one should fail because there are no connections left to
2196        // evict.
2197        let (packet, _) = make_test_udp_packets((MAXIMUM_ENTRIES / 2 + 1).try_into().unwrap());
2198        let packet = packet.conntrack_packet().unwrap();
2199        let (conn, _dir) = table
2200            .get_connection_for_packet_and_update(&bindings_ctx, packet)
2201            .expect("packet should be valid")
2202            .expect("packet should be trackable");
2203        assert_matches!(
2204            table.finalize_connection(&mut bindings_ctx, conn),
2205            Err(FinalizeConnectionError::TableFull)
2206        );
2207
2208        {
2209            let inspector = Inspector::new(Default::default());
2210            let mut bindings_inspector = FuchsiaInspector::<()>::new(inspector.root());
2211            bindings_inspector.delegate_inspectable(&table);
2212
2213            assert_data_tree!(inspector, "root": contains {
2214                "table_limit_drops": 1u64,
2215                "table_limit_hits": 2u64,
2216                "num_entries": MAXIMUM_ENTRIES as u64,
2217            });
2218        }
2219    }
2220
2221    #[ip_test(I)]
2222    fn self_connected_socket<I: IpExt + TestIpExt>() {
2223        let mut bindings_ctx = FakeBindingsCtx::new();
2224        let table = Table::<_, (), _>::new::<IntoCoreTimerCtx>(&mut bindings_ctx);
2225
2226        let packet = PacketMetadata::<I>::new(
2227            I::SRC_IP,
2228            I::SRC_IP,
2229            TransportProtocol::Udp,
2230            TransportPacketData::Generic { src_port: I::SRC_PORT, dst_port: I::SRC_PORT },
2231        );
2232
2233        let tuple = packet.tuple().clone();
2234        let reply_tuple = tuple.clone().invert();
2235
2236        assert_eq!(tuple, reply_tuple);
2237
2238        let (conn, _dir) = table
2239            .get_connection_for_packet_and_update(&bindings_ctx, packet)
2240            .expect("packet should be valid")
2241            .expect("packet should be trackable");
2242        let state = conn.state();
2243        // Since we can't differentiate between the original and reply tuple,
2244        // the connection ends up being marked established immediately.
2245        assert_matches!(state.establishment_lifecycle, EstablishmentLifecycle::SeenReply);
2246
2247        assert_matches!(conn, Connection::Exclusive(_));
2248        assert!(!table.contains_tuple(&tuple));
2249
2250        // Once we finalize the connection, it should be present in the map.
2251        assert_matches!(table.finalize_connection(&mut bindings_ctx, conn), Ok((true, Some(_))));
2252        assert!(table.contains_tuple(&tuple));
2253
2254        // There should be a single connection in the table, despite there only
2255        // being a single tuple.
2256        assert_eq!(table.inner.lock().table.len(), 1);
2257
2258        bindings_ctx.sleep(CONNECTION_EXPIRY_TIME_UDP);
2259        table.perform_gc(&mut bindings_ctx);
2260
2261        assert!(table.inner.lock().table.is_empty());
2262    }
2263
2264    #[ip_test(I)]
2265    fn remove_entry_on_update<I: IpExt + TestIpExt>() {
2266        let mut bindings_ctx = FakeBindingsCtx::new();
2267        let table = Table::<_, (), _>::new::<IntoCoreTimerCtx>(&mut bindings_ctx);
2268
2269        let original_packet = PacketMetadata::<I>::new(
2270            I::SRC_IP,
2271            I::DST_IP,
2272            TransportProtocol::Tcp,
2273            TransportPacketData::Tcp {
2274                src_port: I::SRC_PORT,
2275                dst_port: I::DST_PORT,
2276                segment: SegmentHeader {
2277                    seq: SeqNum::new(1024),
2278                    wnd: UnscaledWindowSize::from(16u16),
2279                    control: Some(Control::SYN),
2280                    ..Default::default()
2281                },
2282                payload_len: 0,
2283            },
2284        );
2285
2286        let reply_packet = PacketMetadata::<I>::new(
2287            I::DST_IP,
2288            I::SRC_IP,
2289            TransportProtocol::Tcp,
2290            TransportPacketData::Tcp {
2291                src_port: I::DST_PORT,
2292                dst_port: I::SRC_PORT,
2293                segment: SegmentHeader {
2294                    seq: SeqNum::new(0),
2295                    ack: Some(SeqNum::new(1025)),
2296                    wnd: UnscaledWindowSize::from(16u16),
2297                    control: Some(Control::RST),
2298                    ..Default::default()
2299                },
2300                payload_len: 0,
2301            },
2302        );
2303
2304        let tuple = original_packet.tuple().clone();
2305        let reply_tuple = tuple.clone().invert();
2306
2307        let (conn, _dir) = table
2308            .get_connection_for_packet_and_update(&bindings_ctx, original_packet)
2309            .expect("packet should be valid")
2310            .expect("packet should be trackable");
2311        assert_matches!(table.finalize_connection(&mut bindings_ctx, conn), Ok((true, Some(_))));
2312
2313        assert!(table.contains_tuple(&tuple));
2314        assert!(table.contains_tuple(&reply_tuple));
2315
2316        // Sending the reply RST through should result in the connection being
2317        // removed from the table.
2318        let (conn, _dir) = table
2319            .get_connection_for_packet_and_update(&bindings_ctx, reply_packet)
2320            .expect("packet should be valid")
2321            .expect("packet should be trackable");
2322
2323        assert!(!table.contains_tuple(&tuple));
2324        assert!(!table.contains_tuple(&reply_tuple));
2325        assert!(table.inner.lock().table.is_empty());
2326
2327        // The connection should not added back on finalization.
2328        assert_matches!(table.finalize_connection(&mut bindings_ctx, conn), Ok((false, Some(_))));
2329
2330        assert!(!table.contains_tuple(&tuple));
2331        assert!(!table.contains_tuple(&reply_tuple));
2332        assert!(table.inner.lock().table.is_empty());
2333
2334        // GC should complete successfully.
2335        bindings_ctx.sleep(Duration::from_secs(60 * 60 * 24 * 6));
2336        table.perform_gc(&mut bindings_ctx);
2337    }
2338
2339    #[ip_test(I)]
2340    fn do_not_insert<I: IpExt + TestIpExt>() {
2341        let mut bindings_ctx = FakeBindingsCtx::new();
2342        let table = Table::<_, (), _>::new::<IntoCoreTimerCtx>(&mut bindings_ctx);
2343
2344        let packet = PacketMetadata::<I>::new(
2345            I::SRC_IP,
2346            I::DST_IP,
2347            TransportProtocol::Udp,
2348            TransportPacketData::Generic { src_port: I::SRC_PORT, dst_port: I::DST_PORT },
2349        );
2350
2351        let tuple = packet.tuple().clone();
2352        let reply_tuple = tuple.clone().invert();
2353
2354        let (conn, _dir) = table
2355            .get_connection_for_packet_and_update(&bindings_ctx, packet)
2356            .expect("packet should be valid")
2357            .expect("packet should be trackable");
2358        let mut conn = assert_matches!(conn, Connection::Exclusive(conn) => conn);
2359        conn.do_not_insert = true;
2360        assert_matches!(
2361            table.finalize_connection(&mut bindings_ctx, Connection::Exclusive(conn)),
2362            Ok((false, None))
2363        );
2364
2365        assert!(!table.contains_tuple(&tuple));
2366        assert!(!table.contains_tuple(&reply_tuple));
2367    }
2368}