summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r-- lib/conntrack-other.c 11
-rw-r--r-- lib/conntrack-private.h 21
-rw-r--r-- lib/conntrack-tcp.c 20
-rw-r--r-- lib/conntrack.c 186
-rw-r--r-- lib/conntrack.h 36
5 files changed, 243 insertions, 31 deletions
diff --git a/lib/conntrack-other.c b/lib/conntrack-other.c
index 295cb2c87..2920889e7 100644
--- a/lib/conntrack-other.c
+++ b/lib/conntrack-other.c
@@ -43,8 +43,8 @@ conn_other_cast(const struct conn *conn)
}
static enum ct_update_res
-other_conn_update(struct conn *conn_, struct dp_packet *pkt OVS_UNUSED,
- bool reply, long long now)
+other_conn_update(struct conn *conn_, struct conntrack_bucket *ctb,
+ struct dp_packet *pkt OVS_UNUSED, bool reply, long long now)
{
struct conn_other *conn = conn_other_cast(conn_);
@@ -54,7 +54,7 @@ other_conn_update(struct conn *conn_, struct dp_packet *pkt OVS_UNUSED,
conn->state = OTHERS_MULTIPLE;
}
- update_expiration(conn_, other_timeouts[conn->state], now);
+ conn_update_expiration(ctb, &conn->up, other_timeouts[conn->state], now);
return CT_UPDATE_VALID;
}
@@ -66,14 +66,15 @@ other_valid_new(struct dp_packet *pkt OVS_UNUSED)
}
static struct conn *
-other_new_conn(struct dp_packet *pkt OVS_UNUSED, long long now)
+other_new_conn(struct conntrack_bucket *ctb, struct dp_packet *pkt OVS_UNUSED,
+ long long now)
{
struct conn_other *conn;
conn = xzalloc(sizeof *conn);
conn->state = OTHERS_FIRST;
- update_expiration(&conn->up, other_timeouts[conn->state], now);
+ conn_init_expiration(ctb, &conn->up, other_timeouts[conn->state], now);
return &conn->up;
}
diff --git a/lib/conntrack-private.h b/lib/conntrack-private.h
index bc32448fe..5aac938cc 100644
--- a/lib/conntrack-private.h
+++ b/lib/conntrack-private.h
@@ -69,10 +69,13 @@ enum ct_update_res {
};
struct ct_l4_proto {
- struct conn *(*new_conn)(struct dp_packet *pkt, long long now);
+ struct conn *(*new_conn)(struct conntrack_bucket *, struct dp_packet *pkt,
+ long long now);
bool (*valid_new)(struct dp_packet *pkt);
- enum ct_update_res (*conn_update)(struct conn *conn, struct dp_packet *pkt,
- bool reply, long long now);
+ enum ct_update_res (*conn_update)(struct conn *conn,
+ struct conntrack_bucket *,
+ struct dp_packet *pkt, bool reply,
+ long long now);
};
extern struct ct_l4_proto ct_proto_tcp;
@@ -81,9 +84,19 @@ extern struct ct_l4_proto ct_proto_other;
extern long long ct_timeout_val[];
static inline void
-update_expiration(struct conn *conn, enum ct_timeout tm, long long now)
+conn_init_expiration(struct conntrack_bucket *ctb, struct conn *conn,
+ enum ct_timeout tm, long long now)
{
conn->expiration = now + ct_timeout_val[tm];
+ ovs_list_push_back(&ctb->exp_lists[tm], &conn->exp_node);
+}
+
+static inline void
+conn_update_expiration(struct conntrack_bucket *ctb, struct conn *conn,
+ enum ct_timeout tm, long long now)
+{
+ ovs_list_remove(&conn->exp_node);
+ conn_init_expiration(ctb, conn, tm, now);
}
#endif /* conntrack-private.h */
diff --git a/lib/conntrack-tcp.c b/lib/conntrack-tcp.c
index 6da798dda..7edcce3b0 100644
--- a/lib/conntrack-tcp.c
+++ b/lib/conntrack-tcp.c
@@ -152,8 +152,8 @@ tcp_payload_length(struct dp_packet *pkt)
}
static enum ct_update_res
-tcp_conn_update(struct conn* conn_, struct dp_packet *pkt, bool reply,
- long long now)
+tcp_conn_update(struct conn *conn_, struct conntrack_bucket *ctb,
+ struct dp_packet *pkt, bool reply, long long now)
{
struct conn_tcp *conn = conn_tcp_cast(conn_);
struct tcp_header *tcp = dp_packet_l4(pkt);
@@ -319,18 +319,18 @@ tcp_conn_update(struct conn* conn_, struct dp_packet *pkt, bool reply,
if (src->state >= CT_DPIF_TCPS_FIN_WAIT_2
&& dst->state >= CT_DPIF_TCPS_FIN_WAIT_2) {
- update_expiration(conn_, CT_TM_TCP_CLOSED, now);
+ conn_update_expiration(ctb, &conn->up, CT_TM_TCP_CLOSED, now);
} else if (src->state >= CT_DPIF_TCPS_CLOSING
&& dst->state >= CT_DPIF_TCPS_CLOSING) {
- update_expiration(conn_, CT_TM_TCP_FIN_WAIT, now);
+ conn_update_expiration(ctb, &conn->up, CT_TM_TCP_FIN_WAIT, now);
} else if (src->state < CT_DPIF_TCPS_ESTABLISHED
|| dst->state < CT_DPIF_TCPS_ESTABLISHED) {
- update_expiration(conn_, now, CT_TM_TCP_OPENING);
+ conn_update_expiration(ctb, &conn->up, CT_TM_TCP_OPENING, now);
} else if (src->state >= CT_DPIF_TCPS_CLOSING
|| dst->state >= CT_DPIF_TCPS_CLOSING) {
- update_expiration(conn_, now, CT_TM_TCP_CLOSING);
+ conn_update_expiration(ctb, &conn->up, CT_TM_TCP_CLOSING, now);
} else {
- update_expiration(conn_, now, CT_TM_TCP_ESTABLISHED);
+ conn_update_expiration(ctb, &conn->up, CT_TM_TCP_ESTABLISHED, now);
}
} else if ((dst->state < CT_DPIF_TCPS_SYN_SENT
|| dst->state >= CT_DPIF_TCPS_FIN_WAIT_2
@@ -414,7 +414,8 @@ tcp_valid_new(struct dp_packet *pkt)
}
static struct conn *
-tcp_new_conn(struct dp_packet *pkt, long long now)
+tcp_new_conn(struct conntrack_bucket *ctb, struct dp_packet *pkt,
+ long long now)
{
struct conn_tcp* newconn = NULL;
struct tcp_header *tcp = dp_packet_l4(pkt);
@@ -450,7 +451,8 @@ tcp_new_conn(struct dp_packet *pkt, long long now)
src->state = CT_DPIF_TCPS_SYN_SENT;
dst->state = CT_DPIF_TCPS_CLOSED;
- update_expiration(&newconn->up, now, CT_TM_TCP_FIRST_PACKET);
+ conn_init_expiration(ctb, &newconn->up, CT_TM_TCP_FIRST_PACKET,
+ now);
return &newconn->up;
}
diff --git a/lib/conntrack.c b/lib/conntrack.c
index ff5ce6fcc..094a2300d 100644
--- a/lib/conntrack.c
+++ b/lib/conntrack.c
@@ -33,12 +33,15 @@
#include "openvswitch/hmap.h"
#include "openvswitch/vlog.h"
#include "ovs-rcu.h"
+#include "ovs-thread.h"
+#include "poll-loop.h"
#include "random.h"
#include "timeval.h"
VLOG_DEFINE_THIS_MODULE(conntrack);
COVERAGE_DEFINE(conntrack_full);
+COVERAGE_DEFINE(conntrack_long_cleanup);
struct conn_lookup_ctx {
struct conn_key key;
@@ -56,17 +59,20 @@ static void conn_key_lookup(struct conntrack_bucket *ctb,
struct conn_lookup_ctx *ctx,
long long now);
static bool valid_new(struct dp_packet *pkt, struct conn_key *);
-static struct conn *new_conn(struct dp_packet *pkt, struct conn_key *,
- long long now);
+static struct conn *new_conn(struct conntrack_bucket *, struct dp_packet *pkt,
+ struct conn_key *, long long now);
static void delete_conn(struct conn *);
-static enum ct_update_res conn_update(struct conn *, struct dp_packet*,
- bool reply, long long now);
+static enum ct_update_res conn_update(struct conn *,
+ struct conntrack_bucket *ctb,
+ struct dp_packet *, bool reply,
+ long long now);
static bool conn_expired(struct conn *, long long now);
static void set_mark(struct dp_packet *, struct conn *,
uint32_t val, uint32_t mask);
static void set_label(struct dp_packet *, struct conn *,
const struct ovs_key_ct_labels *val,
const struct ovs_key_ct_labels *mask);
+static void *clean_thread_main(void *f_);
static struct ct_l4_proto *l4_protos[] = {
[IPPROTO_TCP] = &ct_proto_tcp,
@@ -90,7 +96,8 @@ long long ct_timeout_val[] = {
void
conntrack_init(struct conntrack *ct)
{
- unsigned i;
+ unsigned i, j;
+ long long now = time_msec();
for (i = 0; i < CONNTRACK_BUCKETS; i++) {
struct conntrack_bucket *ctb = &ct->buckets[i];
@@ -98,11 +105,20 @@ conntrack_init(struct conntrack *ct)
ct_lock_init(&ctb->lock);
ct_lock_lock(&ctb->lock);
hmap_init(&ctb->connections);
+ for (j = 0; j < ARRAY_SIZE(ctb->exp_lists); j++) {
+ ovs_list_init(&ctb->exp_lists[j]);
+ }
ct_lock_unlock(&ctb->lock);
+ ovs_mutex_init(&ctb->cleanup_mutex);
+ ovs_mutex_lock(&ctb->cleanup_mutex);
+ ctb->next_cleanup = now + CT_TM_MIN;
+ ovs_mutex_unlock(&ctb->cleanup_mutex);
}
ct->hash_basis = random_uint32();
atomic_count_init(&ct->n_conn, 0);
atomic_init(&ct->n_conn_limit, DEFAULT_N_CONN_LIMIT);
+ latch_init(&ct->clean_thread_exit);
+ ct->clean_thread = ovs_thread_create("ct_clean", clean_thread_main, ct);
}
/* Destroys the connection tracker 'ct' and frees all the allocated memory. */
@@ -111,10 +127,14 @@ conntrack_destroy(struct conntrack *ct)
{
unsigned i;
+ latch_set(&ct->clean_thread_exit);
+ pthread_join(ct->clean_thread, NULL);
+ latch_destroy(&ct->clean_thread_exit);
for (i = 0; i < CONNTRACK_BUCKETS; i++) {
struct conntrack_bucket *ctb = &ct->buckets[i];
struct conn *conn;
+ ovs_mutex_destroy(&ctb->cleanup_mutex);
ct_lock_lock(&ctb->lock);
HMAP_FOR_EACH_POP(conn, node, &ctb->connections) {
atomic_count_dec(&ct->n_conn);
@@ -170,7 +190,7 @@ conn_not_found(struct conntrack *ct, struct dp_packet *pkt,
return nc;
}
- nc = new_conn(pkt, &ctx->key, now);
+ nc = new_conn(&ct->buckets[bucket], pkt, &ctx->key, now);
memcpy(&nc->rev_key, &ctx->key, sizeof nc->rev_key);
@@ -200,7 +220,8 @@ process_one(struct conntrack *ct, struct dp_packet *pkt,
} else {
enum ct_update_res res;
- res = conn_update(conn, pkt, ctx->reply, now);
+ res = conn_update(conn, &ct->buckets[bucket], pkt,
+ ctx->reply, now);
switch (res) {
case CT_UPDATE_VALID:
@@ -213,6 +234,7 @@ process_one(struct conntrack *ct, struct dp_packet *pkt,
state |= CS_INVALID;
break;
case CT_UPDATE_NEW:
+ ovs_list_remove(&conn->exp_node);
hmap_remove(&ct->buckets[bucket].connections, &conn->node);
atomic_count_dec(&ct->n_conn);
delete_conn(conn);
@@ -345,6 +367,144 @@ set_label(struct dp_packet *pkt, struct conn *conn,
conn->label = pkt->md.ct_label;
}
+/* Deletes the expired connections from 'ctb', up to 'limit'. Returns the
+ * earliest expiration time among the remaining connections in 'ctb'. Returns
+ * LLONG_MAX if 'ctb' is empty. The return value might be smaller than 'now',
+ * if 'limit' is reached. */
+static long long
+sweep_bucket(struct conntrack *ct, struct conntrack_bucket *ctb, long long now,
+ size_t limit)
+ OVS_REQUIRES(ctb->lock)
+{
+ struct conn *conn, *next;
+ long long min_expiration = LLONG_MAX;
+ unsigned i;
+ size_t count = 0;
+
+ for (i = 0; i < N_CT_TM; i++) {
+ LIST_FOR_EACH_SAFE (conn, next, exp_node, &ctb->exp_lists[i]) {
+ if (!conn_expired(conn, now) || count >= limit) {
+ min_expiration = MIN(min_expiration, conn->expiration);
+ if (count >= limit) {
+ /* Do not check other lists. */
+ COVERAGE_INC(conntrack_long_cleanup);
+ return min_expiration;
+ }
+ break;
+ }
+ ovs_list_remove(&conn->exp_node);
+ hmap_remove(&ctb->connections, &conn->node);
+ atomic_count_dec(&ct->n_conn);
+ delete_conn(conn);
+ count++;
+ }
+ }
+
+ return min_expiration;
+}
+
+/* Cleans up old connection entries from 'ct'. Returns the time when the
+ * next expiration might happen. The return value might be smaller than
+ * 'now', meaning that an internal limit has been reached, and some expired
+ * connections have not been deleted. */
+static long long
+conntrack_clean(struct conntrack *ct, long long now)
+{
+ long long next_wakeup = now + CT_TM_MIN;
+ unsigned int n_conn_limit;
+ size_t clean_count = 0;
+ unsigned i;
+
+ atomic_read_relaxed(&ct->n_conn_limit, &n_conn_limit);
+
+ for (i = 0; i < CONNTRACK_BUCKETS; i++) {
+ struct conntrack_bucket *ctb = &ct->buckets[i];
+ size_t prev_count;
+ long long min_exp;
+
+ ovs_mutex_lock(&ctb->cleanup_mutex);
+ if (ctb->next_cleanup > now) {
+ goto next_bucket;
+ }
+
+ ct_lock_lock(&ctb->lock);
+ prev_count = hmap_count(&ctb->connections);
+ /* If the connections are well distributed among buckets, we want to
+ * limit to 10% of the global limit equally split among buckets. If
+ * the bucket is busier than the others, we limit to 10% of its
+ * current size. */
+ min_exp = sweep_bucket(ct, ctb, now,
+ MAX(prev_count/10, n_conn_limit/(CONNTRACK_BUCKETS*10)));
+ clean_count += prev_count - hmap_count(&ctb->connections);
+
+ if (min_exp > now) {
+ /* We call hmap_shrink() only if sweep_bucket() managed to delete
+ * every expired connection. */
+ hmap_shrink(&ctb->connections);
+ }
+
+ ct_lock_unlock(&ctb->lock);
+
+ ctb->next_cleanup = MIN(min_exp, now + CT_TM_MIN);
+
+next_bucket:
+ next_wakeup = MIN(next_wakeup, ctb->next_cleanup);
+ ovs_mutex_unlock(&ctb->cleanup_mutex);
+ }
+
+ VLOG_DBG("conntrack cleanup %"PRIuSIZE" entries in %lld msec",
+ clean_count, time_msec() - now);
+
+ return next_wakeup;
+}
+
+/* Cleanup:
+ *
+ *
+ * We must call conntrack_clean() periodically. conntrack_clean()'s return
+ * value gives a hint on when the next cleanup must be done (either because
+ * there is an actual connection that expires, or because a new connection
+ * might be created with the minimum timeout).
+ *
+ * The logic below has two goals:
+ *
+ * - Avoid calling conntrack_clean() too often. If we call conntrack_clean()
+ * each time a connection expires, the thread will consume 100% CPU, so we
+ * try to call the function _at most_ once every CT_CLEAN_INTERVAL, to batch
+ * removal.
+ *
+ * - On the other hand, it's not a good idea to keep the buckets locked for
+ * too long, as we might prevent traffic from flowing. If conntrack_clean()
+ * returns a value which is in the past, it means that the internal limit
+ * has been reached and more cleanup is required. In this case, just wait
+ * CT_CLEAN_MIN_INTERVAL before the next call.
+ */
+#define CT_CLEAN_INTERVAL 5000 /* 5 seconds */
+#define CT_CLEAN_MIN_INTERVAL 200 /* 0.2 seconds */
+
+static void *
+clean_thread_main(void *f_)
+{
+ struct conntrack *ct = f_;
+
+ while (!latch_is_set(&ct->clean_thread_exit)) {
+ long long next_wake;
+ long long now = time_msec();
+
+ next_wake = conntrack_clean(ct, now);
+
+ if (next_wake < now) {
+ poll_timer_wait_until(now + CT_CLEAN_MIN_INTERVAL);
+ } else {
+ poll_timer_wait_until(MAX(next_wake, now + CT_CLEAN_INTERVAL));
+ }
+ latch_wait(&ct->clean_thread_exit);
+ poll_block();
+ }
+
+ return NULL;
+}
+
/* Key extraction */
/* The function stores a pointer to the first byte after the header in
@@ -851,10 +1011,11 @@ conn_key_lookup(struct conntrack_bucket *ctb,
}
static enum ct_update_res
-conn_update(struct conn *conn, struct dp_packet *pkt, bool reply,
- long long now)
+conn_update(struct conn *conn, struct conntrack_bucket *ctb,
+ struct dp_packet *pkt, bool reply, long long now)
{
- return l4_protos[conn->key.nw_proto]->conn_update(conn, pkt, reply, now);
+ return l4_protos[conn->key.nw_proto]->conn_update(conn, ctb, pkt,
+ reply, now);
}
static bool
@@ -870,11 +1031,12 @@ valid_new(struct dp_packet *pkt, struct conn_key *key)
}
static struct conn *
-new_conn(struct dp_packet *pkt, struct conn_key *key, long long now)
+new_conn(struct conntrack_bucket *ctb, struct dp_packet *pkt,
+ struct conn_key *key, long long now)
{
struct conn *newconn;
- newconn = l4_protos[key->nw_proto]->new_conn(pkt, now);
+ newconn = l4_protos[key->nw_proto]->new_conn(ctb, pkt, now);
if (newconn) {
newconn->key = *key;
diff --git a/lib/conntrack.h b/lib/conntrack.h
index 331a890d8..8758e5b60 100644
--- a/lib/conntrack.h
+++ b/lib/conntrack.h
@@ -19,8 +19,10 @@
#include <stdbool.h>
+#include "latch.h"
#include "odp-netlink.h"
#include "openvswitch/hmap.h"
+#include "openvswitch/list.h"
#include "openvswitch/thread.h"
#include "openvswitch/types.h"
#include "ovs-atomic.h"
@@ -60,7 +62,6 @@ struct dp_packet_batch;
struct conntrack;
void conntrack_init(struct conntrack *);
-void conntrack_run(struct conntrack *);
void conntrack_destroy(struct conntrack *);
int conntrack_execute(struct conntrack *, struct dp_packet_batch *,
@@ -113,6 +114,14 @@ static inline void ct_lock_destroy(struct ct_lock *lock)
CT_TIMEOUT(OTHER_MULTIPLE, 60 * 1000) \
CT_TIMEOUT(OTHER_BIDIR, 30 * 1000) \
+/* The smallest of the above values: it is used as an upper bound for the
+ * interval between two rounds of cleanup of expired entries. */
+#define CT_TM_MIN (30 * 1000)
+
+#define CT_TIMEOUT(NAME, VAL) BUILD_ASSERT_DECL(VAL >= CT_TM_MIN);
+ CT_TIMEOUTS
+#undef CT_TIMEOUT
+
enum ct_timeout {
#define CT_TIMEOUT(NAME, VALUE) CT_TM_##NAME,
CT_TIMEOUTS
@@ -124,10 +133,29 @@ enum ct_timeout {
*
* The connections are kept in different buckets, which are completely
* independent. The connection bucket is determined by the hash of its key.
+ *
+ * Each bucket has two locks. Acquisition order is, from outermost to
+ * innermost:
+ *
+ * cleanup_mutex
+ * lock
+ *
* */
struct conntrack_bucket {
+ /* Protects 'connections' and 'exp_lists'. Used in the fast path. */
struct ct_lock lock;
+ /* Contains the connections in the bucket, indexed by 'struct conn_key'. */
struct hmap connections OVS_GUARDED;
+ /* For each possible timeout we have a list of connections. When the
+ * timeout of a connection is updated, we move it to the back of the list.
+ * Since the connections in a list have the same relative timeout, the list
+ * will be ordered, with the oldest connections at the front. */
+ struct ovs_list exp_lists[N_CT_TM] OVS_GUARDED;
+
+ /* Protects 'next_cleanup'. Used to make sure that there's only one thread
+ * performing the cleanup. */
+ struct ovs_mutex cleanup_mutex;
+ long long next_cleanup OVS_GUARDED;
};
#define CONNTRACK_BUCKETS_SHIFT 8
@@ -140,6 +168,12 @@ struct conntrack {
/* Salt for hashing a connection key. */
uint32_t hash_basis;
+ /* The thread performing periodic cleanup of the connection
+ * tracker. */
+ pthread_t clean_thread;
+ /* Latch used to tell 'clean_thread' to exit. */
+ struct latch clean_thread_exit;
+
/* Number of connections currently in the connection tracker. */
atomic_count n_conn;
/* Connections limit. When this limit is reached, no new connection