summaryrefslogtreecommitdiff
path: root/lib
diff options
context:
space:
mode:
authorIlya Maximets <i.maximets@ovn.org>2020-10-21 03:32:49 +0200
committerIlya Maximets <i.maximets@ovn.org>2020-11-10 01:23:33 +0100
commiteca34ebd7c418c0351eb92ae615d07edc31a9404 (patch)
tree4f46d9ed1b7db198a952958a927c6637e9bedb59 /lib
parentc4bc03d872db5fe6f804fc9ddbbec29e28335cb5 (diff)
downloadopenvswitch-eca34ebd7c418c0351eb92ae615d07edc31a9404.tar.gz
raft: Set threshold on backlog for raft connections.
RAFT messages could be fairly big. If something abnormal happens to one of the servers in a cluster it may not be able to process all the incoming messages in a timely manner. This results in jsonrpc backlog growth on the sender's side. For example if follower gets many new clients at once that it needs to serve, or it decides to take a snapshot in a period of high number of database changes. If backlog grows large enough it becomes harder and harder for follower to process incoming raft messages, it sends outdated replies and starts receiving snapshots and the whole raft log from the leader. Sometimes backlog grows too high (60GB in this example): jsonrpc|INFO|excessive sending backlog, jsonrpc: ssl:<ip>, num of msgs: 15370, backlog: 61731060773. In this case OS might actually decide to kill the sender to free some memory. Anyway, It could take a lot of time for such a server to catch up with the rest of the cluster if it has so much data to receive and process. Introducing backlog thresholds for jsonrpc connections. If sending backlog will exceed particular values (500 messages or 4GB in size), connection will be dropped and re-created. This will allow to drop all the current backlog and start over increasing chances of cluster recovery. Reported-at: https://bugzilla.redhat.com/show_bug.cgi?id=1888829 Acked-by: Dumitru Ceara <dceara@redhat.com> Signed-off-by: Ilya Maximets <i.maximets@ovn.org>
Diffstat (limited to 'lib')
-rw-r--r--lib/jsonrpc.c60
-rw-r--r--lib/jsonrpc.h6
2 files changed, 65 insertions, 1 deletions
diff --git a/lib/jsonrpc.c b/lib/jsonrpc.c
index ecbc939fe..08aaff061 100644
--- a/lib/jsonrpc.c
+++ b/lib/jsonrpc.c
@@ -50,6 +50,10 @@ struct jsonrpc {
struct ovs_list output; /* Contains "struct ofpbuf"s. */
size_t output_count; /* Number of elements in "output". */
size_t backlog;
+
+ /* Limits. */
+ size_t max_output; /* 'output_count' disconnection threshold. */
+ size_t max_backlog; /* 'backlog' disconnection threshold. */
};
/* Rate limit for error messages. */
@@ -178,6 +182,17 @@ jsonrpc_get_backlog(const struct jsonrpc *rpc)
return rpc->status ? 0 : rpc->backlog;
}
+/* Sets thresholds for send backlog. If send backlog contains more than
+ * 'max_n_msgs' messages or is larger than 'max_backlog_bytes' bytes,
+ * connection will be dropped. */
+void
+jsonrpc_set_backlog_threshold(struct jsonrpc *rpc,
+ size_t max_n_msgs, size_t max_backlog_bytes)
+{
+ rpc->max_output = max_n_msgs;
+ rpc->max_backlog = max_backlog_bytes;
+}
+
/* Returns the number of bytes that have been received on 'rpc''s underlying
* stream. (The value wraps around if it exceeds UINT_MAX.) */
unsigned int
@@ -261,9 +276,26 @@ jsonrpc_send(struct jsonrpc *rpc, struct jsonrpc_msg *msg)
rpc->backlog += length;
if (rpc->output_count >= 50) {
- VLOG_INFO_RL(&rl, "excessive sending backlog, jsonrpc: %s, num of"
+ static struct vlog_rate_limit bl_rl = VLOG_RATE_LIMIT_INIT(5, 5);
+ bool disconnect = false;
+
+ VLOG_INFO_RL(&bl_rl, "excessive sending backlog, jsonrpc: %s, num of"
" msgs: %"PRIuSIZE", backlog: %"PRIuSIZE".", rpc->name,
rpc->output_count, rpc->backlog);
+ if (rpc->max_output && rpc->output_count > rpc->max_output) {
+ disconnect = true;
+ VLOG_WARN("sending backlog exceeded maximum number of messages (%"
+ PRIuSIZE" > %"PRIuSIZE"), disconnecting, jsonrpc: %s.",
+ rpc->output_count, rpc->max_output, rpc->name);
+ } else if (rpc->max_backlog && rpc->backlog > rpc->max_backlog) {
+ disconnect = true;
+ VLOG_WARN("sending backlog exceeded maximum size (%"PRIuSIZE" > %"
+ PRIuSIZE" bytes), disconnecting, jsonrpc: %s.",
+ rpc->backlog, rpc->max_backlog, rpc->name);
+ }
+ if (disconnect) {
+ jsonrpc_error(rpc, E2BIG);
+ }
}
if (rpc->backlog == length) {
@@ -787,6 +819,10 @@ struct jsonrpc_session {
int last_error;
unsigned int seqno;
uint8_t dscp;
+
+ /* Limits for jsonrpc. */
+ size_t max_n_msgs;
+ size_t max_backlog_bytes;
};
static void
@@ -842,6 +878,8 @@ jsonrpc_session_open_multiple(const struct svec *remotes, bool retry)
s->dscp = 0;
s->last_error = 0;
+ jsonrpc_session_set_backlog_threshold(s, 0, 0);
+
const char *name = reconnect_get_name(s->reconnect);
if (!pstream_verify_name(name)) {
reconnect_set_passive(s->reconnect, true, time_msec());
@@ -882,6 +920,7 @@ jsonrpc_session_open_unreliably(struct jsonrpc *jsonrpc, uint8_t dscp)
s->pstream = NULL;
s->seqno = 1;
+ jsonrpc_session_set_backlog_threshold(s, 0, 0);
return s;
}
@@ -970,6 +1009,8 @@ jsonrpc_session_run(struct jsonrpc_session *s)
}
reconnect_connected(s->reconnect, time_msec());
s->rpc = jsonrpc_open(stream);
+ jsonrpc_set_backlog_threshold(s->rpc, s->max_n_msgs,
+ s->max_backlog_bytes);
s->seqno++;
} else if (error != EAGAIN) {
reconnect_listen_error(s->reconnect, time_msec(), error);
@@ -1010,6 +1051,8 @@ jsonrpc_session_run(struct jsonrpc_session *s)
if (!error) {
reconnect_connected(s->reconnect, time_msec());
s->rpc = jsonrpc_open(s->stream);
+ jsonrpc_set_backlog_threshold(s->rpc, s->max_n_msgs,
+ s->max_backlog_bytes);
s->stream = NULL;
s->seqno++;
} else if (error != EAGAIN) {
@@ -1250,3 +1293,18 @@ jsonrpc_session_set_dscp(struct jsonrpc_session *s, uint8_t dscp)
jsonrpc_session_force_reconnect(s);
}
}
+
+/* Sets thresholds for send backlog. If send backlog contains more than
+ * 'max_n_msgs' messages or is larger than 'max_backlog_bytes' bytes,
+ * connection will be closed (then reconnected, if that feature is enabled). */
+void
+jsonrpc_session_set_backlog_threshold(struct jsonrpc_session *s,
+ size_t max_n_msgs,
+ size_t max_backlog_bytes)
+{
+ s->max_n_msgs = max_n_msgs;
+ s->max_backlog_bytes = max_backlog_bytes;
+ if (s->rpc) {
+ jsonrpc_set_backlog_threshold(s->rpc, max_n_msgs, max_backlog_bytes);
+ }
+}
diff --git a/lib/jsonrpc.h b/lib/jsonrpc.h
index a44114e8d..d75d66b86 100644
--- a/lib/jsonrpc.h
+++ b/lib/jsonrpc.h
@@ -51,6 +51,9 @@ void jsonrpc_wait(struct jsonrpc *);
int jsonrpc_get_status(const struct jsonrpc *);
size_t jsonrpc_get_backlog(const struct jsonrpc *);
+void jsonrpc_set_backlog_threshold(struct jsonrpc *, size_t max_n_msgs,
+ size_t max_backlog_bytes);
+
unsigned int jsonrpc_get_received_bytes(const struct jsonrpc *);
const char *jsonrpc_get_name(const struct jsonrpc *);
@@ -140,6 +143,9 @@ void jsonrpc_session_set_probe_interval(struct jsonrpc_session *,
int probe_interval);
void jsonrpc_session_set_dscp(struct jsonrpc_session *,
uint8_t dscp);
+void jsonrpc_session_set_backlog_threshold(struct jsonrpc_session *,
+ size_t max_n_msgs,
+ size_t max_backlog_bytes);
const char *jsonrpc_session_get_id(const struct jsonrpc_session *);
#endif /* jsonrpc.h */