summaryrefslogtreecommitdiff
path: root/src/cluster.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/cluster.c')
-rw-r--r--src/cluster.c185
1 files changed, 151 insertions, 34 deletions
diff --git a/src/cluster.c b/src/cluster.c
index 4d7b0502d..d5ad85fe7 100644
--- a/src/cluster.c
+++ b/src/cluster.c
@@ -423,8 +423,11 @@ void clusterInit(void) {
server.cluster->failover_auth_epoch = 0;
server.cluster->cant_failover_reason = CLUSTER_CANT_FAILOVER_NONE;
server.cluster->lastVoteEpoch = 0;
- server.cluster->stats_bus_messages_sent = 0;
- server.cluster->stats_bus_messages_received = 0;
+ for (int i = 0; i < CLUSTERMSG_TYPE_COUNT; i++) {
+ server.cluster->stats_bus_messages_sent[i] = 0;
+ server.cluster->stats_bus_messages_received[i] = 0;
+ }
+ server.cluster->stats_pfail_nodes = 0;
memset(server.cluster->slots,0, sizeof(server.cluster->slots));
clusterCloseAllSlots();
@@ -476,8 +479,10 @@ void clusterInit(void) {
}
}
- /* The slots -> keys map is a sorted set. Init it. */
- server.cluster->slots_to_keys = zslCreate();
+ /* The slots -> keys map is a radix tree. Initialize it here. */
+ server.cluster->slots_to_keys = raxNew();
+ memset(server.cluster->slots_keys_count,0,
+ sizeof(server.cluster->slots_keys_count));
/* Set myself->port / cport to my listening ports, we'll just need to
* discover the IP address via MEET messages. */
@@ -1350,6 +1355,28 @@ void clusterProcessGossipSection(clusterMsg *hdr, clusterLink *link) {
}
}
+ /* If from our POV the node is up (no failure flags are set),
+ * we have no pending ping for the node, nor we have failure
+ * reports for this node, update the last pong time with the
+ * one we see from the other nodes. */
+ if (!(flags & (CLUSTER_NODE_FAIL|CLUSTER_NODE_PFAIL)) &&
+ node->ping_sent == 0 &&
+ clusterNodeFailureReportsCount(node) == 0)
+ {
+ mstime_t pongtime = ntohl(g->pong_received);
+ pongtime *= 1000; /* Convert back to milliseconds. */
+
+ /* Replace the pong time with the received one only if
+ * it's greater than our view but is not in the future
+ * (with 500 milliseconds tolerance) from the POV of our
+ * clock. */
+ if (pongtime <= (server.mstime+500) &&
+ pongtime > node->pong_received)
+ {
+ node->pong_received = pongtime;
+ }
+ }
+
/* If we already know this node, but it is not reachable, and
* we see a different address in the gossip section of a node that
* can talk with this other node, update the address, disconnect
@@ -1581,7 +1608,8 @@ int clusterProcessPacket(clusterLink *link) {
uint32_t totlen = ntohl(hdr->totlen);
uint16_t type = ntohs(hdr->type);
- server.cluster->stats_bus_messages_received++;
+ if (type < CLUSTERMSG_TYPE_COUNT)
+ server.cluster->stats_bus_messages_received[type]++;
serverLog(LL_DEBUG,"--- Processing packet of type %d, %lu bytes",
type, (unsigned long) totlen);
@@ -2128,7 +2156,12 @@ void clusterSendMessage(clusterLink *link, unsigned char *msg, size_t msglen) {
clusterWriteHandler,link);
link->sndbuf = sdscatlen(link->sndbuf, msg, msglen);
- server.cluster->stats_bus_messages_sent++;
+
+ /* Populate sent messages stats. */
+ clusterMsg *hdr = (clusterMsg*) msg;
+ uint16_t type = ntohs(hdr->type);
+ if (type < CLUSTERMSG_TYPE_COUNT)
+ server.cluster->stats_bus_messages_sent[type]++;
}
/* Send a message to all the nodes that are part of the cluster having
@@ -2229,6 +2262,33 @@ void clusterBuildMessageHdr(clusterMsg *hdr, int type) {
/* For PING, PONG, and MEET, fixing the totlen field is up to the caller. */
}
+/* Return non zero if the node is already present in the gossip section of the
+ * message pointed by 'hdr' and having 'count' gossip entries. Otherwise
+ * zero is returned. Helper for clusterSendPing(). */
+int clusterNodeIsInGossipSection(clusterMsg *hdr, int count, clusterNode *n) {
+ int j;
+ for (j = 0; j < count; j++) {
+ if (memcmp(hdr->data.ping.gossip[j].nodename,n->name,
+ CLUSTER_NAMELEN) == 0) break;
+ }
+ return j != count;
+}
+
+/* Set the i-th entry of the gossip section in the message pointed by 'hdr'
+ * to the info of the specified node 'n'. */
+void clusterSetGossipEntry(clusterMsg *hdr, int i, clusterNode *n) {
+ clusterMsgDataGossip *gossip;
+ gossip = &(hdr->data.ping.gossip[i]);
+ memcpy(gossip->nodename,n->name,CLUSTER_NAMELEN);
+ gossip->ping_sent = htonl(n->ping_sent/1000);
+ gossip->pong_received = htonl(n->pong_received/1000);
+ memcpy(gossip->ip,n->ip,sizeof(n->ip));
+ gossip->port = htons(n->port);
+ gossip->cport = htons(n->cport);
+ gossip->flags = htons(n->flags);
+ gossip->notused1 = 0;
+}
+
/* Send a PING or PONG packet to the specified node, making sure to add enough
* gossip informations. */
void clusterSendPing(clusterLink *link, int type) {
@@ -2273,11 +2333,15 @@ void clusterSendPing(clusterLink *link, int type) {
if (wanted < 3) wanted = 3;
if (wanted > freshnodes) wanted = freshnodes;
+ /* Include all the nodes in PFAIL state, so that failure reports are
+ * faster to propagate to go from PFAIL to FAIL state. */
+ int pfail_wanted = server.cluster->stats_pfail_nodes;
+
/* Compute the maxium totlen to allocate our buffer. We'll fix the totlen
* later according to the number of gossip sections we really were able
* to put inside the packet. */
totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
- totlen += (sizeof(clusterMsgDataGossip)*wanted);
+ totlen += (sizeof(clusterMsgDataGossip)*(wanted+pfail_wanted));
/* Note: clusterBuildMessageHdr() expects the buffer to be always at least
* sizeof(clusterMsg) or more. */
if (totlen < (int)sizeof(clusterMsg)) totlen = sizeof(clusterMsg);
@@ -2294,17 +2358,13 @@ void clusterSendPing(clusterLink *link, int type) {
while(freshnodes > 0 && gossipcount < wanted && maxiterations--) {
dictEntry *de = dictGetRandomKey(server.cluster->nodes);
clusterNode *this = dictGetVal(de);
- clusterMsgDataGossip *gossip;
- int j;
/* Don't include this node: the whole packet header is about us
* already, so we just gossip about other nodes. */
if (this == myself) continue;
- /* Give a bias to FAIL/PFAIL nodes. */
- if (maxiterations > wanted*2 &&
- !(this->flags & (CLUSTER_NODE_PFAIL|CLUSTER_NODE_FAIL)))
- continue;
+ /* PFAIL nodes will be added later. */
+ if (this->flags & CLUSTER_NODE_PFAIL) continue;
/* In the gossip section don't include:
* 1) Nodes in HANDSHAKE state.
@@ -2318,27 +2378,37 @@ void clusterSendPing(clusterLink *link, int type) {
continue;
}
- /* Check if we already added this node */
- for (j = 0; j < gossipcount; j++) {
- if (memcmp(hdr->data.ping.gossip[j].nodename,this->name,
- CLUSTER_NAMELEN) == 0) break;
- }
- if (j != gossipcount) continue;
+ /* Do not add a node we already have. */
+ if (clusterNodeIsInGossipSection(hdr,gossipcount,this)) continue;
/* Add it */
+ clusterSetGossipEntry(hdr,gossipcount,this);
freshnodes--;
- gossip = &(hdr->data.ping.gossip[gossipcount]);
- memcpy(gossip->nodename,this->name,CLUSTER_NAMELEN);
- gossip->ping_sent = htonl(this->ping_sent);
- gossip->pong_received = htonl(this->pong_received);
- memcpy(gossip->ip,this->ip,sizeof(this->ip));
- gossip->port = htons(this->port);
- gossip->cport = htons(this->cport);
- gossip->flags = htons(this->flags);
- gossip->notused1 = 0;
gossipcount++;
}
+ /* If there are PFAIL nodes, add them at the end. */
+ if (pfail_wanted) {
+ dictIterator *di;
+ dictEntry *de;
+
+ di = dictGetSafeIterator(server.cluster->nodes);
+ while((de = dictNext(di)) != NULL && pfail_wanted > 0) {
+ clusterNode *node = dictGetVal(de);
+ if (node->flags & CLUSTER_NODE_HANDSHAKE) continue;
+ if (node->flags & CLUSTER_NODE_NOADDR) continue;
+ if (!(node->flags & CLUSTER_NODE_PFAIL)) continue;
+ clusterSetGossipEntry(hdr,gossipcount,node);
+ freshnodes--;
+ gossipcount++;
+ /* We take the count of the slots we allocated, since the
+ * PFAIL stats may not match perfectly with the current number
+ * of PFAIL nodes. */
+ pfail_wanted--;
+ }
+ dictReleaseIterator(di);
+ }
+
/* Ready to send... fix the totlen fiend and queue the message in the
* output buffer. */
totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
@@ -3164,13 +3234,21 @@ void clusterCron(void) {
handshake_timeout = server.cluster_node_timeout;
if (handshake_timeout < 1000) handshake_timeout = 1000;
- /* Check if we have disconnected nodes and re-establish the connection. */
+ /* Check if we have disconnected nodes and re-establish the connection.
+ * Also update a few stats while we are here, that can be used to make
+ * better decisions in other part of the code. */
di = dictGetSafeIterator(server.cluster->nodes);
+ server.cluster->stats_pfail_nodes = 0;
while((de = dictNext(di)) != NULL) {
clusterNode *node = dictGetVal(de);
+ /* Not interested in reconnecting the link with myself or nodes
+ * for which we have no address. */
if (node->flags & (CLUSTER_NODE_MYSELF|CLUSTER_NODE_NOADDR)) continue;
+ if (node->flags & CLUSTER_NODE_PFAIL)
+ server.cluster->stats_pfail_nodes++;
+
/* A Node in HANDSHAKE state has a limited lifespan equal to the
* configured node timeout. */
if (nodeInHandshake(node) && now - node->ctime > handshake_timeout) {
@@ -3875,6 +3953,21 @@ sds clusterGenNodesDescription(int filter) {
* CLUSTER command
* -------------------------------------------------------------------------- */
+const char *clusterGetMessageTypeString(int type) {
+ switch(type) {
+ case CLUSTERMSG_TYPE_PING: return "ping";
+ case CLUSTERMSG_TYPE_PONG: return "pong";
+ case CLUSTERMSG_TYPE_MEET: return "meet";
+ case CLUSTERMSG_TYPE_FAIL: return "fail";
+ case CLUSTERMSG_TYPE_PUBLISH: return "publish";
+ case CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST: return "auth-req";
+ case CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK: return "auth-ack";
+ case CLUSTERMSG_TYPE_UPDATE: return "update";
+ case CLUSTERMSG_TYPE_MFSTART: return "mfstart";
+ }
+ return "unknown";
+}
+
int getSlotOrReply(client *c, robj *o) {
long long slot;
@@ -4206,8 +4299,6 @@ void clusterCommand(client *c) {
"cluster_size:%d\r\n"
"cluster_current_epoch:%llu\r\n"
"cluster_my_epoch:%llu\r\n"
- "cluster_stats_messages_sent:%lld\r\n"
- "cluster_stats_messages_received:%lld\r\n"
, statestr[server.cluster->state],
slots_assigned,
slots_ok,
@@ -4216,10 +4307,36 @@ void clusterCommand(client *c) {
dictSize(server.cluster->nodes),
server.cluster->size,
(unsigned long long) server.cluster->currentEpoch,
- (unsigned long long) myepoch,
- server.cluster->stats_bus_messages_sent,
- server.cluster->stats_bus_messages_received
+ (unsigned long long) myepoch
);
+
+ /* Show stats about messages sent and received. */
+ long long tot_msg_sent = 0;
+ long long tot_msg_received = 0;
+
+ for (int i = 0; i < CLUSTERMSG_TYPE_COUNT; i++) {
+ if (server.cluster->stats_bus_messages_sent[i] == 0) continue;
+ tot_msg_sent += server.cluster->stats_bus_messages_sent[i];
+ info = sdscatprintf(info,
+ "cluster_stats_messages_%s_sent:%lld\r\n",
+ clusterGetMessageTypeString(i),
+ server.cluster->stats_bus_messages_sent[i]);
+ }
+ info = sdscatprintf(info,
+ "cluster_stats_messages_sent:%lld\r\n", tot_msg_sent);
+
+ for (int i = 0; i < CLUSTERMSG_TYPE_COUNT; i++) {
+ if (server.cluster->stats_bus_messages_received[i] == 0) continue;
+ tot_msg_received += server.cluster->stats_bus_messages_received[i];
+ info = sdscatprintf(info,
+ "cluster_stats_messages_%s_received:%lld\r\n",
+ clusterGetMessageTypeString(i),
+ server.cluster->stats_bus_messages_received[i]);
+ }
+ info = sdscatprintf(info,
+ "cluster_stats_messages_received:%lld\r\n", tot_msg_received);
+
+ /* Produce the reply protocol. */
addReplySds(c,sdscatprintf(sdsempty(),"$%lu\r\n",
(unsigned long)sdslen(info)));
addReplySds(c,info);