summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--redis.conf11
-rw-r--r--src/cluster.c216
-rw-r--r--src/cluster.h8
-rw-r--r--src/commands.c9
-rw-r--r--src/commands/cluster-links.json15
-rw-r--r--src/config.c1
-rw-r--r--src/object.c3
-rw-r--r--src/server.c3
-rw-r--r--src/server.h3
-rw-r--r--tests/cluster/cluster.tcl69
-rw-r--r--tests/cluster/tests/24-links.tcl99
-rw-r--r--tests/support/util.tcl8
-rw-r--r--tests/unit/pendingquerybuf.tcl8
13 files changed, 421 insertions, 32 deletions
diff --git a/redis.conf b/redis.conf
index b2810996d..1350ac96d 100644
--- a/redis.conf
+++ b/redis.conf
@@ -1576,6 +1576,17 @@ lua-time-limit 5000
#
# cluster-allow-reads-when-down no
+# Cluster link send buffer limit is the limit on the memory usage of an individual
+# cluster bus link's send buffer in bytes. Cluster links would be freed if they exceed
+# this limit. This is to primarily prevent send buffers from growing unbounded on links
+# toward slow peers (e.g. PubSub messages piling up).
+# This limit is disabled by default. Enable this limit when 'mem_cluster_links' INFO field
+# and/or 'send-buffer-allocated' entries in the 'CLUSTER LINKS' command output continuously increase.
+# A minimum limit of 1gb is recommended so that the cluster link buffer can fit at least a single
+# PubSub message by default. (client-query-buffer-limit default value is 1gb)
+#
+# cluster-link-sendbuf-limit 0
+
# In order to setup your cluster make sure to read the documentation
# available at https://redis.io web site.
diff --git a/src/cluster.c b/src/cluster.c
index cbc4cd184..6538ceb0b 100644
--- a/src/cluster.c
+++ b/src/cluster.c
@@ -568,11 +568,15 @@ void clusterInit(void) {
server.cluster->failover_auth_epoch = 0;
server.cluster->cant_failover_reason = CLUSTER_CANT_FAILOVER_NONE;
server.cluster->lastVoteEpoch = 0;
+
+ /* Initialize stats */
for (int i = 0; i < CLUSTERMSG_TYPE_COUNT; i++) {
server.cluster->stats_bus_messages_sent[i] = 0;
server.cluster->stats_bus_messages_received[i] = 0;
}
server.cluster->stats_pfail_nodes = 0;
+ server.cluster->stat_cluster_links_buffer_limit_exceeded = 0;
+
memset(server.cluster->slots,0, sizeof(server.cluster->slots));
clusterCloseAllSlots();
@@ -711,8 +715,13 @@ clusterLink *createClusterLink(clusterNode *node) {
link->sndbuf = sdsempty();
link->rcvbuf = zmalloc(link->rcvbuf_alloc = RCVBUF_INIT_LEN);
link->rcvbuf_len = 0;
- link->node = node;
link->conn = NULL;
+ link->node = node;
+ /* Related node can only possibly be known at link creation time if this is an outbound link */
+ link->inbound = (node == NULL);
+ if (!link->inbound) {
+ node->link = link;
+ }
return link;
}
@@ -726,11 +735,33 @@ void freeClusterLink(clusterLink *link) {
}
sdsfree(link->sndbuf);
zfree(link->rcvbuf);
- if (link->node)
- link->node->link = NULL;
+ if (link->node) {
+ if (link->node->link == link) {
+ serverAssert(!link->inbound);
+ link->node->link = NULL;
+ } else if (link->node->inbound_link == link) {
+ serverAssert(link->inbound);
+ link->node->inbound_link = NULL;
+ }
+ }
zfree(link);
}
+void setClusterNodeToInboundClusterLink(clusterNode *node, clusterLink *link) {
+ serverAssert(!link->node);
+ serverAssert(link->inbound);
+ if (node->inbound_link) {
+ /* A peer may disconnect and then reconnect with us, and it's not guaranteed that
+ * we would always process the disconnection of the existing inbound link before
+ * accepting a new inbound link. Therefore, it's possible to have more than
+ * one inbound link from the same node at the same time. */
+ serverLog(LL_DEBUG, "Replacing inbound link fd %d from node %s with fd %d",
+ node->inbound_link->conn->fd, node->name, link->conn->fd);
+ }
+ node->inbound_link = link;
+ link->node = node;
+}
+
static void clusterConnAcceptHandler(connection *conn) {
clusterLink *link;
@@ -879,6 +910,7 @@ clusterNode *createClusterNode(char *nodename, int flags) {
node->data_received = 0;
node->fail_time = 0;
node->link = NULL;
+ node->inbound_link = NULL;
memset(node->ip,0,sizeof(node->ip));
node->port = 0;
node->cport = 0;
@@ -1046,8 +1078,9 @@ void freeClusterNode(clusterNode *n) {
serverAssert(dictDelete(server.cluster->nodes,nodename) == DICT_OK);
sdsfree(nodename);
- /* Release link and associated data structures. */
+ /* Release links and associated data structures. */
if (n->link) freeClusterLink(n->link);
+ if (n->inbound_link) freeClusterLink(n->inbound_link);
listRelease(n->fail_reports);
zfree(n->slaves);
zfree(n);
@@ -1821,6 +1854,26 @@ void clusterUpdateSlotsConfigWith(clusterNode *sender, uint64_t senderConfigEpoc
}
}
+static clusterNode *getNodeFromLinkAndMsg(clusterLink *link, clusterMsg *hdr) {
+ clusterNode *sender;
+ if (link->node && !nodeInHandshake(link->node)) {
+ /* If the link has an associated node, use that so that we don't have to look it
+ * up every time, except when the node is still in handshake; in that case the
+ * node still has a random name and is thus not truly "known". */
+ sender = link->node;
+ } else {
+ /* Otherwise, fetch sender based on the message */
+ sender = clusterLookupNode(hdr->sender);
+ /* We know the sender node but haven't associated it with the link yet. This must
+ * be an inbound link, because only for inbound links we didn't know which node
+ * to associate when they were created. */
+ if (sender && !link->node) {
+ setClusterNodeToInboundClusterLink(sender, link);
+ }
+ }
+ return sender;
+}
+
/* When this function is called, there is a packet to process starting
* at link->rcvbuf. Releasing the buffer is up to the caller, so this
* function should just handle the higher level stuff of processing the
@@ -1896,10 +1949,7 @@ int clusterProcessPacket(clusterLink *link) {
if (totlen != explen) return 1;
}
- /* Check if the sender is a known node. Note that for incoming connections
- * we don't store link->node information, but resolve the node by the
- * ID in the header each time in the current implementation. */
- sender = clusterLookupNode(hdr->sender);
+ sender = getNodeFromLinkAndMsg(link, hdr);
/* Update the last time we saw any data from this node. We
* use this in order to avoid detecting a timeout from a node that
@@ -2000,7 +2050,7 @@ int clusterProcessPacket(clusterLink *link) {
serverLog(LL_DEBUG,"%s packet received: %s",
clusterGetMessageTypeString(type),
link->node ? link->node->name : "NULL");
- if (link->node) {
+ if (!link->inbound) {
if (nodeInHandshake(link->node)) {
/* If we already have this node, try to change the
* IP/port of the node with the new one. */
@@ -2070,7 +2120,7 @@ int clusterProcessPacket(clusterLink *link) {
}
/* Update our info about the node */
- if (link->node && type == CLUSTERMSG_TYPE_PONG) {
+ if (!link->inbound && type == CLUSTERMSG_TYPE_PONG) {
link->node->pong_received = now;
link->node->ping_sent = 0;
@@ -2673,7 +2723,7 @@ void clusterSendPing(clusterLink *link, int type) {
hdr = (clusterMsg*) buf;
/* Populate the header. */
- if (link->node && type == CLUSTERMSG_TYPE_PING)
+ if (!link->inbound && type == CLUSTERMSG_TYPE_PING)
link->node->ping_sent = mstime();
clusterBuildMessageHdr(hdr,type);
@@ -3588,7 +3638,7 @@ void clusterHandleManualFailover(void) {
/* Check if the node is disconnected and re-establish the connection.
* Also update a few stats while we are here, that can be used to make
* better decisions in other part of the code. */
-int clusterNodeCronHandleReconnect(clusterNode *node, mstime_t handshake_timeout, mstime_t now) {
+static int clusterNodeCronHandleReconnect(clusterNode *node, mstime_t handshake_timeout, mstime_t now) {
/* Not interested in reconnecting the link with myself or nodes
* for which we have no address. */
if (node->flags & (CLUSTER_NODE_MYSELF|CLUSTER_NODE_NOADDR)) return 1;
@@ -3622,20 +3672,57 @@ int clusterNodeCronHandleReconnect(clusterNode *node, mstime_t handshake_timeout
freeClusterLink(link);
return 0;
}
- node->link = link;
}
return 0;
}
-/* Resize the send buffer of a node if it is wasting
- * enough space. */
-int clusterNodeCronResizeBuffers(clusterNode *node) {
+static void resizeClusterLinkBuffer(clusterLink *link) {
/* If unused space is a lot bigger than the used portion of the buffer then free up unused space.
* We use a factor of 4 because of the greediness of sdsMakeRoomFor (used by sdscatlen). */
- if (node->link != NULL && sdsavail(node->link->sndbuf) / 4 > sdslen(node->link->sndbuf)) {
- node->link->sndbuf = sdsRemoveFreeSpace(node->link->sndbuf);
+ if (link != NULL && sdsavail(link->sndbuf) / 4 > sdslen(link->sndbuf)) {
+ link->sndbuf = sdsRemoveFreeSpace(link->sndbuf);
+ }
+}
+
+/* Resize the send buffer of a node if it is wasting
+ * enough space. */
+static void clusterNodeCronResizeBuffers(clusterNode *node) {
+ resizeClusterLinkBuffer(node->link);
+ resizeClusterLinkBuffer(node->inbound_link);
+}
+
+static void freeClusterLinkOnBufferLimitReached(clusterLink *link) {
+ if (link == NULL || server.cluster_link_sendbuf_limit_bytes == 0) {
+ return;
+ }
+ unsigned long long mem_link = sdsalloc(link->sndbuf);
+ if (mem_link > server.cluster_link_sendbuf_limit_bytes) {
+ serverLog(LL_WARNING, "Freeing cluster link(%s node %s, used memory: %llu) due to "
+ "exceeding send buffer memory limit.", link->inbound ? "from" : "to",
+ link->node ? link->node->name : "", mem_link);
+ freeClusterLink(link);
+ server.cluster->stat_cluster_links_buffer_limit_exceeded++;
+ }
+}
+
+/* Free outbound link to a node if its send buffer size exceeded limit. */
+static void clusterNodeCronFreeLinkOnBufferLimitReached(clusterNode *node) {
+ freeClusterLinkOnBufferLimitReached(node->link);
+ freeClusterLinkOnBufferLimitReached(node->inbound_link);
+}
+
+static size_t getClusterLinkMemUsage(clusterLink *link) {
+ if (link != NULL) {
+ return sizeof(clusterLink) + sdsalloc(link->sndbuf) + link->rcvbuf_alloc;
+ } else {
+ return 0;
}
- return 0;
+}
+
+/* Update memory usage statistics of all current cluster links */
+static void clusterNodeCronUpdateClusterLinksMemUsage(clusterNode *node) {
+ server.stat_cluster_links_memory += getClusterLinkMemUsage(node->link);
+ server.stat_cluster_links_memory += getClusterLinkMemUsage(node->inbound_link);
}
/* This is executed 10 times every second */
@@ -3662,14 +3749,25 @@ void clusterCron(void) {
/* Clear so clusterNodeCronHandleReconnect can count the number of nodes in PFAIL. */
server.cluster->stats_pfail_nodes = 0;
+ /* Clear so clusterNodeCronUpdateClusterLinksMemUsage can count the current memory usage of all cluster links. */
+ server.stat_cluster_links_memory = 0;
/* Run through some of the operations we want to do on each cluster node. */
di = dictGetSafeIterator(server.cluster->nodes);
while((de = dictNext(di)) != NULL) {
clusterNode *node = dictGetVal(de);
- /* The protocol is that they return non-zero if the node was
- * terminated. */
+ /* The sequence goes:
+ * 1. We try to shrink link buffers if possible.
+ * 2. We free the links whose buffers are still oversized after possible shrinking.
+ * 3. We update the latest memory usage of cluster links.
+ * 4. We immediately attempt reconnecting after freeing links.
+ */
+ clusterNodeCronResizeBuffers(node);
+ clusterNodeCronFreeLinkOnBufferLimitReached(node);
+ clusterNodeCronUpdateClusterLinksMemUsage(node);
+ /* The protocol is that function(s) below return non-zero if the node was
+ * terminated.
+ */
if(clusterNodeCronHandleReconnect(node, handshake_timeout, now)) continue;
- if(clusterNodeCronResizeBuffers(node)) continue;
}
dictReleaseIterator(di);
@@ -4399,6 +4497,70 @@ sds clusterGenNodesDescription(int filter, int use_pport) {
return ci;
}
+/* Add to the output buffer of the given client the description of the given cluster link.
+ * The description is a map with each entry being an attribute of the link. */
+void addReplyClusterLinkDescription(client *c, clusterLink *link) {
+ addReplyMapLen(c, 6);
+
+ addReplyBulkCString(c, "direction");
+ addReplyBulkCString(c, link->inbound ? "from" : "to");
+
+ /* addReplyClusterLinkDescription is only called for links that have been
+ * associated with nodes. The association is always bi-directional, so
+ * in addReplyClusterLinkDescription, link->node should never be NULL. */
+ serverAssert(link->node);
+ sds node_name = sdsnewlen(link->node->name, CLUSTER_NAMELEN);
+ addReplyBulkCString(c, "node");
+ addReplyBulkCString(c, node_name);
+ sdsfree(node_name);
+
+ addReplyBulkCString(c, "create-time");
+ addReplyLongLong(c, link->ctime);
+
+ char events[3], *p;
+ p = events;
+ if (link->conn) {
+ if (connHasReadHandler(link->conn)) *p++ = 'r';
+ if (connHasWriteHandler(link->conn)) *p++ = 'w';
+ }
+ *p = '\0';
+ addReplyBulkCString(c, "events");
+ addReplyBulkCString(c, events);
+
+ addReplyBulkCString(c, "send-buffer-allocated");
+ addReplyLongLong(c, sdsalloc(link->sndbuf));
+
+ addReplyBulkCString(c, "send-buffer-used");
+ addReplyLongLong(c, sdslen(link->sndbuf));
+}
+
+/* Add to the output buffer of the given client an array of cluster link descriptions,
+ * with array entry being a description of a single current cluster link. */
+void addReplyClusterLinksDescription(client *c) {
+ dictIterator *di;
+ dictEntry *de;
+ void *arraylen_ptr = NULL;
+ int num_links = 0;
+
+ arraylen_ptr = addReplyDeferredLen(c);
+
+ di = dictGetSafeIterator(server.cluster->nodes);
+ while((de = dictNext(di)) != NULL) {
+ clusterNode *node = dictGetVal(de);
+ if (node->link) {
+ num_links++;
+ addReplyClusterLinkDescription(c, node->link);
+ }
+ if (node->inbound_link) {
+ num_links++;
+ addReplyClusterLinkDescription(c, node->inbound_link);
+ }
+ }
+ dictReleaseIterator(di);
+
+ setDeferredArrayLen(c, arraylen_ptr, num_links);
+}
+
/* -----------------------------------------------------------------------------
* CLUSTER command
* -------------------------------------------------------------------------- */
@@ -4608,6 +4770,9 @@ void clusterCommand(client *c) {
"SLOTS",
" Return information about slots range mappings. Each range is made of:",
" start, end, master and replicas IP addresses, ports and ids",
+"LINKS",
+" Return information about all network links between this node and its peers.",
+" Output format is an array where each array element is a map containing attributes of a link",
NULL
};
addReplyHelp(c, help);
@@ -4919,6 +5084,10 @@ NULL
info = sdscatprintf(info,
"cluster_stats_messages_received:%lld\r\n", tot_msg_received);
+ info = sdscatprintf(info,
+ "total_cluster_links_buffer_limit_exceeded:%llu\r\n",
+ server.cluster->stat_cluster_links_buffer_limit_exceeded);
+
/* Produce the reply protocol. */
addReplyVerbatim(c,info,sdslen(info),"txt");
sdsfree(info);
@@ -5182,6 +5351,9 @@ NULL
}
clusterReset(hard);
addReply(c,shared.ok);
+ } else if (!strcasecmp(c->argv[1]->ptr,"links") && c->argc == 2) {
+ /* CLUSTER LINKS */
+ addReplyClusterLinksDescription(c);
} else {
addReplySubcommandSyntaxError(c);
return;
diff --git a/src/cluster.h b/src/cluster.h
index e0cf5c4dd..d64e2a5b9 100644
--- a/src/cluster.h
+++ b/src/cluster.h
@@ -39,7 +39,8 @@ typedef struct clusterLink {
char *rcvbuf; /* Packet reception buffer */
size_t rcvbuf_len; /* Used size of rcvbuf */
size_t rcvbuf_alloc; /* Allocated size of rcvbuf */
- struct clusterNode *node; /* Node related to this link if any, or NULL */
+ struct clusterNode *node; /* Node related to this link. Initialized to NULL when unknown */
+ int inbound; /* 1 if this link is an inbound link accepted from the related node */
} clusterLink;
/* Cluster node flags and macros. */
@@ -137,7 +138,8 @@ typedef struct clusterNode {
int pport; /* Latest known clients plaintext port. Only used
if the main clients port is for TLS. */
int cport; /* Latest known cluster port of this node. */
- clusterLink *link; /* TCP/IP link with this node */
+ clusterLink *link; /* TCP/IP link established toward this node */
+ clusterLink *inbound_link; /* TCP/IP link accepted from this node */
list *fail_reports; /* List of nodes signaling this as failing */
} clusterNode;
@@ -192,11 +194,13 @@ typedef struct clusterState {
/* The following fields are used by masters to take state on elections. */
uint64_t lastVoteEpoch; /* Epoch of the last vote granted. */
int todo_before_sleep; /* Things to do in clusterBeforeSleep(). */
+ /* Stats */
/* Messages received and sent by type. */
long long stats_bus_messages_sent[CLUSTERMSG_TYPE_COUNT];
long long stats_bus_messages_received[CLUSTERMSG_TYPE_COUNT];
long long stats_pfail_nodes; /* Number of nodes in PFAIL status,
excluding nodes without address. */
+ unsigned long long stat_cluster_links_buffer_limit_exceeded; /* Total number of cluster links freed due to exceeding buffer limit */
} clusterState;
/* Redis cluster messages header */
diff --git a/src/commands.c b/src/commands.c
index c88072072..4232ba8b5 100644
--- a/src/commands.c
+++ b/src/commands.c
@@ -399,6 +399,14 @@ struct redisCommandArg CLUSTER_KEYSLOT_Args[] = {
{0}
};
+/********** CLUSTER LINKS ********************/
+
+/* CLUSTER LINKS history */
+#define CLUSTER_LINKS_History NULL
+
+/* CLUSTER LINKS hints */
+#define CLUSTER_LINKS_Hints NULL
+
/********** CLUSTER MEET ********************/
/* CLUSTER MEET history */
@@ -552,6 +560,7 @@ struct redisCommand CLUSTER_Subcommands[] = {
{"help","Show helpful text about the different subcommands","O(1)","5.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_CLUSTER,CLUSTER_HELP_History,CLUSTER_HELP_Hints,clusterCommand,2,CMD_LOADING|CMD_STALE,0},
{"info","Provides info about Redis Cluster node state","O(1)","3.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_CLUSTER,CLUSTER_INFO_History,CLUSTER_INFO_Hints,clusterCommand,2,CMD_RANDOM|CMD_STALE,0},
{"keyslot","Returns the hash slot of the specified key","O(N) where N is the number of bytes in the key","3.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_CLUSTER,CLUSTER_KEYSLOT_History,CLUSTER_KEYSLOT_Hints,clusterCommand,3,CMD_RANDOM|CMD_STALE,0,.args=CLUSTER_KEYSLOT_Args},
+{"links","Returns a list of all TCP links to and from peer nodes in cluster","O(N) where N is the total number of Cluster nodes","7.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_CLUSTER,CLUSTER_LINKS_History,CLUSTER_LINKS_Hints,clusterCommand,2,CMD_RANDOM|CMD_STALE,0},
{"meet","Force a node cluster to handshake with another node","O(1)","3.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_CLUSTER,CLUSTER_MEET_History,CLUSTER_MEET_Hints,clusterCommand,-4,CMD_ADMIN|CMD_RANDOM|CMD_STALE,0,.args=CLUSTER_MEET_Args},
{"myid","Return the node id","O(1)","3.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_CLUSTER,CLUSTER_MYID_History,CLUSTER_MYID_Hints,clusterCommand,2,CMD_RANDOM|CMD_STALE,0},
{"nodes","Get Cluster config for the node","O(N) where N is the total number of Cluster nodes","3.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_CLUSTER,CLUSTER_NODES_History,CLUSTER_NODES_Hints,clusterCommand,2,CMD_RANDOM|CMD_STALE,0},
diff --git a/src/commands/cluster-links.json b/src/commands/cluster-links.json
new file mode 100644
index 000000000..cc53d3384
--- /dev/null
+++ b/src/commands/cluster-links.json
@@ -0,0 +1,15 @@
+{
+ "LINKS": {
+ "summary": "Returns a list of all TCP links to and from peer nodes in cluster",
+ "complexity": "O(N) where N is the total number of Cluster nodes",
+ "group": "cluster",
+ "since": "7.0.0",
+ "arity": 2,
+ "container": "CLUSTER",
+ "function": "clusterCommand",
+ "command_flags": [
+ "RANDOM",
+ "STALE"
+ ]
+ }
+}
diff --git a/src/config.c b/src/config.c
index 5146559e4..f171b0041 100644
--- a/src/config.c
+++ b/src/config.c
@@ -2690,6 +2690,7 @@ standardConfig configs[] = {
/* Unsigned Long Long configs */
createULongLongConfig("maxmemory", NULL, MODIFIABLE_CONFIG, 0, ULLONG_MAX, server.maxmemory, 0, MEMORY_CONFIG, NULL, updateMaxmemory),
+ createULongLongConfig("cluster-link-sendbuf-limit", NULL, MODIFIABLE_CONFIG, 0, ULLONG_MAX, server.cluster_link_sendbuf_limit_bytes, 0, MEMORY_CONFIG, NULL, NULL),
/* Size_t configs */
createSizeTConfig("hash-max-listpack-entries", "hash-max-ziplist-entries", MODIFIABLE_CONFIG, 0, LONG_MAX, server.hash_max_listpack_entries, 512, INTEGER_CONFIG, NULL, NULL),
diff --git a/src/object.c b/src/object.c
index 7a5563ccb..d53f3e084 100644
--- a/src/object.c
+++ b/src/object.c
@@ -1196,6 +1196,9 @@ struct redisMemOverhead *getMemoryOverheadData(void) {
server.stat_clients_type_memory[CLIENT_TYPE_NORMAL];
mem_total += mh->clients_normal;
+ mh->cluster_links = server.stat_cluster_links_memory;
+ mem_total += mh->cluster_links;
+
mem = 0;
if (server.aof_state != AOF_OFF) {
mem += sdsZmallocSize(server.aof_buf);
diff --git a/src/server.c b/src/server.c
index 0abf20dcd..e09974e41 100644
--- a/src/server.c
+++ b/src/server.c
@@ -2352,6 +2352,7 @@ void initServer(void) {
server.stat_module_progress = 0;
for (int j = 0; j < CLIENT_TYPE_COUNT; j++)
server.stat_clients_type_memory[j] = 0;
+ server.stat_cluster_links_memory = 0;
server.cron_malloc_stats.zmalloc_used = 0;
server.cron_malloc_stats.process_rss = 0;
server.cron_malloc_stats.allocator_allocated = 0;
@@ -4559,6 +4560,7 @@ sds genRedisInfoString(const char *section) {
"mem_total_replication_buffers:%zu\r\n"
"mem_clients_slaves:%zu\r\n"
"mem_clients_normal:%zu\r\n"
+ "mem_cluster_links:%zu\r\n"
"mem_aof_buffer:%zu\r\n"
"mem_allocator:%s\r\n"
"active_defrag_running:%d\r\n"
@@ -4611,6 +4613,7 @@ sds genRedisInfoString(const char *section) {
server.repl_buffer_mem,
mh->clients_slaves,
mh->clients_normal,
+ mh->cluster_links,
mh->aof_buffer,
ZMALLOC_LIB,
server.active_defrag_running,
diff --git a/src/server.h b/src/server.h
index a3600e13a..6ecad5456 100644
--- a/src/server.h
+++ b/src/server.h
@@ -1217,6 +1217,7 @@ struct redisMemOverhead {
size_t repl_backlog;
size_t clients_slaves;
size_t clients_normal;
+ size_t cluster_links;
size_t aof_buffer;
size_t lua_caches;
size_t functions_caches;
@@ -1462,6 +1463,7 @@ struct redisServer {
size_t stat_module_cow_bytes; /* Copy on write bytes during module fork. */
double stat_module_progress; /* Module save progress. */
redisAtomic size_t stat_clients_type_memory[CLIENT_TYPE_COUNT];/* Mem usage by type */
+ size_t stat_cluster_links_memory;/* Mem usage by cluster links */
long long stat_unexpected_error_replies; /* Number of unexpected (aof-loading, replica to master, etc.) error replies */
long long stat_total_error_replies; /* Total number of issued error replies ( command + rejected errors ) */
long long stat_dump_payload_sanitizations; /* Number deep dump payloads integrity validations. */
@@ -1734,6 +1736,7 @@ struct redisServer {
int cluster_allow_reads_when_down; /* Are reads allowed when the cluster
is down? */
int cluster_config_file_lock_fd; /* cluster config fd, will be flock */
+ unsigned long long cluster_link_sendbuf_limit_bytes; /* Memory usage limit on individual link send buffers */
/* Scripting */
client *script_caller; /* The client running script right now, or NULL */
mstime_t script_time_limit; /* Script timeout in milliseconds */
diff --git a/tests/cluster/cluster.tcl b/tests/cluster/cluster.tcl
index e95789282..7b7ce5343 100644
--- a/tests/cluster/cluster.tcl
+++ b/tests/cluster/cluster.tcl
@@ -175,3 +175,72 @@ proc wait_for_cluster_propagation {} {
fail "cluster config did not reach a consistent state"
}
}
+
+# Returns a parsed CLUSTER LINKS output of the instance identified
+# by the given `id` as a list of dictionaries, with each dictionary
+# corresponds to a link.
+proc get_cluster_links id {
+ set lines [R $id cluster links]
+ set links {}
+ foreach l $lines {
+ if {$l eq {}} continue
+ assert_equal [llength $l] 12
+ assert_equal [lindex $l 0] "direction"
+ set dir [lindex $l 1]
+ assert_equal [lindex $l 2] "node"
+ set node [lindex $l 3]
+ assert_equal [lindex $l 4] "create-time"
+ set create_time [lindex $l 5]
+ assert_equal [lindex $l 6] "events"
+ set events [lindex $l 7]
+ assert_equal [lindex $l 8] "send-buffer-allocated"
+ set send_buffer_allocated [lindex $l 9]
+ assert_equal [lindex $l 10] "send-buffer-used"
+ set send_buffer_used [lindex $l 11]
+ set link [dict create \
+ dir $dir \
+ node $node \
+ create_time $create_time \
+ events $events \
+ send_buffer_allocated $send_buffer_allocated \
+ send_buffer_used $send_buffer_used \
+ ]
+ lappend links $link
+ }
+ return $links
+}
+
+proc get_links_with_peer {this_instance_id peer_nodename} {
+ set links [get_cluster_links $this_instance_id]
+ set links_with_peer {}
+ foreach l $links {
+ if {[dict get $l node] eq $peer_nodename} {
+ lappend links_with_peer $l
+ }
+ }
+ return $links_with_peer
+}
+
+# Return the entry in CLUSTER LINKS output by instance identified by `this_instance_id` that
+# corresponds to the link established toward a peer identified by `peer_nodename`
+proc get_link_to_peer {this_instance_id peer_nodename} {
+ set links_with_peer [get_links_with_peer $this_instance_id $peer_nodename]
+ foreach l $links_with_peer {
+ if {[dict get $l dir] eq "to"} {
+ return $l
+ }
+ }
+ return {}
+}
+
+# Return the entry in CLUSTER LINKS output by instance identified by `this_instance_id` that
+# corresponds to the link accepted from a peer identified by `peer_nodename`
+proc get_link_from_peer {this_instance_id peer_nodename} {
+ set links_with_peer [get_links_with_peer $this_instance_id $peer_nodename]
+ foreach l $links_with_peer {
+ if {[dict get $l dir] eq "from"} {
+ return $l
+ }
+ }
+ return {}
+}
diff --git a/tests/cluster/tests/24-links.tcl b/tests/cluster/tests/24-links.tcl
new file mode 100644
index 000000000..6657a8ce4
--- /dev/null
+++ b/tests/cluster/tests/24-links.tcl
@@ -0,0 +1,99 @@
+source "../tests/includes/init-tests.tcl"
+
+test "Create a cluster with two single-node shards" {
+ create_cluster 2 0
+}
+
+test "Cluster should start ok" {
+ assert_cluster_state ok
+}
+
+test "Each node has two links with each peer" {
+ foreach_redis_id id {
+ # Get number of peers, excluding myself
+ set nodes [get_cluster_nodes $id]
+ set num_peers [expr [llength $nodes] - 1]
+
+ # Get number of links to peers
+ set links [get_cluster_links $id]
+ set num_links [llength $links]
+
+ # Two links per peer
+ assert {$num_peers*2 eq $num_links}
+
+ # For each peer there should be exactly one
+ # link "to" it and one link "from" it.
+ foreach n $nodes {
+ if {[has_flag $n myself]} continue
+ set peer [dict get $n id]
+ set to 0
+ set from 0
+ foreach l $links {
+ if {[dict get $l node] eq $peer} {
+ if {[dict get $l dir] eq "to"} {
+ incr to
+ } elseif {[dict get $l dir] eq "from"} {
+ incr from
+ }
+ }
+ }
+ assert {$to eq 1}
+ assert {$from eq 1}
+ }
+ }
+}
+
+set primary1_id 0
+set primary2_id 1
+
+set primary1 [Rn $primary1_id]
+set primary2 [Rn $primary2_id]
+
+test "Disconnect link when send buffer limit reached" {
+ # On primary1, set timeout to 1 hour so links won't get disconnected due to timeouts
+ set oldtimeout [lindex [$primary1 CONFIG get cluster-node-timeout] 1]
+ $primary1 CONFIG set cluster-node-timeout [expr 60*60*1000]
+
+ # Get primary1's links with primary2
+ set primary2_name [dict get [get_myself $primary2_id] id]
+ set orig_link_p1_to_p2 [get_link_to_peer $primary1_id $primary2_name]
+ set orig_link_p1_from_p2 [get_link_from_peer $primary1_id $primary2_name]
+
+ # On primary1, set cluster link send buffer limit to 32MB
+ set oldlimit [lindex [$primary1 CONFIG get cluster-link-sendbuf-limit] 1]
+ $primary1 CONFIG set cluster-link-sendbuf-limit [expr 32*1024*1024]
+ assert {[get_info_field [$primary1 cluster info] total_cluster_links_buffer_limit_exceeded] eq 0}
+
+ # To manufacture an ever-growing send buffer from primary1 to primary2,
+ # make primary2 unresponsive.
+ set primary2_pid [get_instance_attrib redis $primary2_id pid]
+ exec kill -SIGSTOP $primary2_pid
+
+ # On primary1, send a 10MB Pubsub message. It will stay in send buffer of
+ # the link from primary1 to primary2
+ $primary1 publish channel [prepare_value [expr 10*1024*1024]]
+
+ # Check the same link has not been disconnected, but its send buffer has grown
+ set same_link_p1_to_p2 [get_link_to_peer $primary1_id $primary2_name]
+ assert {[dict get $same_link_p1_to_p2 create_time] eq [dict get $orig_link_p1_to_p2 create_time]}
+ assert {[dict get $same_link_p1_to_p2 send_buffer_allocated] > [dict get $orig_link_p1_to_p2 send_buffer_allocated]}
+
+ # On primary1, send another 30MB Pubsub message.
+ $primary1 publish channel [prepare_value [expr 30*1024*1024]]
+
+ # Link has exceeded buffer limit and been dropped and recreated
+ set new_link_p1_to_p2 [get_link_to_peer $primary1_id $primary2_name]
+ assert {[dict get $new_link_p1_to_p2 create_time] > [dict get $orig_link_p1_to_p2 create_time]}
+ assert {[get_info_field [$primary1 cluster info] total_cluster_links_buffer_limit_exceeded] eq 1}
+
+ # Link from primary2 should not be affected
+ set same_link_p1_from_p2 [get_link_from_peer $primary1_id $primary2_name]
+ assert {[dict get $same_link_p1_from_p2 create_time] eq [dict get $orig_link_p1_from_p2 create_time]}
+
+ # Revive primary2
+ exec kill -SIGCONT $primary2_pid
+
+ # Reset configs on primary1 so config changes don't leak out to other tests
+ $primary1 CONFIG set cluster-node-timeout $oldtimeout
+ $primary1 CONFIG set cluster-link-sendbuf-limit $oldlimit
+}
diff --git a/tests/support/util.tcl b/tests/support/util.tcl
index d97743665..08fea1faa 100644
--- a/tests/support/util.tcl
+++ b/tests/support/util.tcl
@@ -978,3 +978,11 @@ proc read_big_bulk {code {compare no} {prefix ""}} {
r readraw 0
return $resp_len
}
+
+proc prepare_value {size} {
+ set _v "c"
+ for {set i 1} {$i < $size} {incr i} {
+ append _v 0
+ }
+ return $_v
+}
diff --git a/tests/unit/pendingquerybuf.tcl b/tests/unit/pendingquerybuf.tcl
index b1c2ee0d5..c1278c8fd 100644
--- a/tests/unit/pendingquerybuf.tcl
+++ b/tests/unit/pendingquerybuf.tcl
@@ -4,14 +4,6 @@ proc info_memory {r property} {
}
}
-proc prepare_value {size} {
- set _v "c"
- for {set i 1} {$i < $size} {incr i} {
- append _v 0
- }
- return $_v
-}
-
start_server {tags {"wait external:skip"}} {
start_server {} {
set slave [srv 0 client]