summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--NEWS2
-rw-r--r--lib/jsonrpc.c60
-rw-r--r--lib/jsonrpc.h6
-rw-r--r--ovsdb/raft.c5
4 files changed, 72 insertions, 1 deletions
diff --git a/NEWS b/NEWS
index 2860a8e9c..ebdf8758b 100644
--- a/NEWS
+++ b/NEWS
@@ -6,6 +6,8 @@ Post-v2.14.0
* New unixctl command 'ovsdb-server/memory-trim-on-compaction on|off'.
If turned on, ovsdb-server will try to reclaim all the unused memory
after every DB compaction back to OS. Disabled by default.
+ * Maximum backlog on RAFT connections limited to 500 messages or 4GB.
+ Once threshold reached, connection is dropped (and re-established).
- DPDK:
* Removed support for vhost-user dequeue zero-copy.
- The environment variable OVS_UNBOUND_CONF, if set, is now used
diff --git a/lib/jsonrpc.c b/lib/jsonrpc.c
index ecbc939fe..08aaff061 100644
--- a/lib/jsonrpc.c
+++ b/lib/jsonrpc.c
@@ -50,6 +50,10 @@ struct jsonrpc {
struct ovs_list output; /* Contains "struct ofpbuf"s. */
size_t output_count; /* Number of elements in "output". */
size_t backlog;
+
+ /* Limits. */
+ size_t max_output; /* 'output_count' disconnection threshold. */
+ size_t max_backlog; /* 'backlog' disconnection threshold. */
};
/* Rate limit for error messages. */
@@ -178,6 +182,17 @@ jsonrpc_get_backlog(const struct jsonrpc *rpc)
return rpc->status ? 0 : rpc->backlog;
}
+/* Sets thresholds for send backlog. If send backlog contains more than
+ * 'max_n_msgs' messages or is larger than 'max_backlog_bytes' bytes,
+ * connection will be dropped. */
+void
+jsonrpc_set_backlog_threshold(struct jsonrpc *rpc,
+ size_t max_n_msgs, size_t max_backlog_bytes)
+{
+ rpc->max_output = max_n_msgs;
+ rpc->max_backlog = max_backlog_bytes;
+}
+
/* Returns the number of bytes that have been received on 'rpc''s underlying
* stream. (The value wraps around if it exceeds UINT_MAX.) */
unsigned int
@@ -261,9 +276,26 @@ jsonrpc_send(struct jsonrpc *rpc, struct jsonrpc_msg *msg)
rpc->backlog += length;
if (rpc->output_count >= 50) {
- VLOG_INFO_RL(&rl, "excessive sending backlog, jsonrpc: %s, num of"
+ static struct vlog_rate_limit bl_rl = VLOG_RATE_LIMIT_INIT(5, 5);
+ bool disconnect = false;
+
+ VLOG_INFO_RL(&bl_rl, "excessive sending backlog, jsonrpc: %s, num of"
" msgs: %"PRIuSIZE", backlog: %"PRIuSIZE".", rpc->name,
rpc->output_count, rpc->backlog);
+ if (rpc->max_output && rpc->output_count > rpc->max_output) {
+ disconnect = true;
+ VLOG_WARN("sending backlog exceeded maximum number of messages (%"
+ PRIuSIZE" > %"PRIuSIZE"), disconnecting, jsonrpc: %s.",
+ rpc->output_count, rpc->max_output, rpc->name);
+ } else if (rpc->max_backlog && rpc->backlog > rpc->max_backlog) {
+ disconnect = true;
+ VLOG_WARN("sending backlog exceeded maximum size (%"PRIuSIZE" > %"
+ PRIuSIZE" bytes), disconnecting, jsonrpc: %s.",
+ rpc->backlog, rpc->max_backlog, rpc->name);
+ }
+ if (disconnect) {
+ jsonrpc_error(rpc, E2BIG);
+ }
}
if (rpc->backlog == length) {
@@ -787,6 +819,10 @@ struct jsonrpc_session {
int last_error;
unsigned int seqno;
uint8_t dscp;
+
+ /* Limits for jsonrpc. */
+ size_t max_n_msgs;
+ size_t max_backlog_bytes;
};
static void
@@ -842,6 +878,8 @@ jsonrpc_session_open_multiple(const struct svec *remotes, bool retry)
s->dscp = 0;
s->last_error = 0;
+ jsonrpc_session_set_backlog_threshold(s, 0, 0);
+
const char *name = reconnect_get_name(s->reconnect);
if (!pstream_verify_name(name)) {
reconnect_set_passive(s->reconnect, true, time_msec());
@@ -882,6 +920,7 @@ jsonrpc_session_open_unreliably(struct jsonrpc *jsonrpc, uint8_t dscp)
s->pstream = NULL;
s->seqno = 1;
+ jsonrpc_session_set_backlog_threshold(s, 0, 0);
return s;
}
@@ -970,6 +1009,8 @@ jsonrpc_session_run(struct jsonrpc_session *s)
}
reconnect_connected(s->reconnect, time_msec());
s->rpc = jsonrpc_open(stream);
+ jsonrpc_set_backlog_threshold(s->rpc, s->max_n_msgs,
+ s->max_backlog_bytes);
s->seqno++;
} else if (error != EAGAIN) {
reconnect_listen_error(s->reconnect, time_msec(), error);
@@ -1010,6 +1051,8 @@ jsonrpc_session_run(struct jsonrpc_session *s)
if (!error) {
reconnect_connected(s->reconnect, time_msec());
s->rpc = jsonrpc_open(s->stream);
+ jsonrpc_set_backlog_threshold(s->rpc, s->max_n_msgs,
+ s->max_backlog_bytes);
s->stream = NULL;
s->seqno++;
} else if (error != EAGAIN) {
@@ -1250,3 +1293,18 @@ jsonrpc_session_set_dscp(struct jsonrpc_session *s, uint8_t dscp)
jsonrpc_session_force_reconnect(s);
}
}
+
+/* Sets thresholds for send backlog. If send backlog contains more than
+ * 'max_n_msgs' messages or is larger than 'max_backlog_bytes' bytes,
+ * connection will be closed (then reconnected, if that feature is enabled). */
+void
+jsonrpc_session_set_backlog_threshold(struct jsonrpc_session *s,
+ size_t max_n_msgs,
+ size_t max_backlog_bytes)
+{
+ s->max_n_msgs = max_n_msgs;
+ s->max_backlog_bytes = max_backlog_bytes;
+ if (s->rpc) {
+ jsonrpc_set_backlog_threshold(s->rpc, max_n_msgs, max_backlog_bytes);
+ }
+}
diff --git a/lib/jsonrpc.h b/lib/jsonrpc.h
index a44114e8d..d75d66b86 100644
--- a/lib/jsonrpc.h
+++ b/lib/jsonrpc.h
@@ -51,6 +51,9 @@ void jsonrpc_wait(struct jsonrpc *);
int jsonrpc_get_status(const struct jsonrpc *);
size_t jsonrpc_get_backlog(const struct jsonrpc *);
+void jsonrpc_set_backlog_threshold(struct jsonrpc *, size_t max_n_msgs,
+ size_t max_backlog_bytes);
+
unsigned int jsonrpc_get_received_bytes(const struct jsonrpc *);
const char *jsonrpc_get_name(const struct jsonrpc *);
@@ -140,6 +143,9 @@ void jsonrpc_session_set_probe_interval(struct jsonrpc_session *,
int probe_interval);
void jsonrpc_session_set_dscp(struct jsonrpc_session *,
uint8_t dscp);
+void jsonrpc_session_set_backlog_threshold(struct jsonrpc_session *,
+ size_t max_n_msgs,
+ size_t max_backlog_bytes);
const char *jsonrpc_session_get_id(const struct jsonrpc_session *);
#endif /* jsonrpc.h */
diff --git a/ovsdb/raft.c b/ovsdb/raft.c
index f94a3eed8..67c714ff4 100644
--- a/ovsdb/raft.c
+++ b/ovsdb/raft.c
@@ -925,6 +925,9 @@ raft_reset_ping_timer(struct raft *raft)
raft->ping_timeout = time_msec() + raft->election_timer / 3;
}
+#define RAFT_MAX_BACKLOG_N_MSGS 500
+#define RAFT_MAX_BACKLOG_BYTES UINT32_MAX
+
static void
raft_add_conn(struct raft *raft, struct jsonrpc_session *js,
const struct uuid *sid, bool incoming)
@@ -940,6 +943,8 @@ raft_add_conn(struct raft *raft, struct jsonrpc_session *js,
conn->incoming = incoming;
conn->js_seqno = jsonrpc_session_get_seqno(conn->js);
jsonrpc_session_set_probe_interval(js, 0);
+ jsonrpc_session_set_backlog_threshold(js, RAFT_MAX_BACKLOG_N_MSGS,
+ RAFT_MAX_BACKLOG_BYTES);
}
/* Starts the local server in an existing Raft cluster, using the local copy of