summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorantirez <antirez@gmail.com>2015-05-11 13:15:26 +0200
committerantirez <antirez@gmail.com>2015-05-11 13:15:26 +0200
commit1029276c0d46e643a5740120d44a9cce8ba652b9 (patch)
tree36e57472ddd3247d9585166bf5a1761db02d08ed
parent611283f7438009fe690023673c1fac5d9393034d (diff)
downloadredis-1029276c0d46e643a5740120d44a9cce8ba652b9.tar.gz
Sentinel: connection sharing WIP #1
-rw-r--r--src/sentinel.c515
1 files changed, 295 insertions, 220 deletions
diff --git a/src/sentinel.c b/src/sentinel.c
index dfb096d34..c28bf6dd3 100644
--- a/src/sentinel.c
+++ b/src/sentinel.c
@@ -54,19 +54,18 @@ typedef struct sentinelAddr {
#define SRI_MASTER (1<<0)
#define SRI_SLAVE (1<<1)
#define SRI_SENTINEL (1<<2)
-#define SRI_DISCONNECTED (1<<3)
-#define SRI_S_DOWN (1<<4) /* Subjectively down (no quorum). */
-#define SRI_O_DOWN (1<<5) /* Objectively down (confirmed by others). */
-#define SRI_MASTER_DOWN (1<<6) /* A Sentinel with this flag set thinks that
+#define SRI_S_DOWN (1<<3) /* Subjectively down (no quorum). */
+#define SRI_O_DOWN (1<<4) /* Objectively down (confirmed by others). */
+#define SRI_MASTER_DOWN (1<<5) /* A Sentinel with this flag set thinks that
its master is down. */
-#define SRI_FAILOVER_IN_PROGRESS (1<<7) /* Failover is in progress for
+#define SRI_FAILOVER_IN_PROGRESS (1<<6) /* Failover is in progress for
this master. */
-#define SRI_PROMOTED (1<<8) /* Slave selected for promotion. */
-#define SRI_RECONF_SENT (1<<9) /* SLAVEOF <newmaster> sent. */
-#define SRI_RECONF_INPROG (1<<10) /* Slave synchronization in progress. */
-#define SRI_RECONF_DONE (1<<11) /* Slave synchronized with new master. */
-#define SRI_FORCE_FAILOVER (1<<12) /* Force failover with master up. */
-#define SRI_SCRIPT_KILL_SENT (1<<13) /* SCRIPT KILL already sent on -BUSY */
+#define SRI_PROMOTED (1<<7) /* Slave selected for promotion. */
+#define SRI_RECONF_SENT (1<<8) /* SLAVEOF <newmaster> sent. */
+#define SRI_RECONF_INPROG (1<<9) /* Slave synchronization in progress. */
+#define SRI_RECONF_DONE (1<<10) /* Slave synchronized with new master. */
+#define SRI_FORCE_FAILOVER (1<<11) /* Force failover with master up. */
+#define SRI_SCRIPT_KILL_SENT (1<<12) /* SCRIPT KILL already sent on -BUSY */
/* Note: times are in milliseconds. */
#define SENTINEL_INFO_PERIOD 10000
@@ -115,15 +114,26 @@ typedef struct sentinelAddr {
#define SENTINEL_SCRIPT_MAX_RETRY 10
#define SENTINEL_SCRIPT_RETRY_DELAY 30000 /* 30 seconds between retries. */
-typedef struct sentinelRedisInstance {
- int flags; /* See SRI_... defines */
- char *name; /* Master name from the point of view of this sentinel. */
- char *runid; /* Run ID of this instance, or unique ID if is a Sentinel.*/
- uint64_t config_epoch; /* Configuration epoch. */
- sentinelAddr *addr; /* Master host. */
+/* The link to a sentinelRedisInstance. When we have the same set of Sentinels
+ * monitoring many masters, we have different instances representing the
+ * same Sentinels, one per master, and we need to share the hiredis connections
+ * among them. Oherwise if 5 Senintels are monitoring 100 masters we create
+ * 500 outgoing connections instead of 5.
+ *
+ * So this structure represents a reference counted link in terms of the two
+ * hiredis connections for commands and Pub/Sub, and the fields needed for
+ * failure detection, since the ping/pong time are now local to the link: if
+ * the link is available, the instance is avaialbe. This way we don't just
+ * have 5 connections instead of 500, we also send 5 pings instead of 500.
+ *
+ * Links are shared only for Sentinels: master and slave instances have
+ * a link with refcount = 1, always. */
+typedef struct instanceLink {
+ int refcount; /* Number of sentinelRedisInstance owners. */
+ int disconnected; /* Non-zero if we need to reconnect cc or pc. */
+ int pending_commands; /* Number of commands sent waiting for a reply. */
redisAsyncContext *cc; /* Hiredis context for commands. */
redisAsyncContext *pc; /* Hiredis context for Pub / Sub. */
- int pending_commands; /* Number of commands sent waiting for a reply. */
mstime_t cc_conn_time; /* cc connection time. */
mstime_t pc_conn_time; /* pc connection time. */
mstime_t pc_last_activity; /* Last time we received any message. */
@@ -136,6 +146,15 @@ typedef struct sentinelRedisInstance {
mstime_t last_pong_time; /* Last time the instance replied to ping,
whatever the reply was. That's used to check
if the link is idle and must be reconnected. */
+} instanceLink;
+
+typedef struct sentinelRedisInstance {
+ int flags; /* See SRI_... defines */
+ char *name; /* Master name from the point of view of this sentinel. */
+ char *runid; /* Run ID of this instance, or unique ID if is a Sentinel.*/
+ uint64_t config_epoch; /* Configuration epoch. */
+ sentinelAddr *addr; /* Master host. */
+ instanceLink *link; /* Link to the instance, may be shared for Sentinels. */
mstime_t last_pub_time; /* Last time we sent hello via Pub/Sub. */
mstime_t last_hello_time; /* Only used if SRI_SENTINEL is set. Last time
we received a hello from this Sentinel
@@ -328,8 +347,7 @@ sentinelRedisInstance *sentinelGetMasterByName(char *name);
char *sentinelGetSubjectiveLeader(sentinelRedisInstance *master);
char *sentinelGetObjectiveLeader(sentinelRedisInstance *master);
int yesnotoi(char *s);
-void sentinelDisconnectInstanceFromContext(const redisAsyncContext *c);
-void sentinelKillLink(sentinelRedisInstance *ri, redisAsyncContext *c);
+void instanceLinkConnectionError(const redisAsyncContext *c);
const char *sentinelRedisInstanceTypeStr(sentinelRedisInstance *ri);
void sentinelAbortFailover(sentinelRedisInstance *ri);
void sentinelEvent(int level, char *type, sentinelRedisInstance *ri, const char *fmt, ...);
@@ -889,6 +907,118 @@ void sentinelCallClientReconfScript(sentinelRedisInstance *master, int role, cha
state, from->ip, fromport, to->ip, toport, NULL);
}
+/* =============================== instanceLink ============================= */
+
+/* Create a not yet connected link object. */
+instanceLink *createInstanceLink(void) {
+ instanceLink *link = zmalloc(sizeof(*link));
+
+ link->refcount = 1;
+ link->disconnected = 1;
+ link->pending_commands = 0;
+ link->cc = NULL;
+ link->pc = NULL;
+ link->cc_conn_time = 0;
+ link->pc_conn_time = 0;
+ link->pc_last_activity = 0;
+ /* We set the last_ping_time to "now" even if we actually don't have yet
+ * a connection with the node, nor we sent a ping.
+ * This is useful to detect a timeout in case we'll not be able to connect
+ * with the node at all. */
+ link->last_ping_time = mstime();
+ link->last_avail_time = mstime();
+ link->last_pong_time = mstime();
+ return link;
+}
+
+/* Disconnect an hiredis connection in the context of an instance link. */
+void instanceLinkCloseConnection(instanceLink *link, redisAsyncContext *c) {
+ if (c == NULL) return;
+
+ if (link->cc == c) {
+ link->cc = NULL;
+ link->pending_commands = 0;
+ }
+ if (link->pc == c) link->pc = NULL;
+ c->data = NULL;
+ link->disconnected = 1;
+ redisAsyncFree(c);
+}
+
+/* Decrement the refcount of a link object, if it drops to zero, actually
+ * free it and return NULL. Otherwise don't do anything and return the pointer
+ * to the object.
+ *
+ * If we are not going to free the link and ri is not NULL, we rebind all the
+ * pending requests in link->cc (hiredis connection for commands) to a
+ * callback that will just ignore them. This is useful to avoid processing
+ * replies for an instance that no longer exists. */
+instanceLink *releaseInstanceLink(instanceLink *link, sentinelRedisInstance *ri)
+{
+ redisAssert(link->refcount > 0);
+ link->refcount--;
+ if (link->refcount != 0) {
+ if (ri) {
+ /* TODO: run the callbacks list and rebind. */
+ }
+ return link; /* Other active users. */
+ }
+
+ instanceLinkCloseConnection(link,link->cc);
+ instanceLinkCloseConnection(link,link->pc);
+ zfree(link);
+ return NULL;
+}
+
+/* This function will attempt to share the instance link we already have
+ * for the same Sentinel in the context of a different master, with the
+ * instance we are passing as argument.
+ *
+ * This way multiple Sentinel objects that refer all to the same physical
+ * Sentinel instance but in the context of different masters will use
+ * a single connection, will send a single PING per second for failure
+ * detection and so forth. */
+void tryConnectionSharing(sentinelRedisInstance *ri) {
+ redisAssert(ri->flags & SRI_SENTINEL);
+
+ /* TODO:
+ * 1) Check if there is a match.
+ * 2) Free our current link.
+ * 3) Reference the other link and increment its reference count. */
+ REDIS_NOTUSED(ri);
+}
+
+/* This function is called when an hiredis connection reported an error.
+ * We set it to NULL and mark the link as disconnected so that it will be
+ * reconnected again.
+ *
+ * Note: we don't free the hiredis context as hiredis will do it for us
+ * for async connections. */
+void instanceLinkConnectionError(const redisAsyncContext *c) {
+ instanceLink *link = c->data;
+ int pubsub;
+
+ if (!link) return;
+
+ pubsub = (link->pc == c);
+ if (pubsub)
+ link->pc = NULL;
+ else
+ link->cc = NULL;
+ link->disconnected = 1;
+}
+
+/* Hiredis connection established / disconnected callbacks. We need them
+ * just to cleanup our link state. */
+void sentinelLinkEstablishedCallback(const redisAsyncContext *c, int status) {
+ if (status != REDIS_OK) instanceLinkConnectionError(c);
+}
+
+void sentinelDisconnectCallback(const redisAsyncContext *c, int status) {
+ REDIS_NOTUSED(status);
+ instanceLinkConnectionError(c);
+}
+
/* ========================== sentinelRedisInstance ========================= */
/* Create a redis instance, the following fields must be populated by the
@@ -911,6 +1041,7 @@ void sentinelCallClientReconfScript(sentinelRedisInstance *master, int role, cha
*
* The function may also fail and return NULL with errno set to EBUSY if
* a master or slave with the same name already exists. */
+
sentinelRedisInstance *createSentinelRedisInstance(char *name, int flags, char *hostname, int port, int quorum, sentinelRedisInstance *master) {
sentinelRedisInstance *ri;
sentinelAddr *addr;
@@ -949,24 +1080,12 @@ sentinelRedisInstance *createSentinelRedisInstance(char *name, int flags, char *
ri = zmalloc(sizeof(*ri));
/* Note that all the instances are started in the disconnected state,
* the event loop will take care of connecting them. */
- ri->flags = flags | SRI_DISCONNECTED;
+ ri->flags = flags;
ri->name = sdsname;
ri->runid = NULL;
ri->config_epoch = 0;
ri->addr = addr;
- ri->cc = NULL;
- ri->pc = NULL;
- ri->pending_commands = 0;
- ri->cc_conn_time = 0;
- ri->pc_conn_time = 0;
- ri->pc_last_activity = 0;
- /* We set the last_ping_time to "now" even if we actually don't have yet
- * a connection with the node, nor we sent a ping.
- * This is useful to detect a timeout in case we'll not be able to connect
- * with the node at all. */
- ri->last_ping_time = mstime();
- ri->last_avail_time = mstime();
- ri->last_pong_time = mstime();
+ ri->link = createInstanceLink();
ri->last_pub_time = mstime();
ri->last_hello_time = mstime();
ri->last_master_down_reply_time = mstime();
@@ -1022,9 +1141,8 @@ void releaseSentinelRedisInstance(sentinelRedisInstance *ri) {
dictRelease(ri->sentinels);
dictRelease(ri->slaves);
- /* Release hiredis connections. */
- if (ri->cc) sentinelKillLink(ri,ri->cc);
- if (ri->pc) sentinelKillLink(ri,ri->pc);
+ /* Disconnect the instance. */
+ releaseInstanceLink(ri->link,ri);
/* Free other resources. */
sdsfree(ri->name);
@@ -1183,9 +1301,9 @@ void sentinelResetMaster(sentinelRedisInstance *ri, int flags) {
dictRelease(ri->sentinels);
ri->sentinels = dictCreate(&instancesDictType,NULL);
}
- if (ri->cc) sentinelKillLink(ri,ri->cc);
- if (ri->pc) sentinelKillLink(ri,ri->pc);
- ri->flags &= SRI_MASTER|SRI_DISCONNECTED;
+ instanceLinkCloseConnection(ri->link,ri->link->cc);
+ instanceLinkCloseConnection(ri->link,ri->link->pc);
+ ri->flags &= SRI_MASTER;
if (ri->leader) {
sdsfree(ri->leader);
ri->leader = NULL;
@@ -1198,9 +1316,9 @@ void sentinelResetMaster(sentinelRedisInstance *ri, int flags) {
sdsfree(ri->slave_master_host);
ri->runid = NULL;
ri->slave_master_host = NULL;
- ri->last_ping_time = mstime();
- ri->last_avail_time = mstime();
- ri->last_pong_time = mstime();
+ ri->link->last_ping_time = mstime();
+ ri->link->last_avail_time = mstime();
+ ri->link->last_pong_time = mstime();
ri->role_reported_time = mstime();
ri->role_reported = SRI_MASTER;
if (flags & SENTINEL_GENERATE_EVENT)
@@ -1445,7 +1563,10 @@ char *sentinelHandleConfiguration(char **argv, int argc) {
{
return "Wrong hostname or port for sentinel.";
}
- if (argc == 5) si->runid = sdsnew(argv[4]);
+ if (argc == 5) {
+ si->runid = sdsnew(argv[4]);
+ tryConnectionSharing(si);
+ }
} else if (!strcasecmp(argv[0],"announce-ip") && argc == 2) {
/* announce-ip <ip-address> */
if (strlen(argv[1]))
@@ -1634,57 +1755,6 @@ werr:
/* ====================== hiredis connection handling ======================= */
-/* Completely disconnect a hiredis link from an instance. */
-void sentinelKillLink(sentinelRedisInstance *ri, redisAsyncContext *c) {
- if (ri->cc == c) {
- ri->cc = NULL;
- ri->pending_commands = 0;
- }
- if (ri->pc == c) ri->pc = NULL;
- c->data = NULL;
- ri->flags |= SRI_DISCONNECTED;
- redisAsyncFree(c);
-}
-
-/* This function takes a hiredis context that is in an error condition
- * and make sure to mark the instance as disconnected performing the
- * cleanup needed.
- *
- * Note: we don't free the hiredis context as hiredis will do it for us
- * for async connections. */
-void sentinelDisconnectInstanceFromContext(const redisAsyncContext *c) {
- sentinelRedisInstance *ri = c->data;
- int pubsub;
-
- if (ri == NULL) return; /* The instance no longer exists. */
-
- pubsub = (ri->pc == c);
- sentinelEvent(REDIS_DEBUG, pubsub ? "-pubsub-link" : "-cmd-link", ri,
- "%@ #%s", c->errstr);
- if (pubsub)
- ri->pc = NULL;
- else
- ri->cc = NULL;
- ri->flags |= SRI_DISCONNECTED;
-}
-
-void sentinelLinkEstablishedCallback(const redisAsyncContext *c, int status) {
- if (status != REDIS_OK) {
- sentinelDisconnectInstanceFromContext(c);
- } else {
- sentinelRedisInstance *ri = c->data;
- int pubsub = (ri->pc == c);
-
- sentinelEvent(REDIS_DEBUG, pubsub ? "+pubsub-link" : "+cmd-link", ri,
- "%@");
- }
-}
-
-void sentinelDisconnectCallback(const redisAsyncContext *c, int status) {
- REDIS_NOTUSED(status);
- sentinelDisconnectInstanceFromContext(c);
-}
-
/* Send the AUTH command with the specified master password if needed.
* Note that for slaves the password set for the master is used.
*
@@ -1696,8 +1766,8 @@ void sentinelSendAuthIfNeeded(sentinelRedisInstance *ri, redisAsyncContext *c) {
ri->master->auth_pass;
if (auth_pass) {
- if (redisAsyncCommand(c, sentinelDiscardReplyCallback, NULL, "AUTH %s",
- auth_pass) == REDIS_OK) ri->pending_commands++;
+ if (redisAsyncCommand(c, sentinelDiscardReplyCallback, ri, "AUTH %s",
+ auth_pass) == REDIS_OK) ri->link->pending_commands++;
}
}
@@ -1711,76 +1781,77 @@ void sentinelSetClientName(sentinelRedisInstance *ri, redisAsyncContext *c, char
char name[64];
snprintf(name,sizeof(name),"sentinel-%.8s-%s",sentinel.myid,type);
- if (redisAsyncCommand(c, sentinelDiscardReplyCallback, NULL,
+ if (redisAsyncCommand(c, sentinelDiscardReplyCallback, ri,
"CLIENT SETNAME %s", name) == REDIS_OK)
{
- ri->pending_commands++;
+ ri->link->pending_commands++;
}
}
-/* Create the async connections for the specified instance if the instance
- * is disconnected. Note that the SRI_DISCONNECTED flag is set even if just
+/* Create the async connections for the instance link if the link
+ * is disconnected. Note that link->disconnected is true even if just
* one of the two links (commands and pub/sub) is missing. */
void sentinelReconnectInstance(sentinelRedisInstance *ri) {
- if (!(ri->flags & SRI_DISCONNECTED)) return;
+ if (ri->link->disconnected == 0) return;
+ instanceLink *link = ri->link;
/* Commands connection. */
- if (ri->cc == NULL) {
- ri->cc = redisAsyncConnectBind(ri->addr->ip,ri->addr->port,REDIS_BIND_ADDR);
- if (ri->cc->err) {
+ if (link->cc == NULL) {
+ link->cc = redisAsyncConnectBind(ri->addr->ip,ri->addr->port,REDIS_BIND_ADDR);
+ if (link->cc->err) {
sentinelEvent(REDIS_DEBUG,"-cmd-link-reconnection",ri,"%@ #%s",
- ri->cc->errstr);
- sentinelKillLink(ri,ri->cc);
+ link->cc->errstr);
+ instanceLinkCloseConnection(link,link->cc);
} else {
- ri->cc_conn_time = mstime();
- ri->cc->data = ri;
- redisAeAttach(server.el,ri->cc);
- redisAsyncSetConnectCallback(ri->cc,
- sentinelLinkEstablishedCallback);
- redisAsyncSetDisconnectCallback(ri->cc,
- sentinelDisconnectCallback);
- sentinelSendAuthIfNeeded(ri,ri->cc);
- sentinelSetClientName(ri,ri->cc,"cmd");
+ link->cc_conn_time = mstime();
+ link->cc->data = link;
+ redisAeAttach(server.el,link->cc);
+ redisAsyncSetConnectCallback(link->cc,
+ sentinelLinkEstablishedCallback);
+ redisAsyncSetDisconnectCallback(link->cc,
+ sentinelDisconnectCallback);
+ sentinelSendAuthIfNeeded(ri,link->cc);
+ sentinelSetClientName(ri,link->cc,"cmd");
/* Send a PING ASAP when reconnecting. */
sentinelSendPing(ri);
}
}
/* Pub / Sub */
- if ((ri->flags & (SRI_MASTER|SRI_SLAVE)) && ri->pc == NULL) {
- ri->pc = redisAsyncConnectBind(ri->addr->ip,ri->addr->port,REDIS_BIND_ADDR);
- if (ri->pc->err) {
+ if ((ri->flags & (SRI_MASTER|SRI_SLAVE)) && link->pc == NULL) {
+ link->pc = redisAsyncConnectBind(ri->addr->ip,ri->addr->port,REDIS_BIND_ADDR);
+ if (link->pc->err) {
sentinelEvent(REDIS_DEBUG,"-pubsub-link-reconnection",ri,"%@ #%s",
- ri->pc->errstr);
- sentinelKillLink(ri,ri->pc);
+ link->pc->errstr);
+ instanceLinkCloseConnection(link,link->pc);
} else {
int retval;
- ri->pc_conn_time = mstime();
- ri->pc->data = ri;
- redisAeAttach(server.el,ri->pc);
- redisAsyncSetConnectCallback(ri->pc,
- sentinelLinkEstablishedCallback);
- redisAsyncSetDisconnectCallback(ri->pc,
- sentinelDisconnectCallback);
- sentinelSendAuthIfNeeded(ri,ri->pc);
- sentinelSetClientName(ri,ri->pc,"pubsub");
+ link->pc_conn_time = mstime();
+ link->pc->data = link;
+ redisAeAttach(server.el,link->pc);
+ redisAsyncSetConnectCallback(link->pc,
+ sentinelLinkEstablishedCallback);
+ redisAsyncSetDisconnectCallback(link->pc,
+ sentinelDisconnectCallback);
+ sentinelSendAuthIfNeeded(ri,link->pc);
+ sentinelSetClientName(ri,link->pc,"pubsub");
/* Now we subscribe to the Sentinels "Hello" channel. */
- retval = redisAsyncCommand(ri->pc,
+ retval = redisAsyncCommand(link->pc,
sentinelReceiveHelloMessages, ri, "SUBSCRIBE %s",
SENTINEL_HELLO_CHANNEL);
if (retval != REDIS_OK) {
/* If we can't subscribe, the Pub/Sub connection is useless
* and we can simply disconnect it and try again. */
- sentinelKillLink(ri,ri->pc);
+ instanceLinkCloseConnection(link,link->pc);
return;
}
}
}
- /* Clear the DISCONNECTED flags only if we have both the connections
+ /* Clear the disconnected status only if we have both the connections
* (or just the commands connection if this is a sentinel instance). */
- if (ri->cc && (ri->flags & SRI_SENTINEL || ri->pc))
- ri->flags &= ~SRI_DISCONNECTED;
+ if (link->cc && (ri->flags & SRI_SENTINEL || link->pc))
+ link->disconnected = 0;
}
/* ======================== Redis instances pinging ======================== */
@@ -2049,36 +2120,35 @@ void sentinelRefreshInstanceInfo(sentinelRedisInstance *ri, const char *info) {
}
void sentinelInfoReplyCallback(redisAsyncContext *c, void *reply, void *privdata) {
- sentinelRedisInstance *ri = c->data;
+ sentinelRedisInstance *ri = privdata;
+ instanceLink *link = c->data;
redisReply *r;
- REDIS_NOTUSED(privdata);
- if (ri) ri->pending_commands--;
- if (!reply || !ri) return;
+ if (!reply || !link) return;
+ link->pending_commands--;
r = reply;
- if (r->type == REDIS_REPLY_STRING) {
+ if (r->type == REDIS_REPLY_STRING)
sentinelRefreshInstanceInfo(ri,r->str);
- }
}
/* Just discard the reply. We use this when we are not monitoring the return
* value of the command but its effects directly. */
void sentinelDiscardReplyCallback(redisAsyncContext *c, void *reply, void *privdata) {
- sentinelRedisInstance *ri = c->data;
+ instanceLink *link = c->data;
REDIS_NOTUSED(reply);
REDIS_NOTUSED(privdata);
- if (ri) ri->pending_commands--;
+ if (link) link->pending_commands--;
}
void sentinelPingReplyCallback(redisAsyncContext *c, void *reply, void *privdata) {
- sentinelRedisInstance *ri = c->data;
+ sentinelRedisInstance *ri = privdata;
+ instanceLink *link = c->data;
redisReply *r;
- REDIS_NOTUSED(privdata);
- if (ri) ri->pending_commands--;
- if (!reply || !ri) return;
+ if (!reply || !link) return;
+ link->pending_commands--;
r = reply;
if (r->type == REDIS_REPLY_STATUS ||
@@ -2089,8 +2159,8 @@ void sentinelPingReplyCallback(redisAsyncContext *c, void *reply, void *privdata
strncmp(r->str,"LOADING",7) == 0 ||
strncmp(r->str,"MASTERDOWN",10) == 0)
{
- ri->last_avail_time = mstime();
- ri->last_ping_time = 0; /* Flag the pong as received. */
+ link->last_avail_time = mstime();
+ link->last_ping_time = 0; /* Flag the pong as received. */
} else {
/* Send a SCRIPT KILL command if the instance appears to be
* down because of a busy script. */
@@ -2098,26 +2168,26 @@ void sentinelPingReplyCallback(redisAsyncContext *c, void *reply, void *privdata
(ri->flags & SRI_S_DOWN) &&
!(ri->flags & SRI_SCRIPT_KILL_SENT))
{
- if (redisAsyncCommand(ri->cc,
- sentinelDiscardReplyCallback, NULL,
+ if (redisAsyncCommand(ri->link->cc,
+ sentinelDiscardReplyCallback, ri,
"SCRIPT KILL") == REDIS_OK)
- ri->pending_commands++;
+ ri->link->pending_commands++;
ri->flags |= SRI_SCRIPT_KILL_SENT;
}
}
}
- ri->last_pong_time = mstime();
+ link->last_pong_time = mstime();
}
/* This is called when we get the reply about the PUBLISH command we send
* to the master to advertise this sentinel. */
void sentinelPublishReplyCallback(redisAsyncContext *c, void *reply, void *privdata) {
- sentinelRedisInstance *ri = c->data;
+ sentinelRedisInstance *ri = privdata;
+ instanceLink *link = c->data;
redisReply *r;
- REDIS_NOTUSED(privdata);
- if (ri) ri->pending_commands--;
- if (!reply || !ri) return;
+ if (!reply || !link) return;
+ link->pending_commands--;
r = reply;
/* Only update pub_time if we actually published our message. Otherwise
@@ -2172,6 +2242,7 @@ void sentinelProcessHelloMessage(char *hello, int hello_len) {
* for Sentinels we don't have a later chance to fill it,
* so do it now. */
si->runid = sdsnew(token[2]);
+ tryConnectionSharing(si);
sentinelFlushConfig();
}
}
@@ -2230,7 +2301,7 @@ void sentinelReceiveHelloMessages(redisAsyncContext *c, void *reply, void *privd
/* Update the last activity in the pubsub channel. Note that since we
* receive our messages as well this timestamp can be used to detect
* if the link is probably disconnected even if it seems otherwise. */
- ri->pc_last_activity = mstime();
+ ri->link->pc_last_activity = mstime();
/* Sanity check in the reply we expect, so that the code that follows
* can avoid to check for details. */
@@ -2267,14 +2338,14 @@ int sentinelSendHello(sentinelRedisInstance *ri) {
sentinelRedisInstance *master = (ri->flags & SRI_MASTER) ? ri : ri->master;
sentinelAddr *master_addr = sentinelGetCurrentMasterAddress(master);
- if (ri->flags & SRI_DISCONNECTED) return REDIS_ERR;
+ if (ri->link->disconnected) return REDIS_ERR;
/* Use the specified announce address if specified, otherwise try to
* obtain our own IP address. */
if (sentinel.announce_ip) {
announce_ip = sentinel.announce_ip;
} else {
- if (anetSockName(ri->cc->c.fd,ip,sizeof(ip),NULL) == -1)
+ if (anetSockName(ri->link->cc->c.fd,ip,sizeof(ip),NULL) == -1)
return REDIS_ERR;
announce_ip = ip;
}
@@ -2290,11 +2361,11 @@ int sentinelSendHello(sentinelRedisInstance *ri) {
/* --- */
master->name,master_addr->ip,master_addr->port,
(unsigned long long) master->config_epoch);
- retval = redisAsyncCommand(ri->cc,
- sentinelPublishReplyCallback, NULL, "PUBLISH %s %s",
+ retval = redisAsyncCommand(ri->link->cc,
+ sentinelPublishReplyCallback, ri, "PUBLISH %s %s",
SENTINEL_HELLO_CHANNEL,payload);
if (retval != REDIS_OK) return REDIS_ERR;
- ri->pending_commands++;
+ ri->link->pending_commands++;
return REDIS_OK;
}
@@ -2336,14 +2407,14 @@ int sentinelForceHelloUpdateForMaster(sentinelRedisInstance *master) {
* On error zero is returned, and we can't consider the PING command
* queued in the connection. */
int sentinelSendPing(sentinelRedisInstance *ri) {
- int retval = redisAsyncCommand(ri->cc,
- sentinelPingReplyCallback, NULL, "PING");
+ int retval = redisAsyncCommand(ri->link->cc,
+ sentinelPingReplyCallback, ri, "PING");
if (retval == REDIS_OK) {
- ri->pending_commands++;
+ ri->link->pending_commands++;
/* We update the ping time only if we received the pong for
* the previous ping, otherwise we are technically waiting
* since the first ping that did not received a reply. */
- if (ri->last_ping_time == 0) ri->last_ping_time = mstime();
+ if (ri->link->last_ping_time == 0) ri->link->last_ping_time = mstime();
return 1;
} else {
return 0;
@@ -2359,7 +2430,7 @@ void sentinelSendPeriodicCommands(sentinelRedisInstance *ri) {
/* Return ASAP if we have already a PING or INFO already pending, or
* in the case the instance is not properly connected. */
- if (ri->flags & SRI_DISCONNECTED) return;
+ if (ri->link->disconnected) return;
/* For INFO, PING, PUBLISH that are not critical commands to send we
* also have a limit of SENTINEL_MAX_PENDING_COMMANDS. We don't
@@ -2367,7 +2438,8 @@ void sentinelSendPeriodicCommands(sentinelRedisInstance *ri) {
* properly (note that anyway there is a redundant protection about this,
* that is, the link will be disconnected and reconnected if a long
* timeout condition is detected. */
- if (ri->pending_commands >= SENTINEL_MAX_PENDING_COMMANDS) return;
+ if (ri->link->pending_commands >=
+ SENTINEL_MAX_PENDING_COMMANDS * ri->link->refcount) return;
/* If this is a slave of a master in O_DOWN condition we start sending
* it INFO every second, instead of the usual SENTINEL_INFO_PERIOD
@@ -2391,10 +2463,10 @@ void sentinelSendPeriodicCommands(sentinelRedisInstance *ri) {
(now - ri->info_refresh) > info_period))
{
/* Send INFO to masters and slaves, not sentinels. */
- retval = redisAsyncCommand(ri->cc,
- sentinelInfoReplyCallback, NULL, "INFO");
- if (retval == REDIS_OK) ri->pending_commands++;
- } else if ((now - ri->last_pong_time) > ping_period) {
+ retval = redisAsyncCommand(ri->link->cc,
+ sentinelInfoReplyCallback, ri, "INFO");
+ if (retval == REDIS_OK) ri->link->pending_commands++;
+ } else if ((now - ri->link->last_pong_time) > ping_period) {
/* Send PING to all the three kinds of instances. */
sentinelSendPing(ri);
} else if ((now - ri->last_pub_time) > SENTINEL_PUBLISH_PERIOD) {
@@ -2448,7 +2520,7 @@ void addReplySentinelRedisInstance(redisClient *c, sentinelRedisInstance *ri) {
if (ri->flags & SRI_MASTER) flags = sdscat(flags,"master,");
if (ri->flags & SRI_SLAVE) flags = sdscat(flags,"slave,");
if (ri->flags & SRI_SENTINEL) flags = sdscat(flags,"sentinel,");
- if (ri->flags & SRI_DISCONNECTED) flags = sdscat(flags,"disconnected,");
+ if (ri->link->disconnected) flags = sdscat(flags,"disconnected,");
if (ri->flags & SRI_MASTER_DOWN) flags = sdscat(flags,"master_down,");
if (ri->flags & SRI_FAILOVER_IN_PROGRESS)
flags = sdscat(flags,"failover_in_progress,");
@@ -2462,8 +2534,8 @@ void addReplySentinelRedisInstance(redisClient *c, sentinelRedisInstance *ri) {
sdsfree(flags);
fields++;
- addReplyBulkCString(c,"pending-commands");
- addReplyBulkLongLong(c,ri->pending_commands);
+ addReplyBulkCString(c,"link-pending-commands");
+ addReplyBulkLongLong(c,ri->link->pending_commands);
fields++;
if (ri->flags & SRI_FAILOVER_IN_PROGRESS) {
@@ -2474,15 +2546,15 @@ void addReplySentinelRedisInstance(redisClient *c, sentinelRedisInstance *ri) {
addReplyBulkCString(c,"last-ping-sent");
addReplyBulkLongLong(c,
- ri->last_ping_time ? (mstime() - ri->last_ping_time) : 0);
+ ri->link->last_ping_time ? (mstime() - ri->link->last_ping_time) : 0);
fields++;
addReplyBulkCString(c,"last-ok-ping-reply");
- addReplyBulkLongLong(c,mstime() - ri->last_avail_time);
+ addReplyBulkLongLong(c,mstime() - ri->link->last_avail_time);
fields++;
addReplyBulkCString(c,"last-ping-reply");
- addReplyBulkLongLong(c,mstime() - ri->last_pong_time);
+ addReplyBulkLongLong(c,mstime() - ri->link->last_pong_time);
fields++;
if (ri->flags & SRI_S_DOWN) {
@@ -2676,7 +2748,7 @@ void sentinelCommand(redisClient *c) {
* differnet masters with the same name.
*
* current-epoch is needed in order to understand if we are allowed
- * to vote for a failover leader or not. Each Senitnel can vote just
+ * to vote for a failover leader or not. Each Sentinel can vote just
* one time per epoch.
*
* runid is "*" if we are not seeking for a vote from the Sentinel
@@ -3084,8 +3156,8 @@ void sentinelPublishCommand(redisClient *c) {
void sentinelCheckSubjectivelyDown(sentinelRedisInstance *ri) {
mstime_t elapsed = 0;
- if (ri->last_ping_time)
- elapsed = mstime() - ri->last_ping_time;
+ if (ri->link->last_ping_time)
+ elapsed = mstime() - ri->link->last_ping_time;
/* Check if we are in need for a reconnection of one of the
* links, because we are detecting low activity.
@@ -3093,15 +3165,16 @@ void sentinelCheckSubjectivelyDown(sentinelRedisInstance *ri) {
* 1) Check if the command link seems connected, was connected not less
* than SENTINEL_MIN_LINK_RECONNECT_PERIOD, but still we have a
* pending ping for more than half the timeout. */
- if (ri->cc &&
- (mstime() - ri->cc_conn_time) > SENTINEL_MIN_LINK_RECONNECT_PERIOD &&
- ri->last_ping_time != 0 && /* Ther is a pending ping... */
+ if (ri->link->cc &&
+ (mstime() - ri->link->cc_conn_time) >
+ SENTINEL_MIN_LINK_RECONNECT_PERIOD &&
+ ri->link->last_ping_time != 0 && /* Ther is a pending ping... */
/* The pending ping is delayed, and we did not received
* error replies as well. */
- (mstime() - ri->last_ping_time) > (ri->down_after_period/2) &&
- (mstime() - ri->last_pong_time) > (ri->down_after_period/2))
+ (mstime() - ri->link->last_ping_time) > (ri->down_after_period/2) &&
+ (mstime() - ri->link->last_pong_time) > (ri->down_after_period/2))
{
- sentinelKillLink(ri,ri->cc);
+ instanceLinkCloseConnection(ri->link,ri->link->cc);
}
/* 2) Check if the pubsub link seems connected, was connected not less
@@ -3109,11 +3182,12 @@ void sentinelCheckSubjectivelyDown(sentinelRedisInstance *ri) {
* activity in the Pub/Sub channel for more than
* SENTINEL_PUBLISH_PERIOD * 3.
*/
- if (ri->pc &&
- (mstime() - ri->pc_conn_time) > SENTINEL_MIN_LINK_RECONNECT_PERIOD &&
- (mstime() - ri->pc_last_activity) > (SENTINEL_PUBLISH_PERIOD*3))
+ if (ri->link->pc &&
+ (mstime() - ri->link->pc_conn_time) >
+ SENTINEL_MIN_LINK_RECONNECT_PERIOD &&
+ (mstime() - ri->link->pc_last_activity) > (SENTINEL_PUBLISH_PERIOD*3))
{
- sentinelKillLink(ri,ri->pc);
+ instanceLinkCloseConnection(ri->link,ri->link->pc);
}
/* Update the SDOWN flag. We believe the instance is SDOWN if:
@@ -3188,11 +3262,11 @@ void sentinelCheckObjectivelyDown(sentinelRedisInstance *master) {
* sentinelAskMasterStateToOtherSentinels() function for more information. */
void sentinelReceiveIsMasterDownReply(redisAsyncContext *c, void *reply, void *privdata) {
sentinelRedisInstance *ri = privdata;
+ instanceLink *link = c->data;
redisReply *r;
- REDIS_NOTUSED(c);
- if (ri) ri->pending_commands--;
- if (!reply || !ri) return;
+ if (!reply || !link) return;
+ link->pending_commands--;
r = reply;
/* Ignore every error or unexpected reply.
@@ -3253,21 +3327,21 @@ void sentinelAskMasterStateToOtherSentinels(sentinelRedisInstance *master, int f
* 2) Sentinel is connected.
* 3) We did not received the info within SENTINEL_ASK_PERIOD ms. */
if ((master->flags & SRI_S_DOWN) == 0) continue;
- if (ri->flags & SRI_DISCONNECTED) continue;
+ if (ri->link->disconnected) continue;
if (!(flags & SENTINEL_ASK_FORCED) &&
mstime() - ri->last_master_down_reply_time < SENTINEL_ASK_PERIOD)
continue;
/* Ask */
ll2string(port,sizeof(port),master->addr->port);
- retval = redisAsyncCommand(ri->cc,
+ retval = redisAsyncCommand(ri->link->cc,
sentinelReceiveIsMasterDownReply, ri,
"SENTINEL is-master-down-by-addr %s %s %llu %s",
master->addr->ip, port,
sentinel.current_epoch,
(master->failover_state > SENTINEL_FAILOVER_STATE_NONE) ?
sentinel.myid : "*");
- if (retval == REDIS_OK) ri->pending_commands++;
+ if (retval == REDIS_OK) ri->link->pending_commands++;
}
dictReleaseIterator(di);
}
@@ -3433,35 +3507,35 @@ int sentinelSendSlaveOf(sentinelRedisInstance *ri, char *host, int port) {
*
* Note that we don't check the replies returned by commands, since we
* will observe instead the effects in the next INFO output. */
- retval = redisAsyncCommand(ri->cc,
- sentinelDiscardReplyCallback, NULL, "MULTI");
+ retval = redisAsyncCommand(ri->link->cc,
+ sentinelDiscardReplyCallback, ri, "MULTI");
if (retval == REDIS_ERR) return retval;
- ri->pending_commands++;
+ ri->link->pending_commands++;
- retval = redisAsyncCommand(ri->cc,
- sentinelDiscardReplyCallback, NULL, "SLAVEOF %s %s", host, portstr);
+ retval = redisAsyncCommand(ri->link->cc,
+ sentinelDiscardReplyCallback, ri, "SLAVEOF %s %s", host, portstr);
if (retval == REDIS_ERR) return retval;
- ri->pending_commands++;
+ ri->link->pending_commands++;
- retval = redisAsyncCommand(ri->cc,
- sentinelDiscardReplyCallback, NULL, "CONFIG REWRITE");
+ retval = redisAsyncCommand(ri->link->cc,
+ sentinelDiscardReplyCallback, ri, "CONFIG REWRITE");
if (retval == REDIS_ERR) return retval;
- ri->pending_commands++;
+ ri->link->pending_commands++;
/* CLIENT KILL TYPE <type> is only supported starting from Redis 2.8.12,
* however sending it to an instance not understanding this command is not
* an issue because CLIENT is variadic command, so Redis will not
* recognized as a syntax error, and the transaction will not fail (but
* only the unsupported command will fail). */
- retval = redisAsyncCommand(ri->cc,
- sentinelDiscardReplyCallback, NULL, "CLIENT KILL TYPE normal");
+ retval = redisAsyncCommand(ri->link->cc,
+ sentinelDiscardReplyCallback, ri, "CLIENT KILL TYPE normal");
if (retval == REDIS_ERR) return retval;
- ri->pending_commands++;
+ ri->link->pending_commands++;
- retval = redisAsyncCommand(ri->cc,
- sentinelDiscardReplyCallback, NULL, "EXEC");
+ retval = redisAsyncCommand(ri->link->cc,
+ sentinelDiscardReplyCallback, ri, "EXEC");
if (retval == REDIS_ERR) return retval;
- ri->pending_commands++;
+ ri->link->pending_commands++;
return REDIS_OK;
}
@@ -3599,8 +3673,9 @@ sentinelRedisInstance *sentinelSelectSlave(sentinelRedisInstance *master) {
sentinelRedisInstance *slave = dictGetVal(de);
mstime_t info_validity_time;
- if (slave->flags & (SRI_S_DOWN|SRI_O_DOWN|SRI_DISCONNECTED)) continue;
- if (mstime() - slave->last_avail_time > SENTINEL_PING_PERIOD*5) continue;
+ if (slave->flags & (SRI_S_DOWN|SRI_O_DOWN)) continue;
+ if (slave->link->disconnected) continue;
+ if (mstime() - slave->link->last_avail_time > SENTINEL_PING_PERIOD*5) continue;
if (slave->slave_priority == 0) continue;
/* If the master is in SDOWN state we get INFO for slaves every second.
@@ -3681,7 +3756,7 @@ void sentinelFailoverSendSlaveOfNoOne(sentinelRedisInstance *ri) {
/* We can't send the command to the promoted slave if it is now
* disconnected. Retry again and again with this state until the timeout
* is reached, then abort the failover. */
- if (ri->promoted_slave->flags & SRI_DISCONNECTED) {
+ if (ri->link->disconnected) {
if (mstime() - ri->failover_state_change_time > ri->failover_timeout) {
sentinelEvent(REDIS_WARNING,"-failover-abort-slave-timeout",ri,"%@");
sentinelAbortFailover(ri);
@@ -3760,8 +3835,8 @@ void sentinelFailoverDetectEnd(sentinelRedisInstance *master) {
sentinelRedisInstance *slave = dictGetVal(de);
int retval;
- if (slave->flags &
- (SRI_RECONF_DONE|SRI_RECONF_SENT|SRI_DISCONNECTED)) continue;
+ if (slave->flags & (SRI_RECONF_DONE|SRI_RECONF_SENT)) continue;
+ if (slave->link->disconnected) continue;
retval = sentinelSendSlaveOf(slave,
master->promoted_slave->addr->ip,
@@ -3816,8 +3891,8 @@ void sentinelFailoverReconfNextSlave(sentinelRedisInstance *master) {
/* Nothing to do for instances that are disconnected or already
* in RECONF_SENT state. */
- if (slave->flags & (SRI_DISCONNECTED|SRI_RECONF_SENT|SRI_RECONF_INPROG))
- continue;
+ if (slave->flags & (SRI_RECONF_SENT|SRI_RECONF_INPROG)) continue;
+ if (slave->link->disconnected) continue;
/* Send SLAVEOF <new master>. */
retval = sentinelSendSlaveOf(slave,