summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorantirez <antirez@gmail.com>2017-04-14 13:39:49 +0200
committerantirez <antirez@gmail.com>2017-04-14 13:39:49 +0200
commit02777bb2529a403b50be4621a93fd65cdce14343 (patch)
tree9eb352ecff164840acfb72309e556163969c8b7d
parent8c829d9e4323b3636afa43d0ad8eb1ce2397c3f9 (diff)
downloadredis-02777bb2529a403b50be4621a93fd65cdce14343.tar.gz
Cluster: always add PFAIL nodes at end of gossip section.
To rely on the fact that nodes in PFAIL state will be shared around by randomly adding them in the gossip section is a weak assumption, especially after changes related to sending less ping/pong packets. We want to always include gossip entries for all the nodes that are in PFAIL state, so that the PFAIL -> FAIL state promotion can happen much faster and reliably. Related to #3929.
-rw-r--r--src/cluster.c92
-rw-r--r--src/cluster.h2
2 files changed, 71 insertions, 23 deletions
diff --git a/src/cluster.c b/src/cluster.c
index cae63e924..b23160b90 100644
--- a/src/cluster.c
+++ b/src/cluster.c
@@ -427,6 +427,7 @@ void clusterInit(void) {
server.cluster->stats_bus_messages_sent[i] = 0;
server.cluster->stats_bus_messages_received[i] = 0;
}
+ server.cluster->stats_pfail_nodes = 0;
memset(server.cluster->slots,0, sizeof(server.cluster->slots));
clusterCloseAllSlots();
@@ -2254,6 +2255,33 @@ void clusterBuildMessageHdr(clusterMsg *hdr, int type) {
/* For PING, PONG, and MEET, fixing the totlen field is up to the caller. */
}
+/* Return non zero if the node is already present in the gossip section of the
+ * message pointed by 'hdr' and having 'count' gossip entries. Otherwise
+ * zero is returned. Helper for clusterSendPing(). */
+int clusterNodeIsInGossipSection(clusterMsg *hdr, int count, clusterNode *n) {
+ int j;
+ for (j = 0; j < count; j++) {
+ if (memcmp(hdr->data.ping.gossip[j].nodename,n->name,
+ CLUSTER_NAMELEN) == 0) break;
+ }
+ return j != count;
+}
+
+/* Set the i-th entry of the gossip section in the message pointed by 'hdr'
+ * to the info of the specified node 'n'. */
+void clusterSetGossipEntry(clusterMsg *hdr, int i, clusterNode *n) {
+ clusterMsgDataGossip *gossip;
+ gossip = &(hdr->data.ping.gossip[i]);
+ memcpy(gossip->nodename,n->name,CLUSTER_NAMELEN);
+ gossip->ping_sent = htonl(n->ping_sent/1000);
+ gossip->pong_received = htonl(n->pong_received/1000);
+ memcpy(gossip->ip,n->ip,sizeof(n->ip));
+ gossip->port = htons(n->port);
+ gossip->cport = htons(n->cport);
+ gossip->flags = htons(n->flags);
+ gossip->notused1 = 0;
+}
+
/* Send a PING or PONG packet to the specified node, making sure to add enough
* gossip informations. */
void clusterSendPing(clusterLink *link, int type) {
@@ -2298,11 +2326,15 @@ void clusterSendPing(clusterLink *link, int type) {
if (wanted < 3) wanted = 3;
if (wanted > freshnodes) wanted = freshnodes;
+ /* Include all the nodes in PFAIL state, so that failure reports are
+ * faster to propagate to go from PFAIL to FAIL state. */
+ int pfail_wanted = server.cluster->stats_pfail_nodes;
+
/* Compute the maxium totlen to allocate our buffer. We'll fix the totlen
* later according to the number of gossip sections we really were able
* to put inside the packet. */
totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
- totlen += (sizeof(clusterMsgDataGossip)*wanted);
+ totlen += (sizeof(clusterMsgDataGossip)*(wanted+pfail_wanted));
/* Note: clusterBuildMessageHdr() expects the buffer to be always at least
* sizeof(clusterMsg) or more. */
if (totlen < (int)sizeof(clusterMsg)) totlen = sizeof(clusterMsg);
@@ -2319,17 +2351,13 @@ void clusterSendPing(clusterLink *link, int type) {
while(freshnodes > 0 && gossipcount < wanted && maxiterations--) {
dictEntry *de = dictGetRandomKey(server.cluster->nodes);
clusterNode *this = dictGetVal(de);
- clusterMsgDataGossip *gossip;
- int j;
/* Don't include this node: the whole packet header is about us
* already, so we just gossip about other nodes. */
if (this == myself) continue;
- /* Give a bias to FAIL/PFAIL nodes. */
- if (maxiterations > wanted*2 &&
- !(this->flags & (CLUSTER_NODE_PFAIL|CLUSTER_NODE_FAIL)))
- continue;
+ /* PFAIL nodes will be added later. */
+ if (this->flags & CLUSTER_NODE_PFAIL) continue;
/* In the gossip section don't include:
* 1) Nodes in HANDSHAKE state.
@@ -2343,27 +2371,37 @@ void clusterSendPing(clusterLink *link, int type) {
continue;
}
- /* Check if we already added this node */
- for (j = 0; j < gossipcount; j++) {
- if (memcmp(hdr->data.ping.gossip[j].nodename,this->name,
- CLUSTER_NAMELEN) == 0) break;
- }
- if (j != gossipcount) continue;
+ /* Do not add a node we already have. */
+ if (clusterNodeIsInGossipSection(hdr,gossipcount,this)) continue;
/* Add it */
+ clusterSetGossipEntry(hdr,gossipcount,this);
freshnodes--;
- gossip = &(hdr->data.ping.gossip[gossipcount]);
- memcpy(gossip->nodename,this->name,CLUSTER_NAMELEN);
- gossip->ping_sent = htonl(this->ping_sent/1000);
- gossip->pong_received = htonl(this->pong_received/1000);
- memcpy(gossip->ip,this->ip,sizeof(this->ip));
- gossip->port = htons(this->port);
- gossip->cport = htons(this->cport);
- gossip->flags = htons(this->flags);
- gossip->notused1 = 0;
gossipcount++;
}
+ /* If there are PFAIL nodes, add them at the end. */
+ if (pfail_wanted) {
+ dictIterator *di;
+ dictEntry *de;
+
+ di = dictGetSafeIterator(server.cluster->nodes);
+ while((de = dictNext(di)) != NULL && pfail_wanted > 0) {
+ clusterNode *node = dictGetVal(de);
+ if (node->flags & CLUSTER_NODE_HANDSHAKE) continue;
+ if (node->flags & CLUSTER_NODE_NOADDR) continue;
+ if (!(node->flags & CLUSTER_NODE_PFAIL)) continue;
+ clusterSetGossipEntry(hdr,gossipcount,node);
+ freshnodes--;
+ gossipcount++;
+ /* We take the count of the slots we allocated, since the
+ * PFAIL stats may not match perfectly with the current number
+ * of PFAIL nodes. */
+ pfail_wanted--;
+ }
+ dictReleaseIterator(di);
+ }
+
/* Ready to send... fix the totlen fiend and queue the message in the
* output buffer. */
totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
@@ -3189,13 +3227,21 @@ void clusterCron(void) {
handshake_timeout = server.cluster_node_timeout;
if (handshake_timeout < 1000) handshake_timeout = 1000;
- /* Check if we have disconnected nodes and re-establish the connection. */
+ /* Check if we have disconnected nodes and re-establish the connection.
+ * Also update a few stats while we are here, that can be used to make
+ * better decisions in other part of the code. */
di = dictGetSafeIterator(server.cluster->nodes);
+ server.cluster->stats_pfail_nodes = 0;
while((de = dictNext(di)) != NULL) {
clusterNode *node = dictGetVal(de);
+ /* Not interested in reconnecting the link with myself or nodes
+ * for which we have no address. */
if (node->flags & (CLUSTER_NODE_MYSELF|CLUSTER_NODE_NOADDR)) continue;
+ if (node->flags & CLUSTER_NODE_PFAIL)
+ server.cluster->stats_pfail_nodes++;
+
/* A Node in HANDSHAKE state has a limited lifespan equal to the
* configured node timeout. */
if (nodeInHandshake(node) && now - node->ctime > handshake_timeout) {
diff --git a/src/cluster.h b/src/cluster.h
index e7c088569..5e228c0f9 100644
--- a/src/cluster.h
+++ b/src/cluster.h
@@ -165,6 +165,8 @@ typedef struct clusterState {
/* Messages received and sent by type. */
long long stats_bus_messages_sent[CLUSTERMSG_TYPE_COUNT];
long long stats_bus_messages_received[CLUSTERMSG_TYPE_COUNT];
+ long long stats_pfail_nodes; /* Number of nodes in PFAIL status,
+ excluding nodes without address. */
} clusterState;
/* Redis cluster messages header */