summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/cluster.c36
-rw-r--r--src/cluster.h1
2 files changed, 27 insertions, 10 deletions
diff --git a/src/cluster.c b/src/cluster.c
index d8f83f08d..78524bd34 100644
--- a/src/cluster.c
+++ b/src/cluster.c
@@ -749,6 +749,7 @@ clusterNode *createClusterNode(char *nodename, int flags) {
node->slaves = NULL;
node->slaveof = NULL;
node->ping_sent = node->pong_received = 0;
+ node->data_received = 0;
node->fail_time = 0;
node->link = NULL;
memset(node->ip,0,sizeof(node->ip));
@@ -1678,6 +1679,7 @@ int clusterProcessPacket(clusterLink *link) {
clusterMsg *hdr = (clusterMsg*) link->rcvbuf;
uint32_t totlen = ntohl(hdr->totlen);
uint16_t type = ntohs(hdr->type);
+ mstime_t now = mstime();
if (type < CLUSTERMSG_TYPE_COUNT)
server.cluster->stats_bus_messages_received[type]++;
@@ -1741,6 +1743,13 @@ int clusterProcessPacket(clusterLink *link) {
/* Check if the sender is a known node. */
sender = clusterLookupNode(hdr->sender);
+
+ /* Update the last time we saw any data from this node. We
+ * use this in order to avoid detecting a timeout from a node that
+ * is just sending a lot of data in the cluster bus, for instance
+ * because of Pub/Sub. */
+ if (sender) sender->data_received = now;
+
if (sender && !nodeInHandshake(sender)) {
/* Update our curretEpoch if we see a newer epoch in the cluster. */
senderCurrentEpoch = ntohu64(hdr->currentEpoch);
@@ -1755,7 +1764,7 @@ int clusterProcessPacket(clusterLink *link) {
}
/* Update the replication offset info for this node. */
sender->repl_offset = ntohu64(hdr->offset);
- sender->repl_offset_time = mstime();
+ sender->repl_offset_time = now;
/* If we are a slave performing a manual failover and our master
* sent its offset while already paused, populate the MF state. */
if (server.cluster->mf_end &&
@@ -1869,7 +1878,7 @@ int clusterProcessPacket(clusterLink *link) {
* address. */
serverLog(LL_DEBUG,"PONG contains mismatching sender ID. About node %.40s added %d ms ago, having flags %d",
link->node->name,
- (int)(mstime()-(link->node->ctime)),
+ (int)(now-(link->node->ctime)),
link->node->flags);
link->node->flags |= CLUSTER_NODE_NOADDR;
link->node->ip[0] = '\0';
@@ -1904,7 +1913,7 @@ int clusterProcessPacket(clusterLink *link) {
/* Update our info about the node */
if (link->node && type == CLUSTERMSG_TYPE_PONG) {
- link->node->pong_received = mstime();
+ link->node->pong_received = now;
link->node->ping_sent = 0;
/* The PFAIL condition can be reversed without external
@@ -2051,7 +2060,7 @@ int clusterProcessPacket(clusterLink *link) {
"FAIL message received from %.40s about %.40s",
hdr->sender, hdr->data.fail.about.nodename);
failing->flags |= CLUSTER_NODE_FAIL;
- failing->fail_time = mstime();
+ failing->fail_time = now;
failing->flags &= ~CLUSTER_NODE_PFAIL;
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
CLUSTER_TODO_UPDATE_STATE);
@@ -2104,9 +2113,9 @@ int clusterProcessPacket(clusterLink *link) {
/* Manual failover requested from slaves. Initialize the state
* accordingly. */
resetManualFailover();
- server.cluster->mf_end = mstime() + CLUSTER_MF_TIMEOUT;
+ server.cluster->mf_end = now + CLUSTER_MF_TIMEOUT;
server.cluster->mf_slave = sender;
- pauseClients(mstime()+(CLUSTER_MF_TIMEOUT*CLUSTER_MF_PAUSE_MULT));
+ pauseClients(now+(CLUSTER_MF_TIMEOUT*CLUSTER_MF_PAUSE_MULT));
serverLog(LL_WARNING,"Manual failover requested by replica %.40s.",
sender->name);
} else if (type == CLUSTERMSG_TYPE_UPDATE) {
@@ -3529,7 +3538,6 @@ void clusterCron(void) {
while((de = dictNext(di)) != NULL) {
clusterNode *node = dictGetVal(de);
now = mstime(); /* Use an updated time at every iteration. */
- mstime_t delay;
if (node->flags &
(CLUSTER_NODE_MYSELF|CLUSTER_NODE_NOADDR|CLUSTER_NODE_HANDSHAKE))
@@ -3553,7 +3561,7 @@ void clusterCron(void) {
this_slaves = okslaves;
}
- /* If we are waiting for the PONG more than half the cluster
+ /* If we are not receiving any data for more than half the cluster
* timeout, reconnect the link: maybe there is a connection
* issue even if the node is alive. */
if (node->link && /* is connected */
@@ -3562,7 +3570,9 @@ void clusterCron(void) {
node->ping_sent && /* we already sent a ping */
node->pong_received < node->ping_sent && /* still waiting pong */
/* and we are waiting for the pong more than timeout/2 */
- now - node->ping_sent > server.cluster_node_timeout/2)
+ now - node->ping_sent > server.cluster_node_timeout/2 &&
+ /* and in such interval we are not seeing any traffic at all. */
+ now - node->data_received > server.cluster_node_timeout/2)
{
/* Disconnect the link, it will be reconnected automatically. */
freeClusterLink(node->link);
@@ -3597,7 +3607,13 @@ void clusterCron(void) {
/* Compute the delay of the PONG. Note that if we already received
* the PONG, then node->ping_sent is zero, so can't reach this
* code at all. */
- delay = now - node->ping_sent;
+ mstime_t delay = now - node->ping_sent;
+
+ /* We consider every incoming data as proof of liveness, since
+ * our cluster bus link is also used for data: under heavy data
+ * load pong delays are possible. */
+ mstime_t data_delay = now - node->data_received;
+ if (data_delay < delay) delay = data_delay;
if (delay > server.cluster_node_timeout) {
/* Timeout reached. Set the node as possibly failing if it is
diff --git a/src/cluster.h b/src/cluster.h
index 35fc0cbfa..d3af4a355 100644
--- a/src/cluster.h
+++ b/src/cluster.h
@@ -124,6 +124,7 @@ typedef struct clusterNode {
tables. */
mstime_t ping_sent; /* Unix time we sent latest ping */
mstime_t pong_received; /* Unix time we received the pong */
+ mstime_t data_received; /* Unix time we received any data */
mstime_t fail_time; /* Unix time when FAIL flag was set */
mstime_t voted_time; /* Last time we voted for a slave of this master */
mstime_t repl_offset_time; /* Unix time we received offset for this node */