diff options
-rw-r--r-- | lib/conntrack-other.c | 11 | ||||
-rw-r--r-- | lib/conntrack-private.h | 21 | ||||
-rw-r--r-- | lib/conntrack-tcp.c | 20 | ||||
-rw-r--r-- | lib/conntrack.c | 186 | ||||
-rw-r--r-- | lib/conntrack.h | 36 |
5 files changed, 243 insertions, 31 deletions
diff --git a/lib/conntrack-other.c b/lib/conntrack-other.c index 295cb2c87..2920889e7 100644 --- a/lib/conntrack-other.c +++ b/lib/conntrack-other.c @@ -43,8 +43,8 @@ conn_other_cast(const struct conn *conn) } static enum ct_update_res -other_conn_update(struct conn *conn_, struct dp_packet *pkt OVS_UNUSED, - bool reply, long long now) +other_conn_update(struct conn *conn_, struct conntrack_bucket *ctb, + struct dp_packet *pkt OVS_UNUSED, bool reply, long long now) { struct conn_other *conn = conn_other_cast(conn_); @@ -54,7 +54,7 @@ other_conn_update(struct conn *conn_, struct dp_packet *pkt OVS_UNUSED, conn->state = OTHERS_MULTIPLE; } - update_expiration(conn_, other_timeouts[conn->state], now); + conn_update_expiration(ctb, &conn->up, other_timeouts[conn->state], now); return CT_UPDATE_VALID; } @@ -66,14 +66,15 @@ other_valid_new(struct dp_packet *pkt OVS_UNUSED) } static struct conn * -other_new_conn(struct dp_packet *pkt OVS_UNUSED, long long now) +other_new_conn(struct conntrack_bucket *ctb, struct dp_packet *pkt OVS_UNUSED, + long long now) { struct conn_other *conn; conn = xzalloc(sizeof *conn); conn->state = OTHERS_FIRST; - update_expiration(&conn->up, other_timeouts[conn->state], now); + conn_init_expiration(ctb, &conn->up, other_timeouts[conn->state], now); return &conn->up; } diff --git a/lib/conntrack-private.h b/lib/conntrack-private.h index bc32448fe..5aac938cc 100644 --- a/lib/conntrack-private.h +++ b/lib/conntrack-private.h @@ -69,10 +69,13 @@ enum ct_update_res { }; struct ct_l4_proto { - struct conn *(*new_conn)(struct dp_packet *pkt, long long now); + struct conn *(*new_conn)(struct conntrack_bucket *, struct dp_packet *pkt, + long long now); bool (*valid_new)(struct dp_packet *pkt); - enum ct_update_res (*conn_update)(struct conn *conn, struct dp_packet *pkt, - bool reply, long long now); + enum ct_update_res (*conn_update)(struct conn *conn, + struct conntrack_bucket *, + struct dp_packet *pkt, bool reply, + long long now); }; extern struct ct_l4_proto ct_proto_tcp; @@ -81,9 +84,19 @@ extern struct ct_l4_proto ct_proto_other; extern long long ct_timeout_val[]; static inline void -update_expiration(struct conn *conn, enum ct_timeout tm, long long now) +conn_init_expiration(struct conntrack_bucket *ctb, struct conn *conn, + enum ct_timeout tm, long long now) { conn->expiration = now + ct_timeout_val[tm]; + ovs_list_push_back(&ctb->exp_lists[tm], &conn->exp_node); +} + +static inline void +conn_update_expiration(struct conntrack_bucket *ctb, struct conn *conn, + enum ct_timeout tm, long long now) +{ + ovs_list_remove(&conn->exp_node); + conn_init_expiration(ctb, conn, tm, now); } #endif /* conntrack-private.h */ diff --git a/lib/conntrack-tcp.c b/lib/conntrack-tcp.c index 6da798dda..7edcce3b0 100644 --- a/lib/conntrack-tcp.c +++ b/lib/conntrack-tcp.c @@ -152,8 +152,8 @@ tcp_payload_length(struct dp_packet *pkt) } static enum ct_update_res -tcp_conn_update(struct conn* conn_, struct dp_packet *pkt, bool reply, - long long now) +tcp_conn_update(struct conn *conn_, struct conntrack_bucket *ctb, + struct dp_packet *pkt, bool reply, long long now) { struct conn_tcp *conn = conn_tcp_cast(conn_); struct tcp_header *tcp = dp_packet_l4(pkt); @@ -319,18 +319,18 @@ tcp_conn_update(struct conn* conn_, struct dp_packet *pkt, bool reply, if (src->state >= CT_DPIF_TCPS_FIN_WAIT_2 && dst->state >= CT_DPIF_TCPS_FIN_WAIT_2) { - update_expiration(conn_, CT_TM_TCP_CLOSED, now); + conn_update_expiration(ctb, &conn->up, CT_TM_TCP_CLOSED, now); } else if (src->state >= CT_DPIF_TCPS_CLOSING && dst->state >= CT_DPIF_TCPS_CLOSING) { - update_expiration(conn_, CT_TM_TCP_FIN_WAIT, now); + conn_update_expiration(ctb, &conn->up, CT_TM_TCP_FIN_WAIT, now); } else if (src->state < CT_DPIF_TCPS_ESTABLISHED || dst->state < CT_DPIF_TCPS_ESTABLISHED) { - update_expiration(conn_, now, CT_TM_TCP_OPENING); + conn_update_expiration(ctb, &conn->up, CT_TM_TCP_OPENING, now); } else if (src->state >= CT_DPIF_TCPS_CLOSING || dst->state >= CT_DPIF_TCPS_CLOSING) { - update_expiration(conn_, now, CT_TM_TCP_CLOSING); + conn_update_expiration(ctb, &conn->up, CT_TM_TCP_CLOSING, now); } else { - update_expiration(conn_, now, CT_TM_TCP_ESTABLISHED); + conn_update_expiration(ctb, &conn->up, CT_TM_TCP_ESTABLISHED, now); } } else if ((dst->state < CT_DPIF_TCPS_SYN_SENT || dst->state >= CT_DPIF_TCPS_FIN_WAIT_2 @@ -414,7 +414,8 @@ tcp_valid_new(struct dp_packet *pkt) } static struct conn * -tcp_new_conn(struct dp_packet *pkt, long long now) +tcp_new_conn(struct conntrack_bucket *ctb, struct dp_packet *pkt, + long long now) { struct conn_tcp* newconn = NULL; struct tcp_header *tcp = dp_packet_l4(pkt); @@ -450,7 +451,8 @@ tcp_new_conn(struct dp_packet *pkt, long long now) src->state = CT_DPIF_TCPS_SYN_SENT; dst->state = CT_DPIF_TCPS_CLOSED; - update_expiration(&newconn->up, now, CT_TM_TCP_FIRST_PACKET); + conn_init_expiration(ctb, &newconn->up, CT_TM_TCP_FIRST_PACKET, + now); return &newconn->up; } diff --git a/lib/conntrack.c b/lib/conntrack.c index ff5ce6fcc..094a2300d 100644 --- a/lib/conntrack.c +++ b/lib/conntrack.c @@ -33,12 +33,15 @@ #include "openvswitch/hmap.h" #include "openvswitch/vlog.h" #include "ovs-rcu.h" +#include "ovs-thread.h" +#include "poll-loop.h" #include "random.h" #include "timeval.h" VLOG_DEFINE_THIS_MODULE(conntrack); COVERAGE_DEFINE(conntrack_full); +COVERAGE_DEFINE(conntrack_long_cleanup); struct conn_lookup_ctx { struct conn_key key; @@ -56,17 +59,20 @@ static void conn_key_lookup(struct conntrack_bucket *ctb, struct conn_lookup_ctx *ctx, long long now); static bool valid_new(struct dp_packet *pkt, struct conn_key *); -static struct conn *new_conn(struct dp_packet *pkt, struct conn_key *, - long long now); +static struct conn *new_conn(struct conntrack_bucket *, struct dp_packet *pkt, + struct conn_key *, long long now); static void delete_conn(struct conn *); -static enum ct_update_res conn_update(struct conn *, struct dp_packet*, - bool reply, long long now); +static enum ct_update_res conn_update(struct conn *, + struct conntrack_bucket *ctb, + struct dp_packet *, bool reply, + long long now); static bool conn_expired(struct conn *, long long now); static void set_mark(struct dp_packet *, struct conn *, uint32_t val, uint32_t mask); static void set_label(struct dp_packet *, struct conn *, const struct ovs_key_ct_labels *val, const struct ovs_key_ct_labels *mask); +static void *clean_thread_main(void *f_); static struct ct_l4_proto *l4_protos[] = { [IPPROTO_TCP] = &ct_proto_tcp, @@ -90,7 +96,8 @@ long long ct_timeout_val[] = { void conntrack_init(struct conntrack *ct) { - unsigned i; + unsigned i, j; + long long now = time_msec(); for (i = 0; i < CONNTRACK_BUCKETS; i++) { struct conntrack_bucket *ctb = &ct->buckets[i]; @@ -98,11 +105,20 @@ conntrack_init(struct conntrack *ct) ct_lock_init(&ctb->lock); ct_lock_lock(&ctb->lock); hmap_init(&ctb->connections); + for (j = 0; j < ARRAY_SIZE(ctb->exp_lists); j++) { + ovs_list_init(&ctb->exp_lists[j]); + } ct_lock_unlock(&ctb->lock); + ovs_mutex_init(&ctb->cleanup_mutex); + ovs_mutex_lock(&ctb->cleanup_mutex); + ctb->next_cleanup = now + CT_TM_MIN; + ovs_mutex_unlock(&ctb->cleanup_mutex); } ct->hash_basis = random_uint32(); atomic_count_init(&ct->n_conn, 0); atomic_init(&ct->n_conn_limit, DEFAULT_N_CONN_LIMIT); + latch_init(&ct->clean_thread_exit); + ct->clean_thread = ovs_thread_create("ct_clean", clean_thread_main, ct); } /* Destroys the connection tracker 'ct' and frees all the allocated memory. */ @@ -111,10 +127,14 @@ conntrack_destroy(struct conntrack *ct) { unsigned i; + latch_set(&ct->clean_thread_exit); + pthread_join(ct->clean_thread, NULL); + latch_destroy(&ct->clean_thread_exit); for (i = 0; i < CONNTRACK_BUCKETS; i++) { struct conntrack_bucket *ctb = &ct->buckets[i]; struct conn *conn; + ovs_mutex_destroy(&ctb->cleanup_mutex); ct_lock_lock(&ctb->lock); HMAP_FOR_EACH_POP(conn, node, &ctb->connections) { atomic_count_dec(&ct->n_conn); @@ -170,7 +190,7 @@ conn_not_found(struct conntrack *ct, struct dp_packet *pkt, return nc; } - nc = new_conn(pkt, &ctx->key, now); + nc = new_conn(&ct->buckets[bucket], pkt, &ctx->key, now); memcpy(&nc->rev_key, &ctx->key, sizeof nc->rev_key); @@ -200,7 +220,8 @@ process_one(struct conntrack *ct, struct dp_packet *pkt, } else { enum ct_update_res res; - res = conn_update(conn, pkt, ctx->reply, now); + res = conn_update(conn, &ct->buckets[bucket], pkt, + ctx->reply, now); switch (res) { case CT_UPDATE_VALID: @@ -213,6 +234,7 @@ process_one(struct conntrack *ct, struct dp_packet *pkt, state |= CS_INVALID; break; case CT_UPDATE_NEW: + ovs_list_remove(&conn->exp_node); hmap_remove(&ct->buckets[bucket].connections, &conn->node); atomic_count_dec(&ct->n_conn); delete_conn(conn); @@ -345,6 +367,144 @@ set_label(struct dp_packet *pkt, struct conn *conn, conn->label = pkt->md.ct_label; } +/* Delete the expired connections from 'ctb', up to 'limit'. Returns the + * earliest expiration time among the remaining connections in 'ctb'. Returns + * LLONG_MAX if 'ctb' is empty. The return value might be smaller than 'now', + * if 'limit' is reached */ +static long long +sweep_bucket(struct conntrack *ct, struct conntrack_bucket *ctb, long long now, + size_t limit) + OVS_REQUIRES(ctb->lock) +{ + struct conn *conn, *next; + long long min_expiration = LLONG_MAX; + unsigned i; + size_t count = 0; + + for (i = 0; i < N_CT_TM; i++) { + LIST_FOR_EACH_SAFE (conn, next, exp_node, &ctb->exp_lists[i]) { + if (!conn_expired(conn, now) || count >= limit) { + min_expiration = MIN(min_expiration, conn->expiration); + if (count >= limit) { + /* Do not check other lists. */ + COVERAGE_INC(conntrack_long_cleanup); + return min_expiration; + } + break; + } + ovs_list_remove(&conn->exp_node); + hmap_remove(&ctb->connections, &conn->node); + atomic_count_dec(&ct->n_conn); + delete_conn(conn); + count++; + } + } + + return min_expiration; +} + +/* Cleans up old connection entries from 'ct'. Returns the time when the + * next expiration might happen. The return value might be smaller than + * 'now', meaning that an internal limit has been reached, and some expired + * connections have not been deleted. */ +static long long +conntrack_clean(struct conntrack *ct, long long now) +{ + long long next_wakeup = now + CT_TM_MIN; + unsigned int n_conn_limit; + size_t clean_count = 0; + unsigned i; + + atomic_read_relaxed(&ct->n_conn_limit, &n_conn_limit); + + for (i = 0; i < CONNTRACK_BUCKETS; i++) { + struct conntrack_bucket *ctb = &ct->buckets[i]; + size_t prev_count; + long long min_exp; + + ovs_mutex_lock(&ctb->cleanup_mutex); + if (ctb->next_cleanup > now) { + goto next_bucket; + } + + ct_lock_lock(&ctb->lock); + prev_count = hmap_count(&ctb->connections); + /* If the connections are well distributed among buckets, we want to + * limit to 10% of the global limit equally split among buckets. If + * the bucket is busier than the others, we limit to 10% of its + * current size. */ + min_exp = sweep_bucket(ct, ctb, now, + MAX(prev_count/10, n_conn_limit/(CONNTRACK_BUCKETS*10))); + clean_count += prev_count - hmap_count(&ctb->connections); + + if (min_exp > now) { + /* We call hmap_shrink() only if sweep_bucket() managed to delete + * every expired connection. */ + hmap_shrink(&ctb->connections); + } + + ct_lock_unlock(&ctb->lock); + + ctb->next_cleanup = MIN(min_exp, now + CT_TM_MIN); + +next_bucket: + next_wakeup = MIN(next_wakeup, ctb->next_cleanup); + ovs_mutex_unlock(&ctb->cleanup_mutex); + } + + VLOG_DBG("conntrack cleanup %"PRIuSIZE" entries in %lld msec", + clean_count, time_msec() - now); + + return next_wakeup; +} + +/* Cleanup: + * + * + * We must call conntrack_clean() periodically. conntrack_clean() return + * value gives an hint on when the next cleanup must be done (either because + * there is an actual connection that expires, or because a new connection + * might be created with the minimum timeout). + * + * The logic below has two goals: + * + * - Avoid calling conntrack_clean() too often. If we call conntrack_clean() + * each time a connection expires, the thread will consume 100% CPU, so we + * try to call the function _at most_ once every CT_CLEAN_INTERVAL, to batch + * removal. + * + * - On the other hand, it's not a good idea to keep the buckets locked for + * too long, as we might prevent traffic from flowing. If conntrack_clean() + * returns a value which is in the past, it means that the internal limit + * has been reached and more cleanup is required. In this case, just wait + * CT_CLEAN_MIN_INTERVAL before the next call. + */ +#define CT_CLEAN_INTERVAL 5000 /* 5 seconds */ +#define CT_CLEAN_MIN_INTERVAL 200 /* 0.2 seconds */ + +static void * +clean_thread_main(void *f_) +{ + struct conntrack *ct = f_; + + while (!latch_is_set(&ct->clean_thread_exit)) { + long long next_wake; + long long now = time_msec(); + + next_wake = conntrack_clean(ct, now); + + if (next_wake < now) { + poll_timer_wait_until(now + CT_CLEAN_MIN_INTERVAL); + } else { + poll_timer_wait_until(MAX(next_wake, now + CT_CLEAN_INTERVAL)); + } + latch_wait(&ct->clean_thread_exit); + poll_block(); + } + + return NULL; +} + /* Key extraction */ /* The function stores a pointer to the first byte after the header in @@ -851,10 +1011,11 @@ conn_key_lookup(struct conntrack_bucket *ctb, } static enum ct_update_res -conn_update(struct conn *conn, struct dp_packet *pkt, bool reply, - long long now) +conn_update(struct conn *conn, struct conntrack_bucket *ctb, + struct dp_packet *pkt, bool reply, long long now) { - return l4_protos[conn->key.nw_proto]->conn_update(conn, pkt, reply, now); + return l4_protos[conn->key.nw_proto]->conn_update(conn, ctb, pkt, + reply, now); } static bool @@ -870,11 +1031,12 @@ valid_new(struct dp_packet *pkt, struct conn_key *key) } static struct conn * -new_conn(struct dp_packet *pkt, struct conn_key *key, long long now) +new_conn(struct conntrack_bucket *ctb, struct dp_packet *pkt, + struct conn_key *key, long long now) { struct conn *newconn; - newconn = l4_protos[key->nw_proto]->new_conn(pkt, now); + newconn = l4_protos[key->nw_proto]->new_conn(ctb, pkt, now); if (newconn) { newconn->key = *key; diff --git a/lib/conntrack.h b/lib/conntrack.h index 331a890d8..8758e5b60 100644 --- a/lib/conntrack.h +++ b/lib/conntrack.h @@ -19,8 +19,10 @@ #include <stdbool.h> +#include "latch.h" #include "odp-netlink.h" #include "openvswitch/hmap.h" +#include "openvswitch/list.h" #include "openvswitch/thread.h" #include "openvswitch/types.h" #include "ovs-atomic.h" @@ -60,7 +62,6 @@ struct dp_packet_batch; struct conntrack; void conntrack_init(struct conntrack *); -void conntrack_run(struct conntrack *); void conntrack_destroy(struct conntrack *); int conntrack_execute(struct conntrack *, struct dp_packet_batch *, @@ -113,6 +114,14 @@ static inline void ct_lock_destroy(struct ct_lock *lock) CT_TIMEOUT(OTHER_MULTIPLE, 60 * 1000) \ CT_TIMEOUT(OTHER_BIDIR, 30 * 1000) \ +/* The smallest of the above values: it is used as an upper bound for the + * interval between two rounds of cleanup of expired entries */ +#define CT_TM_MIN (30 * 1000) + +#define CT_TIMEOUT(NAME, VAL) BUILD_ASSERT_DECL(VAL >= CT_TM_MIN); + CT_TIMEOUTS +#undef CT_TIMEOUT + enum ct_timeout { #define CT_TIMEOUT(NAME, VALUE) CT_TM_##NAME, CT_TIMEOUTS @@ -124,10 +133,29 @@ enum ct_timeout { * * The connections are kept in different buckets, which are completely * independent. The connection bucket is determined by the hash of its key. + * + * Each bucket has two locks. Acquisition order is, from outermost to + * innermost: + * + * cleanup_mutex + * lock + * * */ struct conntrack_bucket { + /* Protects 'connections' and 'exp_lists'. Used in the fast path */ struct ct_lock lock; + /* Contains the connections in the bucket, indexed by 'struct conn_key' */ struct hmap connections OVS_GUARDED; + /* For each possible timeout we have a list of connections. When the + * timeout of a connection is updated, we move it to the back of the list. + * Since the connection in a list have the same relative timeout, the list + * will be ordered, with the oldest connections to the front. */ + struct ovs_list exp_lists[N_CT_TM] OVS_GUARDED; + + /* Protects 'next_cleanup'. Used to make sure that there's only one thread + * performing the cleanup. */ + struct ovs_mutex cleanup_mutex; + long long next_cleanup OVS_GUARDED; }; #define CONNTRACK_BUCKETS_SHIFT 8 @@ -140,6 +168,12 @@ struct conntrack { /* Salt for hashing a connection key. */ uint32_t hash_basis; + /* The thread performing periodic cleanup of the connection + * tracker */ + pthread_t clean_thread; + /* Latch to destroy the 'clean_thread' */ + struct latch clean_thread_exit; + /* Number of connections currently in the connection tracker. */ atomic_count n_conn; /* Connections limit. When this limit is reached, no new connection |