summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorantirez <antirez@gmail.com>2020-05-08 11:38:07 +0200
committerOran Agra <oran@redislabs.com>2020-10-27 08:49:22 +0200
commit97816fd63ef3c40dd0b4f472fb237a0d9e9a7c6f (patch)
tree6d72ddfb35d984b5dcc361b3a63552b0b6d67465
parent3b792f5100124d9cc2102128d6c64f383b0bcd6c (diff)
downloadredis-97816fd63ef3c40dd0b4f472fb237a0d9e9a7c6f.tar.gz
Cluster: introduce data_received field.
We want to send pings and pongs at specific intervals, since our packets also contain information about the configuration of the cluster and are used for gossip. However since our cluster bus is used in a mixed way for data (such as Pub/Sub or modules cluster messages) and metadata, sometimes a very busy channel may delay the reception of pong packets. So after discussing it in #7216, this commit introduces a new field that is not exposed in the cluster, is only an internal information about the last time we received any data from a given node: we use this field in order to avoid detecting failures, claiming data reception of new data from the node is a proof of liveness. (cherry picked from commit 960186a71fee12b0e64b109c7b2066d2dae0ffe5)
-rw-r--r--src/cluster.c36
-rw-r--r--src/cluster.h1
2 files changed, 27 insertions, 10 deletions
diff --git a/src/cluster.c b/src/cluster.c
index e8fe58dea..5966d0561 100644
--- a/src/cluster.c
+++ b/src/cluster.c
@@ -721,6 +721,7 @@ clusterNode *createClusterNode(char *nodename, int flags) {
node->slaves = NULL;
node->slaveof = NULL;
node->ping_sent = node->pong_received = 0;
+ node->data_received = 0;
node->fail_time = 0;
node->link = NULL;
memset(node->ip,0,sizeof(node->ip));
@@ -1658,6 +1659,7 @@ int clusterProcessPacket(clusterLink *link) {
clusterMsg *hdr = (clusterMsg*) link->rcvbuf;
uint32_t totlen = ntohl(hdr->totlen);
uint16_t type = ntohs(hdr->type);
+ mstime_t now = mstime();
if (type < CLUSTERMSG_TYPE_COUNT)
server.cluster->stats_bus_messages_received[type]++;
@@ -1721,6 +1723,13 @@ int clusterProcessPacket(clusterLink *link) {
/* Check if the sender is a known node. */
sender = clusterLookupNode(hdr->sender);
+
+ /* Update the last time we saw any data from this node. We
+ * use this in order to avoid detecting a timeout from a node that
+ * is just sending a lot of data in the cluster bus, for instance
+ * because of Pub/Sub. */
+ if (sender) sender->data_received = now;
+
if (sender && !nodeInHandshake(sender)) {
/* Update our curretEpoch if we see a newer epoch in the cluster. */
senderCurrentEpoch = ntohu64(hdr->currentEpoch);
@@ -1735,7 +1744,7 @@ int clusterProcessPacket(clusterLink *link) {
}
/* Update the replication offset info for this node. */
sender->repl_offset = ntohu64(hdr->offset);
- sender->repl_offset_time = mstime();
+ sender->repl_offset_time = now;
/* If we are a slave performing a manual failover and our master
* sent its offset while already paused, populate the MF state. */
if (server.cluster->mf_end &&
@@ -1849,7 +1858,7 @@ int clusterProcessPacket(clusterLink *link) {
* address. */
serverLog(LL_DEBUG,"PONG contains mismatching sender ID. About node %.40s added %d ms ago, having flags %d",
link->node->name,
- (int)(mstime()-(link->node->ctime)),
+ (int)(now-(link->node->ctime)),
link->node->flags);
link->node->flags |= CLUSTER_NODE_NOADDR;
link->node->ip[0] = '\0';
@@ -1884,7 +1893,7 @@ int clusterProcessPacket(clusterLink *link) {
/* Update our info about the node */
if (link->node && type == CLUSTERMSG_TYPE_PONG) {
- link->node->pong_received = mstime();
+ link->node->pong_received = now;
link->node->ping_sent = 0;
/* The PFAIL condition can be reversed without external
@@ -2031,7 +2040,7 @@ int clusterProcessPacket(clusterLink *link) {
"FAIL message received from %.40s about %.40s",
hdr->sender, hdr->data.fail.about.nodename);
failing->flags |= CLUSTER_NODE_FAIL;
- failing->fail_time = mstime();
+ failing->fail_time = now;
failing->flags &= ~CLUSTER_NODE_PFAIL;
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
CLUSTER_TODO_UPDATE_STATE);
@@ -2084,9 +2093,9 @@ int clusterProcessPacket(clusterLink *link) {
/* Manual failover requested from slaves. Initialize the state
* accordingly. */
resetManualFailover();
- server.cluster->mf_end = mstime() + CLUSTER_MF_TIMEOUT;
+ server.cluster->mf_end = now + CLUSTER_MF_TIMEOUT;
server.cluster->mf_slave = sender;
- pauseClients(mstime()+(CLUSTER_MF_TIMEOUT*2));
+ pauseClients(now+(CLUSTER_MF_TIMEOUT*CLUSTER_MF_PAUSE_MULT));
serverLog(LL_WARNING,"Manual failover requested by replica %.40s.",
sender->name);
} else if (type == CLUSTERMSG_TYPE_UPDATE) {
@@ -3494,7 +3503,6 @@ void clusterCron(void) {
while((de = dictNext(di)) != NULL) {
clusterNode *node = dictGetVal(de);
now = mstime(); /* Use an updated time at every iteration. */
- mstime_t delay;
if (node->flags &
(CLUSTER_NODE_MYSELF|CLUSTER_NODE_NOADDR|CLUSTER_NODE_HANDSHAKE))
@@ -3518,7 +3526,7 @@ void clusterCron(void) {
this_slaves = okslaves;
}
- /* If we are waiting for the PONG more than half the cluster
+ /* If we are not receiving any data for more than half the cluster
* timeout, reconnect the link: maybe there is a connection
* issue even if the node is alive. */
if (node->link && /* is connected */
@@ -3527,7 +3535,9 @@ void clusterCron(void) {
node->ping_sent && /* we already sent a ping */
node->pong_received < node->ping_sent && /* still waiting pong */
/* and we are waiting for the pong more than timeout/2 */
- now - node->ping_sent > server.cluster_node_timeout/2)
+ now - node->ping_sent > server.cluster_node_timeout/2 &&
+ /* and in such interval we are not seeing any traffic at all. */
+ now - node->data_received > server.cluster_node_timeout/2)
{
/* Disconnect the link, it will be reconnected automatically. */
freeClusterLink(node->link);
@@ -3562,7 +3572,13 @@ void clusterCron(void) {
/* Compute the delay of the PONG. Note that if we already received
* the PONG, then node->ping_sent is zero, so can't reach this
* code at all. */
- delay = now - node->ping_sent;
+ mstime_t delay = now - node->ping_sent;
+
+ /* We consider every incoming data as proof of liveness, since
+ * our cluster bus link is also used for data: under heavy data
+ * load pong delays are possible. */
+ mstime_t data_delay = now - node->data_received;
+ if (data_delay < delay) delay = data_delay;
if (delay > server.cluster_node_timeout) {
/* Timeout reached. Set the node as possibly failing if it is
diff --git a/src/cluster.h b/src/cluster.h
index 571b9c543..b9c5ad135 100644
--- a/src/cluster.h
+++ b/src/cluster.h
@@ -128,6 +128,7 @@ typedef struct clusterNode {
tables. */
mstime_t ping_sent; /* Unix time we sent latest ping */
mstime_t pong_received; /* Unix time we received the pong */
+ mstime_t data_received; /* Unix time we received any data */
mstime_t fail_time; /* Unix time when FAIL flag was set */
mstime_t voted_time; /* Last time we voted for a slave of this master */
mstime_t repl_offset_time; /* Unix time we received offset for this node */