diff options
Diffstat (limited to 'src/cluster.c')
-rw-r--r-- | src/cluster.c | 185 |
1 files changed, 151 insertions, 34 deletions
diff --git a/src/cluster.c b/src/cluster.c index 4d7b0502d..d5ad85fe7 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -423,8 +423,11 @@ void clusterInit(void) { server.cluster->failover_auth_epoch = 0; server.cluster->cant_failover_reason = CLUSTER_CANT_FAILOVER_NONE; server.cluster->lastVoteEpoch = 0; - server.cluster->stats_bus_messages_sent = 0; - server.cluster->stats_bus_messages_received = 0; + for (int i = 0; i < CLUSTERMSG_TYPE_COUNT; i++) { + server.cluster->stats_bus_messages_sent[i] = 0; + server.cluster->stats_bus_messages_received[i] = 0; + } + server.cluster->stats_pfail_nodes = 0; memset(server.cluster->slots,0, sizeof(server.cluster->slots)); clusterCloseAllSlots(); @@ -476,8 +479,10 @@ void clusterInit(void) { } } - /* The slots -> keys map is a sorted set. Init it. */ - server.cluster->slots_to_keys = zslCreate(); + /* The slots -> keys map is a radix tree. Initialize it here. */ + server.cluster->slots_to_keys = raxNew(); + memset(server.cluster->slots_keys_count,0, + sizeof(server.cluster->slots_keys_count)); /* Set myself->port / cport to my listening ports, we'll just need to * discover the IP address via MEET messages. */ @@ -1350,6 +1355,28 @@ void clusterProcessGossipSection(clusterMsg *hdr, clusterLink *link) { } } + /* If from our POV the node is up (no failure flags are set), + * we have no pending ping for the node, nor we have failure + * reports for this node, update the last pong time with the + * one we see from the other nodes. */ + if (!(flags & (CLUSTER_NODE_FAIL|CLUSTER_NODE_PFAIL)) && + node->ping_sent == 0 && + clusterNodeFailureReportsCount(node) == 0) + { + mstime_t pongtime = ntohl(g->pong_received); + pongtime *= 1000; /* Convert back to milliseconds. */ + + /* Replace the pong time with the received one only if + * it's greater than our view but is not in the future + * (with 500 milliseconds tolerance) from the POV of our + * clock. */ + if (pongtime <= (server.mstime+500) && + pongtime > node->pong_received) + { + node->pong_received = pongtime; + } + } + /* If we already know this node, but it is not reachable, and * we see a different address in the gossip section of a node that * can talk with this other node, update the address, disconnect @@ -1581,7 +1608,8 @@ int clusterProcessPacket(clusterLink *link) { uint32_t totlen = ntohl(hdr->totlen); uint16_t type = ntohs(hdr->type); - server.cluster->stats_bus_messages_received++; + if (type < CLUSTERMSG_TYPE_COUNT) + server.cluster->stats_bus_messages_received[type]++; serverLog(LL_DEBUG,"--- Processing packet of type %d, %lu bytes", type, (unsigned long) totlen); @@ -2128,7 +2156,12 @@ void clusterSendMessage(clusterLink *link, unsigned char *msg, size_t msglen) { clusterWriteHandler,link); link->sndbuf = sdscatlen(link->sndbuf, msg, msglen); - server.cluster->stats_bus_messages_sent++; + + /* Populate sent messages stats. */ + clusterMsg *hdr = (clusterMsg*) msg; + uint16_t type = ntohs(hdr->type); + if (type < CLUSTERMSG_TYPE_COUNT) + server.cluster->stats_bus_messages_sent[type]++; } /* Send a message to all the nodes that are part of the cluster having @@ -2229,6 +2262,33 @@ void clusterBuildMessageHdr(clusterMsg *hdr, int type) { /* For PING, PONG, and MEET, fixing the totlen field is up to the caller. */ } +/* Return non zero if the node is already present in the gossip section of the + * message pointed by 'hdr' and having 'count' gossip entries. Otherwise + * zero is returned. Helper for clusterSendPing(). */ +int clusterNodeIsInGossipSection(clusterMsg *hdr, int count, clusterNode *n) { + int j; + for (j = 0; j < count; j++) { + if (memcmp(hdr->data.ping.gossip[j].nodename,n->name, + CLUSTER_NAMELEN) == 0) break; + } + return j != count; +} + +/* Set the i-th entry of the gossip section in the message pointed by 'hdr' + * to the info of the specified node 'n'. */ +void clusterSetGossipEntry(clusterMsg *hdr, int i, clusterNode *n) { + clusterMsgDataGossip *gossip; + gossip = &(hdr->data.ping.gossip[i]); + memcpy(gossip->nodename,n->name,CLUSTER_NAMELEN); + gossip->ping_sent = htonl(n->ping_sent/1000); + gossip->pong_received = htonl(n->pong_received/1000); + memcpy(gossip->ip,n->ip,sizeof(n->ip)); + gossip->port = htons(n->port); + gossip->cport = htons(n->cport); + gossip->flags = htons(n->flags); + gossip->notused1 = 0; +} + /* Send a PING or PONG packet to the specified node, making sure to add enough * gossip informations. */ void clusterSendPing(clusterLink *link, int type) { @@ -2273,11 +2333,15 @@ void clusterSendPing(clusterLink *link, int type) { if (wanted < 3) wanted = 3; if (wanted > freshnodes) wanted = freshnodes; + /* Include all the nodes in PFAIL state, so that failure reports are + * faster to propagate to go from PFAIL to FAIL state. */ + int pfail_wanted = server.cluster->stats_pfail_nodes; + /* Compute the maxium totlen to allocate our buffer. We'll fix the totlen * later according to the number of gossip sections we really were able * to put inside the packet. */ totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData); - totlen += (sizeof(clusterMsgDataGossip)*wanted); + totlen += (sizeof(clusterMsgDataGossip)*(wanted+pfail_wanted)); /* Note: clusterBuildMessageHdr() expects the buffer to be always at least * sizeof(clusterMsg) or more. */ if (totlen < (int)sizeof(clusterMsg)) totlen = sizeof(clusterMsg); @@ -2294,17 +2358,13 @@ void clusterSendPing(clusterLink *link, int type) { while(freshnodes > 0 && gossipcount < wanted && maxiterations--) { dictEntry *de = dictGetRandomKey(server.cluster->nodes); clusterNode *this = dictGetVal(de); - clusterMsgDataGossip *gossip; - int j; /* Don't include this node: the whole packet header is about us * already, so we just gossip about other nodes. */ if (this == myself) continue; - /* Give a bias to FAIL/PFAIL nodes. */ - if (maxiterations > wanted*2 && - !(this->flags & (CLUSTER_NODE_PFAIL|CLUSTER_NODE_FAIL))) - continue; + /* PFAIL nodes will be added later. */ + if (this->flags & CLUSTER_NODE_PFAIL) continue; /* In the gossip section don't include: * 1) Nodes in HANDSHAKE state. @@ -2318,27 +2378,37 @@ void clusterSendPing(clusterLink *link, int type) { continue; } - /* Check if we already added this node */ - for (j = 0; j < gossipcount; j++) { - if (memcmp(hdr->data.ping.gossip[j].nodename,this->name, - CLUSTER_NAMELEN) == 0) break; - } - if (j != gossipcount) continue; + /* Do not add a node we already have. */ + if (clusterNodeIsInGossipSection(hdr,gossipcount,this)) continue; /* Add it */ + clusterSetGossipEntry(hdr,gossipcount,this); freshnodes--; - gossip = &(hdr->data.ping.gossip[gossipcount]); - memcpy(gossip->nodename,this->name,CLUSTER_NAMELEN); - gossip->ping_sent = htonl(this->ping_sent); - gossip->pong_received = htonl(this->pong_received); - memcpy(gossip->ip,this->ip,sizeof(this->ip)); - gossip->port = htons(this->port); - gossip->cport = htons(this->cport); - gossip->flags = htons(this->flags); - gossip->notused1 = 0; gossipcount++; } + /* If there are PFAIL nodes, add them at the end. */ + if (pfail_wanted) { + dictIterator *di; + dictEntry *de; + + di = dictGetSafeIterator(server.cluster->nodes); + while((de = dictNext(di)) != NULL && pfail_wanted > 0) { + clusterNode *node = dictGetVal(de); + if (node->flags & CLUSTER_NODE_HANDSHAKE) continue; + if (node->flags & CLUSTER_NODE_NOADDR) continue; + if (!(node->flags & CLUSTER_NODE_PFAIL)) continue; + clusterSetGossipEntry(hdr,gossipcount,node); + freshnodes--; + gossipcount++; + /* We take the count of the slots we allocated, since the + * PFAIL stats may not match perfectly with the current number + * of PFAIL nodes. */ + pfail_wanted--; + } + dictReleaseIterator(di); + } + /* Ready to send... fix the totlen fiend and queue the message in the * output buffer. */ totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData); @@ -3164,13 +3234,21 @@ void clusterCron(void) { handshake_timeout = server.cluster_node_timeout; if (handshake_timeout < 1000) handshake_timeout = 1000; - /* Check if we have disconnected nodes and re-establish the connection. */ + /* Check if we have disconnected nodes and re-establish the connection. + * Also update a few stats while we are here, that can be used to make + * better decisions in other part of the code. */ di = dictGetSafeIterator(server.cluster->nodes); + server.cluster->stats_pfail_nodes = 0; while((de = dictNext(di)) != NULL) { clusterNode *node = dictGetVal(de); + /* Not interested in reconnecting the link with myself or nodes + * for which we have no address. */ if (node->flags & (CLUSTER_NODE_MYSELF|CLUSTER_NODE_NOADDR)) continue; + if (node->flags & CLUSTER_NODE_PFAIL) + server.cluster->stats_pfail_nodes++; + /* A Node in HANDSHAKE state has a limited lifespan equal to the * configured node timeout. */ if (nodeInHandshake(node) && now - node->ctime > handshake_timeout) { @@ -3875,6 +3953,21 @@ sds clusterGenNodesDescription(int filter) { * CLUSTER command * -------------------------------------------------------------------------- */ +const char *clusterGetMessageTypeString(int type) { + switch(type) { + case CLUSTERMSG_TYPE_PING: return "ping"; + case CLUSTERMSG_TYPE_PONG: return "pong"; + case CLUSTERMSG_TYPE_MEET: return "meet"; + case CLUSTERMSG_TYPE_FAIL: return "fail"; + case CLUSTERMSG_TYPE_PUBLISH: return "publish"; + case CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST: return "auth-req"; + case CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK: return "auth-ack"; + case CLUSTERMSG_TYPE_UPDATE: return "update"; + case CLUSTERMSG_TYPE_MFSTART: return "mfstart"; + } + return "unknown"; +} + int getSlotOrReply(client *c, robj *o) { long long slot; @@ -4206,8 +4299,6 @@ void clusterCommand(client *c) { "cluster_size:%d\r\n" "cluster_current_epoch:%llu\r\n" "cluster_my_epoch:%llu\r\n" - "cluster_stats_messages_sent:%lld\r\n" - "cluster_stats_messages_received:%lld\r\n" , statestr[server.cluster->state], slots_assigned, slots_ok, @@ -4216,10 +4307,36 @@ void clusterCommand(client *c) { dictSize(server.cluster->nodes), server.cluster->size, (unsigned long long) server.cluster->currentEpoch, - (unsigned long long) myepoch, - server.cluster->stats_bus_messages_sent, - server.cluster->stats_bus_messages_received + (unsigned long long) myepoch ); + + /* Show stats about messages sent and received. */ + long long tot_msg_sent = 0; + long long tot_msg_received = 0; + + for (int i = 0; i < CLUSTERMSG_TYPE_COUNT; i++) { + if (server.cluster->stats_bus_messages_sent[i] == 0) continue; + tot_msg_sent += server.cluster->stats_bus_messages_sent[i]; + info = sdscatprintf(info, + "cluster_stats_messages_%s_sent:%lld\r\n", + clusterGetMessageTypeString(i), + server.cluster->stats_bus_messages_sent[i]); + } + info = sdscatprintf(info, + "cluster_stats_messages_sent:%lld\r\n", tot_msg_sent); + + for (int i = 0; i < CLUSTERMSG_TYPE_COUNT; i++) { + if (server.cluster->stats_bus_messages_received[i] == 0) continue; + tot_msg_received += server.cluster->stats_bus_messages_received[i]; + info = sdscatprintf(info, + "cluster_stats_messages_%s_received:%lld\r\n", + clusterGetMessageTypeString(i), + server.cluster->stats_bus_messages_received[i]); + } + info = sdscatprintf(info, + "cluster_stats_messages_received:%lld\r\n", tot_msg_received); + + /* Produce the reply protocol. */ addReplySds(c,sdscatprintf(sdsempty(),"$%lu\r\n", (unsigned long)sdslen(info))); addReplySds(c,info); |