diff options
-rw-r--r-- | NEWS | 2 | ||||
-rw-r--r-- | lib/jsonrpc.c | 60 | ||||
-rw-r--r-- | lib/jsonrpc.h | 6 | ||||
-rw-r--r-- | ovsdb/raft.c | 5 |
4 files changed, 72 insertions, 1 deletions
@@ -6,6 +6,8 @@ Post-v2.14.0 * New unixctl command 'ovsdb-server/memory-trim-on-compaction on|off'. If turned on, ovsdb-server will try to reclaim all the unused memory after every DB compaction back to OS. Disabled by default. + * Maximum backlog on RAFT connections limited to 500 messages or 4GB. + Once threshold reached, connection is dropped (and re-established). - DPDK: * Removed support for vhost-user dequeue zero-copy. - The environment variable OVS_UNBOUND_CONF, if set, is now used diff --git a/lib/jsonrpc.c b/lib/jsonrpc.c index ecbc939fe..08aaff061 100644 --- a/lib/jsonrpc.c +++ b/lib/jsonrpc.c @@ -50,6 +50,10 @@ struct jsonrpc { struct ovs_list output; /* Contains "struct ofpbuf"s. */ size_t output_count; /* Number of elements in "output". */ size_t backlog; + + /* Limits. */ + size_t max_output; /* 'output_count' disconnection threshold. */ + size_t max_backlog; /* 'backlog' disconnection threshold. */ }; /* Rate limit for error messages. */ @@ -178,6 +182,17 @@ jsonrpc_get_backlog(const struct jsonrpc *rpc) return rpc->status ? 0 : rpc->backlog; } +/* Sets thresholds for send backlog. If send backlog contains more than + * 'max_n_msgs' messages or is larger than 'max_backlog_bytes' bytes, + * connection will be dropped. */ +void +jsonrpc_set_backlog_threshold(struct jsonrpc *rpc, + size_t max_n_msgs, size_t max_backlog_bytes) +{ + rpc->max_output = max_n_msgs; + rpc->max_backlog = max_backlog_bytes; +} + /* Returns the number of bytes that have been received on 'rpc''s underlying * stream. (The value wraps around if it exceeds UINT_MAX.) */ unsigned int @@ -261,9 +276,26 @@ jsonrpc_send(struct jsonrpc *rpc, struct jsonrpc_msg *msg) rpc->backlog += length; if (rpc->output_count >= 50) { - VLOG_INFO_RL(&rl, "excessive sending backlog, jsonrpc: %s, num of" + static struct vlog_rate_limit bl_rl = VLOG_RATE_LIMIT_INIT(5, 5); + bool disconnect = false; + + VLOG_INFO_RL(&bl_rl, "excessive sending backlog, jsonrpc: %s, num of" " msgs: %"PRIuSIZE", backlog: %"PRIuSIZE".", rpc->name, rpc->output_count, rpc->backlog); + if (rpc->max_output && rpc->output_count > rpc->max_output) { + disconnect = true; + VLOG_WARN("sending backlog exceeded maximum number of messages (%" + PRIuSIZE" > %"PRIuSIZE"), disconnecting, jsonrpc: %s.", + rpc->output_count, rpc->max_output, rpc->name); + } else if (rpc->max_backlog && rpc->backlog > rpc->max_backlog) { + disconnect = true; + VLOG_WARN("sending backlog exceeded maximum size (%"PRIuSIZE" > %" + PRIuSIZE" bytes), disconnecting, jsonrpc: %s.", + rpc->backlog, rpc->max_backlog, rpc->name); + } + if (disconnect) { + jsonrpc_error(rpc, E2BIG); + } } if (rpc->backlog == length) { @@ -787,6 +819,10 @@ struct jsonrpc_session { int last_error; unsigned int seqno; uint8_t dscp; + + /* Limits for jsonrpc. */ + size_t max_n_msgs; + size_t max_backlog_bytes; }; static void @@ -842,6 +878,8 @@ jsonrpc_session_open_multiple(const struct svec *remotes, bool retry) s->dscp = 0; s->last_error = 0; + jsonrpc_session_set_backlog_threshold(s, 0, 0); + const char *name = reconnect_get_name(s->reconnect); if (!pstream_verify_name(name)) { reconnect_set_passive(s->reconnect, true, time_msec()); @@ -882,6 +920,7 @@ jsonrpc_session_open_unreliably(struct jsonrpc *jsonrpc, uint8_t dscp) s->pstream = NULL; s->seqno = 1; + jsonrpc_session_set_backlog_threshold(s, 0, 0); return s; } @@ -970,6 +1009,8 @@ jsonrpc_session_run(struct jsonrpc_session *s) } reconnect_connected(s->reconnect, time_msec()); s->rpc = jsonrpc_open(stream); + jsonrpc_set_backlog_threshold(s->rpc, s->max_n_msgs, + s->max_backlog_bytes); s->seqno++; } else if (error != EAGAIN) { reconnect_listen_error(s->reconnect, time_msec(), error); @@ -1010,6 +1051,8 @@ jsonrpc_session_run(struct jsonrpc_session *s) if (!error) { reconnect_connected(s->reconnect, time_msec()); s->rpc = jsonrpc_open(s->stream); + jsonrpc_set_backlog_threshold(s->rpc, s->max_n_msgs, + s->max_backlog_bytes); s->stream = NULL; s->seqno++; } else if (error != EAGAIN) { @@ -1250,3 +1293,18 @@ jsonrpc_session_set_dscp(struct jsonrpc_session *s, uint8_t dscp) jsonrpc_session_force_reconnect(s); } } + +/* Sets thresholds for send backlog. If send backlog contains more than + * 'max_n_msgs' messages or is larger than 'max_backlog_bytes' bytes, + * connection will be closed (then reconnected, if that feature is enabled). */ +void +jsonrpc_session_set_backlog_threshold(struct jsonrpc_session *s, + size_t max_n_msgs, + size_t max_backlog_bytes) +{ + s->max_n_msgs = max_n_msgs; + s->max_backlog_bytes = max_backlog_bytes; + if (s->rpc) { + jsonrpc_set_backlog_threshold(s->rpc, max_n_msgs, max_backlog_bytes); + } +} diff --git a/lib/jsonrpc.h b/lib/jsonrpc.h index a44114e8d..d75d66b86 100644 --- a/lib/jsonrpc.h +++ b/lib/jsonrpc.h @@ -51,6 +51,9 @@ void jsonrpc_wait(struct jsonrpc *); int jsonrpc_get_status(const struct jsonrpc *); size_t jsonrpc_get_backlog(const struct jsonrpc *); +void jsonrpc_set_backlog_threshold(struct jsonrpc *, size_t max_n_msgs, + size_t max_backlog_bytes); + unsigned int jsonrpc_get_received_bytes(const struct jsonrpc *); const char *jsonrpc_get_name(const struct jsonrpc *); @@ -140,6 +143,9 @@ void jsonrpc_session_set_probe_interval(struct jsonrpc_session *, int probe_interval); void jsonrpc_session_set_dscp(struct jsonrpc_session *, uint8_t dscp); +void jsonrpc_session_set_backlog_threshold(struct jsonrpc_session *, + size_t max_n_msgs, + size_t max_backlog_bytes); const char *jsonrpc_session_get_id(const struct jsonrpc_session *); #endif /* jsonrpc.h */ diff --git a/ovsdb/raft.c b/ovsdb/raft.c index f94a3eed8..67c714ff4 100644 --- a/ovsdb/raft.c +++ b/ovsdb/raft.c @@ -925,6 +925,9 @@ raft_reset_ping_timer(struct raft *raft) raft->ping_timeout = time_msec() + raft->election_timer / 3; } +#define RAFT_MAX_BACKLOG_N_MSGS 500 +#define RAFT_MAX_BACKLOG_BYTES UINT32_MAX + static void raft_add_conn(struct raft *raft, struct jsonrpc_session *js, const struct uuid *sid, bool incoming) @@ -940,6 +943,8 @@ raft_add_conn(struct raft *raft, struct jsonrpc_session *js, conn->incoming = incoming; conn->js_seqno = jsonrpc_session_get_seqno(conn->js); jsonrpc_session_set_probe_interval(js, 0); + jsonrpc_session_set_backlog_threshold(js, RAFT_MAX_BACKLOG_N_MSGS, + RAFT_MAX_BACKLOG_BYTES); } /* Starts the local server in an existing Raft cluster, using the local copy of |