summaryrefslogtreecommitdiff
path: root/src/sentinel.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/sentinel.c')
-rw-r--r--src/sentinel.c1427
1 files changed, 956 insertions, 471 deletions
diff --git a/src/sentinel.c b/src/sentinel.c
index 48e1de8dd..6c6a3a0cd 100644
--- a/src/sentinel.c
+++ b/src/sentinel.c
@@ -28,7 +28,7 @@
* POSSIBILITY OF SUCH DAMAGE.
*/
-#include "redis.h"
+#include "server.h"
#include "hiredis.h"
#include "async.h"
@@ -54,19 +54,18 @@ typedef struct sentinelAddr {
#define SRI_MASTER (1<<0)
#define SRI_SLAVE (1<<1)
#define SRI_SENTINEL (1<<2)
-#define SRI_DISCONNECTED (1<<3)
-#define SRI_S_DOWN (1<<4) /* Subjectively down (no quorum). */
-#define SRI_O_DOWN (1<<5) /* Objectively down (confirmed by others). */
-#define SRI_MASTER_DOWN (1<<6) /* A Sentinel with this flag set thinks that
+#define SRI_S_DOWN (1<<3) /* Subjectively down (no quorum). */
+#define SRI_O_DOWN (1<<4) /* Objectively down (confirmed by others). */
+#define SRI_MASTER_DOWN (1<<5) /* A Sentinel with this flag set thinks that
its master is down. */
-#define SRI_FAILOVER_IN_PROGRESS (1<<7) /* Failover is in progress for
+#define SRI_FAILOVER_IN_PROGRESS (1<<6) /* Failover is in progress for
this master. */
-#define SRI_PROMOTED (1<<8) /* Slave selected for promotion. */
-#define SRI_RECONF_SENT (1<<9) /* SLAVEOF <newmaster> sent. */
-#define SRI_RECONF_INPROG (1<<10) /* Slave synchronization in progress. */
-#define SRI_RECONF_DONE (1<<11) /* Slave synchronized with new master. */
-#define SRI_FORCE_FAILOVER (1<<12) /* Force failover with master up. */
-#define SRI_SCRIPT_KILL_SENT (1<<13) /* SCRIPT KILL already sent on -BUSY */
+#define SRI_PROMOTED (1<<7) /* Slave selected for promotion. */
+#define SRI_RECONF_SENT (1<<8) /* SLAVEOF <newmaster> sent. */
+#define SRI_RECONF_INPROG (1<<9) /* Slave synchronization in progress. */
+#define SRI_RECONF_DONE (1<<10) /* Slave synchronized with new master. */
+#define SRI_FORCE_FAILOVER (1<<11) /* Force failover with master up. */
+#define SRI_SCRIPT_KILL_SENT (1<<12) /* SCRIPT KILL already sent on -BUSY */
/* Note: times are in milliseconds. */
#define SENTINEL_INFO_PERIOD 10000
@@ -115,27 +114,59 @@ typedef struct sentinelAddr {
#define SENTINEL_SCRIPT_MAX_RETRY 10
#define SENTINEL_SCRIPT_RETRY_DELAY 30000 /* 30 seconds between retries. */
-typedef struct sentinelRedisInstance {
- int flags; /* See SRI_... defines */
- char *name; /* Master name from the point of view of this sentinel. */
- char *runid; /* run ID of this instance. */
- uint64_t config_epoch; /* Configuration epoch. */
- sentinelAddr *addr; /* Master host. */
+/* SENTINEL SIMULATE-FAILURE command flags. */
+#define SENTINEL_SIMFAILURE_NONE 0
+#define SENTINEL_SIMFAILURE_CRASH_AFTER_ELECTION (1<<0)
+#define SENTINEL_SIMFAILURE_CRASH_AFTER_PROMOTION (1<<1)
+
+/* The link to a sentinelRedisInstance. When we have the same set of Sentinels
+ * monitoring many masters, we have different instances representing the
+ * same Sentinels, one per master, and we need to share the hiredis connections
+ * among them. Oherwise if 5 Sentinels are monitoring 100 masters we create
+ * 500 outgoing connections instead of 5.
+ *
+ * So this structure represents a reference counted link in terms of the two
+ * hiredis connections for commands and Pub/Sub, and the fields needed for
+ * failure detection, since the ping/pong time are now local to the link: if
+ * the link is available, the instance is avaialbe. This way we don't just
+ * have 5 connections instead of 500, we also send 5 pings instead of 500.
+ *
+ * Links are shared only for Sentinels: master and slave instances have
+ * a link with refcount = 1, always. */
+typedef struct instanceLink {
+ int refcount; /* Number of sentinelRedisInstance owners. */
+ int disconnected; /* Non-zero if we need to reconnect cc or pc. */
+ int pending_commands; /* Number of commands sent waiting for a reply. */
redisAsyncContext *cc; /* Hiredis context for commands. */
redisAsyncContext *pc; /* Hiredis context for Pub / Sub. */
- int pending_commands; /* Number of commands sent waiting for a reply. */
mstime_t cc_conn_time; /* cc connection time. */
mstime_t pc_conn_time; /* pc connection time. */
mstime_t pc_last_activity; /* Last time we received any message. */
mstime_t last_avail_time; /* Last time the instance replied to ping with
a reply we consider valid. */
- mstime_t last_ping_time; /* Last time a pending ping was sent in the
- context of the current command connection
- with the instance. 0 if still not sent or
- if pong already received. */
+ mstime_t act_ping_time; /* Time at which the last pending ping (no pong
+ received after it) was sent. This field is
+ set to 0 when a pong is received, and set again
+ to the current time if the value is 0 and a new
+ ping is sent. */
+ mstime_t last_ping_time; /* Time at which we sent the last ping. This is
+ only used to avoid sending too many pings
+ during failure. Idle time is computed using
+ the act_ping_time field. */
mstime_t last_pong_time; /* Last time the instance replied to ping,
whatever the reply was. That's used to check
if the link is idle and must be reconnected. */
+ mstime_t last_reconn_time; /* Last reconnection attempt performed when
+ the link was down. */
+} instanceLink;
+
+typedef struct sentinelRedisInstance {
+ int flags; /* See SRI_... defines */
+ char *name; /* Master name from the point of view of this sentinel. */
+ char *runid; /* Run ID of this instance, or unique ID if is a Sentinel.*/
+ uint64_t config_epoch; /* Configuration epoch. */
+ sentinelAddr *addr; /* Master host. */
+ instanceLink *link; /* Link to the instance, may be shared for Sentinels. */
mstime_t last_pub_time; /* Last time we sent hello via Pub/Sub. */
mstime_t last_hello_time; /* Only used if SRI_SENTINEL is set. Last time
we received a hello from this Sentinel
@@ -159,7 +190,7 @@ typedef struct sentinelRedisInstance {
/* Master specific. */
dict *sentinels; /* Other sentinels monitoring the same master. */
dict *slaves; /* Slaves for this master instance. */
- int quorum; /* Number of sentinels that need to agree on failure. */
+ unsigned int quorum;/* Number of sentinels that need to agree on failure. */
int parallel_syncs; /* How many slaves to reconfigure at same time. */
char *auth_pass; /* Password to use for AUTH against master & slaves. */
@@ -190,19 +221,26 @@ typedef struct sentinelRedisInstance {
* are set to NULL no script is executed. */
char *notification_script;
char *client_reconfig_script;
+ sds info; /* cached INFO output */
} sentinelRedisInstance;
/* Main state. */
struct sentinelState {
- uint64_t current_epoch; /* Current epoch. */
+ char myid[CONFIG_RUN_ID_SIZE+1]; /* This sentinel ID. */
+ uint64_t current_epoch; /* Current epoch. */
dict *masters; /* Dictionary of master sentinelRedisInstances.
Key is the instance name, value is the
sentinelRedisInstance structure pointer. */
int tilt; /* Are we in TILT mode? */
int running_scripts; /* Number of scripts in execution right now. */
- mstime_t tilt_start_time; /* When TITL started. */
- mstime_t previous_time; /* Last time we ran the time handler. */
- list *scripts_queue; /* Queue of user scripts to execute. */
+ mstime_t tilt_start_time; /* When TITL started. */
+ mstime_t previous_time; /* Last time we ran the time handler. */
+ list *scripts_queue; /* Queue of user scripts to execute. */
+ char *announce_ip; /* IP addr that is gossiped to other sentinels if
+ not NULL. */
+ int announce_port; /* Port that is gossiped to other sentinels if
+ non zero. */
+ unsigned long simfailure_flags; /* Failures simulation. */
} sentinel;
/* A script execution job. */
@@ -293,7 +331,7 @@ static int redisAeAttach(aeEventLoop *loop, redisAsyncContext *ac) {
/* Nothing should be attached when something is already attached */
if (ac->ev.data != NULL)
- return REDIS_ERR;
+ return C_ERR;
/* Create container for context and r/w events */
e = (redisAeEvents*)zmalloc(sizeof(*e));
@@ -310,7 +348,7 @@ static int redisAeAttach(aeEventLoop *loop, redisAsyncContext *ac) {
ac->ev.cleanup = redisAeCleanup;
ac->ev.data = e;
- return REDIS_OK;
+ return C_OK;
}
/* ============================= Prototypes ================================= */
@@ -322,8 +360,7 @@ sentinelRedisInstance *sentinelGetMasterByName(char *name);
char *sentinelGetSubjectiveLeader(sentinelRedisInstance *master);
char *sentinelGetObjectiveLeader(sentinelRedisInstance *master);
int yesnotoi(char *s);
-void sentinelDisconnectInstanceFromContext(const redisAsyncContext *c);
-void sentinelKillLink(sentinelRedisInstance *ri, redisAsyncContext *c);
+void instanceLinkConnectionError(const redisAsyncContext *c);
const char *sentinelRedisInstanceTypeStr(sentinelRedisInstance *ri);
void sentinelAbortFailover(sentinelRedisInstance *ri);
void sentinelEvent(int level, char *type, sentinelRedisInstance *ri, const char *fmt, ...);
@@ -337,14 +374,17 @@ void sentinelFlushConfig(void);
void sentinelGenerateInitialMonitorEvents(void);
int sentinelSendPing(sentinelRedisInstance *ri);
int sentinelForceHelloUpdateForMaster(sentinelRedisInstance *master);
+sentinelRedisInstance *getSentinelRedisInstanceByAddrAndRunID(dict *instances, char *ip, int port, char *runid);
+void sentinelSimFailureCrash(void);
/* ========================= Dictionary types =============================== */
-unsigned int dictSdsHash(const void *key);
+uint64_t dictSdsHash(const void *key);
int dictSdsKeyCompare(void *privdata, const void *key1, const void *key2);
void releaseSentinelRedisInstance(sentinelRedisInstance *ri);
void dictInstancesValDestructor (void *privdata, void *obj) {
+ UNUSED(privdata);
releaseSentinelRedisInstance(obj);
}
@@ -376,11 +416,11 @@ dictType leaderVotesDictType = {
/* =========================== Initialization =============================== */
-void sentinelCommand(redisClient *c);
-void sentinelInfoCommand(redisClient *c);
-void sentinelSetCommand(redisClient *c);
-void sentinelPublishCommand(redisClient *c);
-void sentinelRoleCommand(redisClient *c);
+void sentinelCommand(client *c);
+void sentinelInfoCommand(client *c);
+void sentinelSetCommand(client *c);
+void sentinelPublishCommand(client *c);
+void sentinelRoleCommand(client *c);
struct redisCommand sentinelcmds[] = {
{"ping",pingCommand,1,"",0,NULL,0,0,0,0,0},
@@ -392,6 +432,7 @@ struct redisCommand sentinelcmds[] = {
{"publish",sentinelPublishCommand,3,"",0,NULL,0,0,0,0,0},
{"info",sentinelInfoCommand,-1,"",0,NULL,0,0,0,0,0},
{"role",sentinelRoleCommand,1,"l",0,NULL,0,0,0,0,0},
+ {"client",clientCommand,-2,"rs",0,NULL,0,0,0,0,0},
{"shutdown",shutdownCommand,-1,"",0,NULL,0,0,0,0,0}
};
@@ -403,7 +444,7 @@ void initSentinelConfig(void) {
/* Perform the Sentinel mode initialization. */
void initSentinel(void) {
- int j;
+ unsigned int j;
/* Remove usual Redis commands from the command table, then just add
* the SENTINEL command. */
@@ -413,7 +454,7 @@ void initSentinel(void) {
struct redisCommand *cmd = sentinelcmds+j;
retval = dictAdd(server.commands, sdsnew(cmd->name), cmd);
- redisAssert(retval == DICT_OK);
+ serverAssert(retval == DICT_OK);
}
/* Initialize various data structures. */
@@ -424,24 +465,43 @@ void initSentinel(void) {
sentinel.previous_time = mstime();
sentinel.running_scripts = 0;
sentinel.scripts_queue = listCreate();
+ sentinel.announce_ip = NULL;
+ sentinel.announce_port = 0;
+ sentinel.simfailure_flags = SENTINEL_SIMFAILURE_NONE;
+ memset(sentinel.myid,0,sizeof(sentinel.myid));
}
/* This function gets called when the server is in Sentinel mode, started,
* loaded the configuration, and is ready for normal operations. */
void sentinelIsRunning(void) {
- redisLog(REDIS_WARNING,"Sentinel runid is %s", server.runid);
+ int j;
if (server.configfile == NULL) {
- redisLog(REDIS_WARNING,
+ serverLog(LL_WARNING,
"Sentinel started without a config file. Exiting...");
exit(1);
} else if (access(server.configfile,W_OK) == -1) {
- redisLog(REDIS_WARNING,
+ serverLog(LL_WARNING,
"Sentinel config file %s is not writable: %s. Exiting...",
server.configfile,strerror(errno));
exit(1);
}
+ /* If this Sentinel has yet no ID set in the configuration file, we
+ * pick a random one and persist the config on disk. From now on this
+ * will be this Sentinel ID across restarts. */
+ for (j = 0; j < CONFIG_RUN_ID_SIZE; j++)
+ if (sentinel.myid[j] != 0) break;
+
+ if (j == CONFIG_RUN_ID_SIZE) {
+ /* Pick ID and presist the config. */
+ getRandomHexChars(sentinel.myid,CONFIG_RUN_ID_SIZE);
+ sentinelFlushConfig();
+ }
+
+ /* Log its ID to make debugging of issues simpler. */
+ serverLog(LL_WARNING,"Sentinel ID is %s", sentinel.myid);
+
/* We want to generate a +monitor event for every configured master
* at startup. */
sentinelGenerateInitialMonitorEvents();
@@ -455,19 +515,19 @@ void sentinelIsRunning(void) {
* EINVAL: Invalid port number.
*/
sentinelAddr *createSentinelAddr(char *hostname, int port) {
- char buf[32];
+ char ip[NET_IP_STR_LEN];
sentinelAddr *sa;
- if (port <= 0 || port > 65535) {
+ if (port < 0 || port > 65535) {
errno = EINVAL;
return NULL;
}
- if (anetResolve(NULL,hostname,buf,sizeof(buf)) == ANET_ERR) {
+ if (anetResolve(NULL,hostname,ip,sizeof(ip)) == ANET_ERR) {
errno = ENOENT;
return NULL;
}
sa = zmalloc(sizeof(*sa));
- sa->ip = sdsnew(buf);
+ sa->ip = sdsnew(ip);
sa->port = port;
return sa;
}
@@ -497,7 +557,7 @@ int sentinelAddrIsEqual(sentinelAddr *a, sentinelAddr *b) {
/* Send an event to log, pub/sub, user notification script.
*
- * 'level' is the log level for logging. Only REDIS_WARNING events will trigger
+ * 'level' is the log level for logging. Only LL_WARNING events will trigger
* the execution of the user notification script.
*
* 'type' is the message type, also used as a pub/sub channel name.
@@ -522,7 +582,7 @@ int sentinelAddrIsEqual(sentinelAddr *a, sentinelAddr *b) {
void sentinelEvent(int level, char *type, sentinelRedisInstance *ri,
const char *fmt, ...) {
va_list ap;
- char msg[REDIS_MAX_LOGMSG_LEN];
+ char msg[LOG_MAX_LEN];
robj *channel, *payload;
/* Handle %@ */
@@ -554,10 +614,10 @@ void sentinelEvent(int level, char *type, sentinelRedisInstance *ri,
/* Log the message if the log level allows it to be logged. */
if (level >= server.verbosity)
- redisLog(level,"%s %s",type,msg);
+ serverLog(level,"%s %s",type,msg);
/* Publish the message via Pub/Sub if it's not a debugging one. */
- if (level != REDIS_DEBUG) {
+ if (level != LL_DEBUG) {
channel = createStringObject(type,strlen(type));
payload = createStringObject(msg,strlen(msg));
pubsubPublishMessage(channel,payload);
@@ -566,10 +626,10 @@ void sentinelEvent(int level, char *type, sentinelRedisInstance *ri,
}
/* Call the notification script if applicable. */
- if (level == REDIS_WARNING && ri != NULL) {
+ if (level == LL_WARNING && ri != NULL) {
sentinelRedisInstance *master = (ri->flags & SRI_MASTER) ?
ri : ri->master;
- if (master->notification_script) {
+ if (master && master->notification_script) {
sentinelScheduleScriptExecution(master->notification_script,
type,msg,NULL);
}
@@ -587,7 +647,7 @@ void sentinelGenerateInitialMonitorEvents(void) {
di = dictGetIterator(sentinel.masters);
while((de = dictNext(di)) != NULL) {
sentinelRedisInstance *ri = dictGetVal(de);
- sentinelEvent(REDIS_WARNING,"+monitor",ri,"%@ quorum %d",ri->quorum);
+ sentinelEvent(LL_WARNING,"+monitor",ri,"%@ quorum %d",ri->quorum);
}
dictReleaseIterator(di);
}
@@ -645,7 +705,7 @@ void sentinelScheduleScriptExecution(char *path, ...) {
sentinelReleaseScriptJob(sj);
break;
}
- redisAssert(listLength(sentinel.scripts_queue) <=
+ serverAssert(listLength(sentinel.scripts_queue) <=
SENTINEL_SCRIPT_MAX_QUEUE);
}
}
@@ -697,7 +757,7 @@ void sentinelRunPendingScripts(void) {
/* Parent (fork error).
* We report fork errors as signal 99, in order to unify the
* reporting with other kind of errors. */
- sentinelEvent(REDIS_WARNING,"-script-error",NULL,
+ sentinelEvent(LL_WARNING,"-script-error",NULL,
"%s %d %d", sj->argv[0], 99, 0);
sj->flags &= ~SENTINEL_SCRIPT_RUNNING;
sj->pid = 0;
@@ -709,7 +769,7 @@ void sentinelRunPendingScripts(void) {
} else {
sentinel.running_scripts++;
sj->pid = pid;
- sentinelEvent(REDIS_DEBUG,"+script-child",NULL,"%ld",(long)pid);
+ sentinelEvent(LL_DEBUG,"+script-child",NULL,"%ld",(long)pid);
}
}
}
@@ -743,12 +803,12 @@ void sentinelCollectTerminatedScripts(void) {
sentinelScriptJob *sj;
if (WIFSIGNALED(statloc)) bysignal = WTERMSIG(statloc);
- sentinelEvent(REDIS_DEBUG,"-script-child",NULL,"%ld %d %d",
+ sentinelEvent(LL_DEBUG,"-script-child",NULL,"%ld %d %d",
(long)pid, exitcode, bysignal);
ln = sentinelGetScriptListNodeByPid(pid);
if (ln == NULL) {
- redisLog(REDIS_WARNING,"wait3() returned a pid (%ld) we can't find in our scripts execution queue!", (long)pid);
+ serverLog(LL_WARNING,"wait3() returned a pid (%ld) we can't find in our scripts execution queue!", (long)pid);
continue;
}
sj = ln->value;
@@ -767,7 +827,7 @@ void sentinelCollectTerminatedScripts(void) {
/* Otherwise let's remove the script, but log the event if the
* execution did not terminated in the best of the ways. */
if (bysignal || exitcode != 0) {
- sentinelEvent(REDIS_WARNING,"-script-error",NULL,
+ sentinelEvent(LL_WARNING,"-script-error",NULL,
"%s %d %d", sj->argv[0], bysignal, exitcode);
}
listDelNode(sentinel.scripts_queue,ln);
@@ -791,7 +851,7 @@ void sentinelKillTimedoutScripts(void) {
if (sj->flags & SENTINEL_SCRIPT_RUNNING &&
(now - sj->start_time) > SENTINEL_SCRIPT_MAX_RUNTIME)
{
- sentinelEvent(REDIS_WARNING,"-script-timeout",NULL,"%s %ld",
+ sentinelEvent(LL_WARNING,"-script-timeout",NULL,"%s %ld",
sj->argv[0], (long)sj->pid);
kill(sj->pid,SIGKILL);
}
@@ -799,7 +859,7 @@ void sentinelKillTimedoutScripts(void) {
}
/* Implements SENTINEL PENDING-SCRIPTS command. */
-void sentinelPendingScriptsCommand(redisClient *c) {
+void sentinelPendingScriptsCommand(client *c) {
listNode *ln;
listIter li;
@@ -863,6 +923,201 @@ void sentinelCallClientReconfScript(sentinelRedisInstance *master, int role, cha
state, from->ip, fromport, to->ip, toport, NULL);
}
+/* =============================== instanceLink ============================= */
+
+/* Create a not yet connected link object. */
+instanceLink *createInstanceLink(void) {
+ instanceLink *link = zmalloc(sizeof(*link));
+
+ link->refcount = 1;
+ link->disconnected = 1;
+ link->pending_commands = 0;
+ link->cc = NULL;
+ link->pc = NULL;
+ link->cc_conn_time = 0;
+ link->pc_conn_time = 0;
+ link->last_reconn_time = 0;
+ link->pc_last_activity = 0;
+ /* We set the act_ping_time to "now" even if we actually don't have yet
+ * a connection with the node, nor we sent a ping.
+ * This is useful to detect a timeout in case we'll not be able to connect
+ * with the node at all. */
+ link->act_ping_time = mstime();
+ link->last_ping_time = 0;
+ link->last_avail_time = mstime();
+ link->last_pong_time = mstime();
+ return link;
+}
+
+/* Disconnect an hiredis connection in the context of an instance link. */
+void instanceLinkCloseConnection(instanceLink *link, redisAsyncContext *c) {
+ if (c == NULL) return;
+
+ if (link->cc == c) {
+ link->cc = NULL;
+ link->pending_commands = 0;
+ }
+ if (link->pc == c) link->pc = NULL;
+ c->data = NULL;
+ link->disconnected = 1;
+ redisAsyncFree(c);
+}
+
+/* Decrement the refcount of a link object, if it drops to zero, actually
+ * free it and return NULL. Otherwise don't do anything and return the pointer
+ * to the object.
+ *
+ * If we are not going to free the link and ri is not NULL, we rebind all the
+ * pending requests in link->cc (hiredis connection for commands) to a
+ * callback that will just ignore them. This is useful to avoid processing
+ * replies for an instance that no longer exists. */
+instanceLink *releaseInstanceLink(instanceLink *link, sentinelRedisInstance *ri)
+{
+ serverAssert(link->refcount > 0);
+ link->refcount--;
+ if (link->refcount != 0) {
+ if (ri && ri->link->cc) {
+ /* This instance may have pending callbacks in the hiredis async
+ * context, having as 'privdata' the instance that we are going to
+ * free. Let's rewrite the callback list, directly exploiting
+ * hiredis internal data structures, in order to bind them with
+ * a callback that will ignore the reply at all. */
+ redisCallback *cb;
+ redisCallbackList *callbacks = &link->cc->replies;
+
+ cb = callbacks->head;
+ while(cb) {
+ if (cb->privdata == ri) {
+ cb->fn = sentinelDiscardReplyCallback;
+ cb->privdata = NULL; /* Not strictly needed. */
+ }
+ cb = cb->next;
+ }
+ }
+ return link; /* Other active users. */
+ }
+
+ instanceLinkCloseConnection(link,link->cc);
+ instanceLinkCloseConnection(link,link->pc);
+ zfree(link);
+ return NULL;
+}
+
+/* This function will attempt to share the instance link we already have
+ * for the same Sentinel in the context of a different master, with the
+ * instance we are passing as argument.
+ *
+ * This way multiple Sentinel objects that refer all to the same physical
+ * Sentinel instance but in the context of different masters will use
+ * a single connection, will send a single PING per second for failure
+ * detection and so forth.
+ *
+ * Return C_OK if a matching Sentinel was found in the context of a
+ * different master and sharing was performed. Otherwise C_ERR
+ * is returned. */
+int sentinelTryConnectionSharing(sentinelRedisInstance *ri) {
+ serverAssert(ri->flags & SRI_SENTINEL);
+ dictIterator *di;
+ dictEntry *de;
+
+ if (ri->runid == NULL) return C_ERR; /* No way to identify it. */
+ if (ri->link->refcount > 1) return C_ERR; /* Already shared. */
+
+ di = dictGetIterator(sentinel.masters);
+ while((de = dictNext(di)) != NULL) {
+ sentinelRedisInstance *master = dictGetVal(de), *match;
+ /* We want to share with the same physical Sentinel referenced
+ * in other masters, so skip our master. */
+ if (master == ri->master) continue;
+ match = getSentinelRedisInstanceByAddrAndRunID(master->sentinels,
+ NULL,0,ri->runid);
+ if (match == NULL) continue; /* No match. */
+ if (match == ri) continue; /* Should never happen but... safer. */
+
+ /* We identified a matching Sentinel, great! Let's free our link
+ * and use the one of the matching Sentinel. */
+ releaseInstanceLink(ri->link,NULL);
+ ri->link = match->link;
+ match->link->refcount++;
+ return C_OK;
+ }
+ dictReleaseIterator(di);
+ return C_ERR;
+}
+
+/* When we detect a Sentinel to switch address (reporting a different IP/port
+ * pair in Hello messages), let's update all the matching Sentinels in the
+ * context of other masters as well and disconnect the links, so that everybody
+ * will be updated.
+ *
+ * Return the number of updated Sentinel addresses. */
+int sentinelUpdateSentinelAddressInAllMasters(sentinelRedisInstance *ri) {
+ serverAssert(ri->flags & SRI_SENTINEL);
+ dictIterator *di;
+ dictEntry *de;
+ int reconfigured = 0;
+
+ di = dictGetIterator(sentinel.masters);
+ while((de = dictNext(di)) != NULL) {
+ sentinelRedisInstance *master = dictGetVal(de), *match;
+ match = getSentinelRedisInstanceByAddrAndRunID(master->sentinels,
+ NULL,0,ri->runid);
+ /* If there is no match, this master does not know about this
+ * Sentinel, try with the next one. */
+ if (match == NULL) continue;
+
+ /* Disconnect the old links if connected. */
+ if (match->link->cc != NULL)
+ instanceLinkCloseConnection(match->link,match->link->cc);
+ if (match->link->pc != NULL)
+ instanceLinkCloseConnection(match->link,match->link->pc);
+
+ if (match == ri) continue; /* Address already updated for it. */
+
+ /* Update the address of the matching Sentinel by copying the address
+ * of the Sentinel object that received the address update. */
+ releaseSentinelAddr(match->addr);
+ match->addr = dupSentinelAddr(ri->addr);
+ reconfigured++;
+ }
+ dictReleaseIterator(di);
+ if (reconfigured)
+ sentinelEvent(LL_NOTICE,"+sentinel-address-update", ri,
+ "%@ %d additional matching instances", reconfigured);
+ return reconfigured;
+}
+
+/* This function is called when an hiredis connection reported an error.
+ * We set it to NULL and mark the link as disconnected so that it will be
+ * reconnected again.
+ *
+ * Note: we don't free the hiredis context as hiredis will do it for us
+ * for async connections. */
+void instanceLinkConnectionError(const redisAsyncContext *c) {
+ instanceLink *link = c->data;
+ int pubsub;
+
+ if (!link) return;
+
+ pubsub = (link->pc == c);
+ if (pubsub)
+ link->pc = NULL;
+ else
+ link->cc = NULL;
+ link->disconnected = 1;
+}
+
+/* Hiredis connection established / disconnected callbacks. We need them
+ * just to cleanup our link state. */
+void sentinelLinkEstablishedCallback(const redisAsyncContext *c, int status) {
+ if (status != C_OK) instanceLinkConnectionError(c);
+}
+
+void sentinelDisconnectCallback(const redisAsyncContext *c, int status) {
+ UNUSED(status);
+ instanceLinkConnectionError(c);
+}
+
/* ========================== sentinelRedisInstance ========================= */
/* Create a redis instance, the following fields must be populated by the
@@ -884,25 +1139,25 @@ void sentinelCallClientReconfScript(sentinelRedisInstance *master, int role, cha
* createSentinelAddr() function.
*
* The function may also fail and return NULL with errno set to EBUSY if
- * a master or slave with the same name already exists. */
+ * a master with the same name, a slave with the same address, or a sentinel
+ * with the same ID already exists. */
+
sentinelRedisInstance *createSentinelRedisInstance(char *name, int flags, char *hostname, int port, int quorum, sentinelRedisInstance *master) {
sentinelRedisInstance *ri;
sentinelAddr *addr;
dict *table = NULL;
- char slavename[128], *sdsname;
+ char slavename[NET_PEER_ID_LEN], *sdsname;
- redisAssert(flags & (SRI_MASTER|SRI_SLAVE|SRI_SENTINEL));
- redisAssert((flags & SRI_MASTER) || master != NULL);
+ serverAssert(flags & (SRI_MASTER|SRI_SLAVE|SRI_SENTINEL));
+ serverAssert((flags & SRI_MASTER) || master != NULL);
/* Check address validity. */
addr = createSentinelAddr(hostname,port);
if (addr == NULL) return NULL;
- /* For slaves and sentinel we use ip:port as name. */
- if (flags & (SRI_SLAVE|SRI_SENTINEL)) {
- snprintf(slavename,sizeof(slavename),
- strchr(hostname,':') ? "[%s]:%d" : "%s:%d",
- hostname,port);
+ /* For slaves use ip:port as name. */
+ if (flags & SRI_SLAVE) {
+ anetFormatAddr(slavename, sizeof(slavename), hostname, port);
name = slavename;
}
@@ -915,6 +1170,7 @@ sentinelRedisInstance *createSentinelRedisInstance(char *name, int flags, char *
else if (flags & SRI_SENTINEL) table = master->sentinels;
sdsname = sdsnew(name);
if (dictFind(table,sdsname)) {
+ releaseSentinelAddr(addr);
sdsfree(sdsname);
errno = EBUSY;
return NULL;
@@ -924,24 +1180,12 @@ sentinelRedisInstance *createSentinelRedisInstance(char *name, int flags, char *
ri = zmalloc(sizeof(*ri));
/* Note that all the instances are started in the disconnected state,
* the event loop will take care of connecting them. */
- ri->flags = flags | SRI_DISCONNECTED;
+ ri->flags = flags;
ri->name = sdsname;
ri->runid = NULL;
ri->config_epoch = 0;
ri->addr = addr;
- ri->cc = NULL;
- ri->pc = NULL;
- ri->pending_commands = 0;
- ri->cc_conn_time = 0;
- ri->pc_conn_time = 0;
- ri->pc_last_activity = 0;
- /* We set the last_ping_time to "now" even if we actually don't have yet
- * a connection with the node, nor we sent a ping.
- * This is useful to detect a timeout in case we'll not be able to connect
- * with the node at all. */
- ri->last_ping_time = mstime();
- ri->last_avail_time = mstime();
- ri->last_pong_time = mstime();
+ ri->link = createInstanceLink();
ri->last_pub_time = mstime();
ri->last_hello_time = mstime();
ri->last_master_down_reply_time = mstime();
@@ -976,6 +1220,7 @@ sentinelRedisInstance *createSentinelRedisInstance(char *name, int flags, char *
ri->promoted_slave = NULL;
ri->notification_script = NULL;
ri->client_reconfig_script = NULL;
+ ri->info = NULL;
/* Role */
ri->role_reported = ri->flags & (SRI_MASTER|SRI_SLAVE);
@@ -996,9 +1241,8 @@ void releaseSentinelRedisInstance(sentinelRedisInstance *ri) {
dictRelease(ri->sentinels);
dictRelease(ri->slaves);
- /* Release hiredis connections. */
- if (ri->cc) sentinelKillLink(ri,ri->cc);
- if (ri->pc) sentinelKillLink(ri,ri->pc);
+ /* Disconnect the instance. */
+ releaseInstanceLink(ri->link,ri);
/* Free other resources. */
sdsfree(ri->name);
@@ -1008,6 +1252,7 @@ void releaseSentinelRedisInstance(sentinelRedisInstance *ri) {
sdsfree(ri->slave_master_host);
sdsfree(ri->leader);
sdsfree(ri->auth_pass);
+ sdsfree(ri->info);
releaseSentinelAddr(ri->addr);
/* Clear state into the master if needed. */
@@ -1023,11 +1268,11 @@ sentinelRedisInstance *sentinelRedisInstanceLookupSlave(
{
sds key;
sentinelRedisInstance *slave;
+ char buf[NET_PEER_ID_LEN];
- redisAssert(ri->flags & SRI_MASTER);
- key = sdscatprintf(sdsempty(),
- strchr(ip,':') ? "[%s]:%d" : "%s:%d",
- ip,port);
+ serverAssert(ri->flags & SRI_MASTER);
+ anetFormatAddr(buf,sizeof(buf),ip,port);
+ key = sdsnew(buf);
slave = dictFetchValue(ri->slaves,key);
sdsfree(key);
return slave;
@@ -1041,35 +1286,29 @@ const char *sentinelRedisInstanceTypeStr(sentinelRedisInstance *ri) {
else return "unknown";
}
-/* This function removes all the instances found in the dictionary of
- * sentinels in the specified 'master', having either:
+/* This function remove the Sentinel with the specified ID from the
+ * specified master.
*
- * 1) The same ip/port as specified.
- * 2) The same runid.
+ * If "runid" is NULL the function returns ASAP.
*
- * "1" and "2" don't need to verify at the same time, just one is enough.
- * If "runid" is NULL it is not checked.
- * Similarly if "ip" is NULL it is not checked.
+ * This function is useful because on Sentinels address switch, we want to
+ * remove our old entry and add a new one for the same ID but with the new
+ * address.
*
- * This function is useful because every time we add a new Sentinel into
- * a master's Sentinels dictionary, we want to be very sure about not
- * having duplicated instances for any reason. This is important because
- * other sentinels are needed to reach ODOWN quorum, and later to get
- * voted for a given configuration epoch in order to perform the failover.
- *
- * The function returns the number of Sentinels removed. */
-int removeMatchingSentinelsFromMaster(sentinelRedisInstance *master, char *ip, int port, char *runid) {
+ * The function returns 1 if the matching Sentinel was removed, otherwise
+ * 0 if there was no Sentinel with this ID. */
+int removeMatchingSentinelFromMaster(sentinelRedisInstance *master, char *runid) {
dictIterator *di;
dictEntry *de;
int removed = 0;
+ if (runid == NULL) return 0;
+
di = dictGetSafeIterator(master->sentinels);
while((de = dictNext(di)) != NULL) {
sentinelRedisInstance *ri = dictGetVal(de);
- if ((ri->runid && runid && strcmp(ri->runid,runid) == 0) ||
- (ip && strcmp(ri->addr->ip,ip) == 0 && port == ri->addr->port))
- {
+ if (ri->runid && strcmp(ri->runid,runid) == 0) {
dictDelete(master->sentinels,ri->name);
removed++;
}
@@ -1089,7 +1328,7 @@ sentinelRedisInstance *getSentinelRedisInstanceByAddrAndRunID(dict *instances, c
dictEntry *de;
sentinelRedisInstance *instance = NULL;
- redisAssert(ip || runid); /* User must pass at least one search param. */
+ serverAssert(ip || runid); /* User must pass at least one search param. */
di = dictGetIterator(instances);
while((de = dictNext(di)) != NULL) {
sentinelRedisInstance *ri = dictGetVal(de);
@@ -1148,42 +1387,45 @@ void sentinelDelFlagsToDictOfRedisInstances(dict *instances, int flags) {
* 1) Remove all slaves.
* 2) Remove all sentinels.
* 3) Remove most of the flags resulting from runtime operations.
- * 4) Reset timers to their default value.
+ * 4) Reset timers to their default value. For example after a reset it will be
+ * possible to failover again the same master ASAP, without waiting the
+ * failover timeout delay.
* 5) In the process of doing this undo the failover if in progress.
* 6) Disconnect the connections with the master (will reconnect automatically).
*/
#define SENTINEL_RESET_NO_SENTINELS (1<<0)
void sentinelResetMaster(sentinelRedisInstance *ri, int flags) {
- redisAssert(ri->flags & SRI_MASTER);
+ serverAssert(ri->flags & SRI_MASTER);
dictRelease(ri->slaves);
ri->slaves = dictCreate(&instancesDictType,NULL);
if (!(flags & SENTINEL_RESET_NO_SENTINELS)) {
dictRelease(ri->sentinels);
ri->sentinels = dictCreate(&instancesDictType,NULL);
}
- if (ri->cc) sentinelKillLink(ri,ri->cc);
- if (ri->pc) sentinelKillLink(ri,ri->pc);
- ri->flags &= SRI_MASTER|SRI_DISCONNECTED;
+ instanceLinkCloseConnection(ri->link,ri->link->cc);
+ instanceLinkCloseConnection(ri->link,ri->link->pc);
+ ri->flags &= SRI_MASTER;
if (ri->leader) {
sdsfree(ri->leader);
ri->leader = NULL;
}
ri->failover_state = SENTINEL_FAILOVER_STATE_NONE;
ri->failover_state_change_time = 0;
- ri->failover_start_time = 0;
+ ri->failover_start_time = 0; /* We can failover again ASAP. */
ri->promoted_slave = NULL;
sdsfree(ri->runid);
sdsfree(ri->slave_master_host);
ri->runid = NULL;
ri->slave_master_host = NULL;
- ri->last_ping_time = mstime();
- ri->last_avail_time = mstime();
- ri->last_pong_time = mstime();
+ ri->link->act_ping_time = mstime();
+ ri->link->last_ping_time = 0;
+ ri->link->last_avail_time = mstime();
+ ri->link->last_pong_time = mstime();
ri->role_reported_time = mstime();
ri->role_reported = SRI_MASTER;
if (flags & SENTINEL_GENERATE_EVENT)
- sentinelEvent(REDIS_WARNING,"+reset-master",ri,"%@");
+ sentinelEvent(LL_WARNING,"+reset-master",ri,"%@");
}
/* Call sentinelResetMaster() on every master with a name matching the specified
@@ -1213,8 +1455,8 @@ int sentinelResetMastersByPattern(char *pattern, int flags) {
*
* This is used to handle the +switch-master event.
*
- * The function returns REDIS_ERR if the address can't be resolved for some
- * reason. Otherwise REDIS_OK is returned. */
+ * The function returns C_ERR if the address can't be resolved for some
+ * reason. Otherwise C_OK is returned. */
int sentinelResetMasterAndChangeAddress(sentinelRedisInstance *master, char *ip, int port) {
sentinelAddr *oldaddr, *newaddr;
sentinelAddr **slaves = NULL;
@@ -1223,7 +1465,7 @@ int sentinelResetMasterAndChangeAddress(sentinelRedisInstance *master, char *ip,
dictEntry *de;
newaddr = createSentinelAddr(ip,port);
- if (newaddr == NULL) return REDIS_ERR;
+ if (newaddr == NULL) return C_ERR;
/* Make a list of slaves to add back after the reset.
* Don't include the one having the address we are switching to. */
@@ -1261,10 +1503,7 @@ int sentinelResetMasterAndChangeAddress(sentinelRedisInstance *master, char *ip,
slave = createSentinelRedisInstance(NULL,SRI_SLAVE,slaves[j]->ip,
slaves[j]->port, master->quorum, master);
releaseSentinelAddr(slaves[j]);
- if (slave) {
- sentinelEvent(REDIS_NOTICE,"+slave",slave,"%@");
- sentinelFlushConfig();
- }
+ if (slave) sentinelEvent(LL_NOTICE,"+slave",slave,"%@");
}
zfree(slaves);
@@ -1272,7 +1511,7 @@ int sentinelResetMasterAndChangeAddress(sentinelRedisInstance *master, char *ip,
* gets the master->addr->ip and master->addr->port as arguments. */
releaseSentinelAddr(oldaddr);
sentinelFlushConfig();
- return REDIS_OK;
+ return C_OK;
}
/* Return non-zero if there was no SDOWN or ODOWN error associated to this
@@ -1322,6 +1561,13 @@ void sentinelPropagateDownAfterPeriod(sentinelRedisInstance *master) {
}
}
+char *sentinelGetInstanceTypeString(sentinelRedisInstance *ri) {
+ if (ri->flags & SRI_MASTER) return "master";
+ else if (ri->flags & SRI_SLAVE) return "slave";
+ else if (ri->flags & SRI_SENTINEL) return "sentinel";
+ else return "unknown";
+}
+
/* ============================ Config handling ============================= */
char *sentinelHandleConfiguration(char **argv, int argc) {
sentinelRedisInstance *ri;
@@ -1385,6 +1631,10 @@ char *sentinelHandleConfiguration(char **argv, int argc) {
unsigned long long current_epoch = strtoull(argv[1],NULL,10);
if (current_epoch > sentinel.current_epoch)
sentinel.current_epoch = current_epoch;
+ } else if (!strcasecmp(argv[0],"myid") && argc == 2) {
+ if (strlen(argv[1]) != CONFIG_RUN_ID_SIZE)
+ return "Malformed Sentinel id in myid option.";
+ memcpy(sentinel.myid,argv[1],CONFIG_RUN_ID_SIZE);
} else if (!strcasecmp(argv[0],"config-epoch") && argc == 3) {
/* config-epoch <name> <epoch> */
ri = sentinelGetMasterByName(argv[1]);
@@ -1415,15 +1665,25 @@ char *sentinelHandleConfiguration(char **argv, int argc) {
(argc == 4 || argc == 5)) {
sentinelRedisInstance *si;
- /* known-sentinel <name> <ip> <port> [runid] */
- ri = sentinelGetMasterByName(argv[1]);
- if (!ri) return "No such master with specified name.";
- if ((si = createSentinelRedisInstance(NULL,SRI_SENTINEL,argv[2],
- atoi(argv[3]), ri->quorum, ri)) == NULL)
- {
- return "Wrong hostname or port for sentinel.";
+ if (argc == 5) { /* Ignore the old form without runid. */
+ /* known-sentinel <name> <ip> <port> [runid] */
+ ri = sentinelGetMasterByName(argv[1]);
+ if (!ri) return "No such master with specified name.";
+ if ((si = createSentinelRedisInstance(argv[4],SRI_SENTINEL,argv[2],
+ atoi(argv[3]), ri->quorum, ri)) == NULL)
+ {
+ return "Wrong hostname or port for sentinel.";
+ }
+ si->runid = sdsnew(argv[4]);
+ sentinelTryConnectionSharing(si);
}
- if (argc == 5) si->runid = sdsnew(argv[4]);
+ } else if (!strcasecmp(argv[0],"announce-ip") && argc == 2) {
+ /* announce-ip <ip-address> */
+ if (strlen(argv[1]))
+ sentinel.announce_ip = sdsnew(argv[1]);
+ } else if (!strcasecmp(argv[0],"announce-port") && argc == 2) {
+ /* announce-port <port> */
+ sentinel.announce_port = atoi(argv[1]);
} else {
return "Unrecognized sentinel configuration statement.";
}
@@ -1440,6 +1700,10 @@ void rewriteConfigSentinelOption(struct rewriteConfigState *state) {
dictEntry *de;
sds line;
+ /* sentinel unique ID. */
+ line = sdscatprintf(sdsempty(), "sentinel myid %s", sentinel.myid);
+ rewriteConfigRewriteLine(state,"sentinel",line,1);
+
/* For every master emit a "sentinel monitor" config entry. */
di = dictGetIterator(sentinel.masters);
while((de = dictNext(di)) != NULL) {
@@ -1531,7 +1795,7 @@ void rewriteConfigSentinelOption(struct rewriteConfigState *state) {
slave_addr = master->addr;
line = sdscatprintf(sdsempty(),
"sentinel known-slave %s %s %d",
- master->name, ri->addr->ip, ri->addr->port);
+ master->name, slave_addr->ip, slave_addr->port);
rewriteConfigRewriteLine(state,"sentinel",line,1);
}
dictReleaseIterator(di2);
@@ -1540,11 +1804,10 @@ void rewriteConfigSentinelOption(struct rewriteConfigState *state) {
di2 = dictGetIterator(master->sentinels);
while((de = dictNext(di2)) != NULL) {
ri = dictGetVal(de);
+ if (ri->runid == NULL) continue;
line = sdscatprintf(sdsempty(),
- "sentinel known-sentinel %s %s %d%s%s",
- master->name, ri->addr->ip, ri->addr->port,
- ri->runid ? " " : "",
- ri->runid ? ri->runid : "");
+ "sentinel known-sentinel %s %s %d %s",
+ master->name, ri->addr->ip, ri->addr->port, ri->runid);
rewriteConfigRewriteLine(state,"sentinel",line,1);
}
dictReleaseIterator(di2);
@@ -1555,6 +1818,20 @@ void rewriteConfigSentinelOption(struct rewriteConfigState *state) {
"sentinel current-epoch %llu", (unsigned long long) sentinel.current_epoch);
rewriteConfigRewriteLine(state,"sentinel",line,1);
+ /* sentinel announce-ip. */
+ if (sentinel.announce_ip) {
+ line = sdsnew("sentinel announce-ip ");
+ line = sdscatrepr(line, sentinel.announce_ip, sdslen(sentinel.announce_ip));
+ rewriteConfigRewriteLine(state,"sentinel",line,1);
+ }
+
+ /* sentinel announce-port. */
+ if (sentinel.announce_port) {
+ line = sdscatprintf(sdsempty(),"sentinel announce-port %d",
+ sentinel.announce_port);
+ rewriteConfigRewriteLine(state,"sentinel",line,1);
+ }
+
dictReleaseIterator(di);
}
@@ -1570,7 +1847,7 @@ void sentinelFlushConfig(void) {
int saved_hz = server.hz;
int rewrite_status;
- server.hz = REDIS_DEFAULT_HZ;
+ server.hz = CONFIG_DEFAULT_HZ;
rewrite_status = rewriteConfig(server.configfile);
server.hz = saved_hz;
@@ -1582,61 +1859,11 @@ void sentinelFlushConfig(void) {
werr:
if (fd != -1) close(fd);
- redisLog(REDIS_WARNING,"WARNING: Sentinel was not able to save the new configuration on disk!!!: %s", strerror(errno));
+ serverLog(LL_WARNING,"WARNING: Sentinel was not able to save the new configuration on disk!!!: %s", strerror(errno));
}
/* ====================== hiredis connection handling ======================= */
-/* Completely disconnect a hiredis link from an instance. */
-void sentinelKillLink(sentinelRedisInstance *ri, redisAsyncContext *c) {
- if (ri->cc == c) {
- ri->cc = NULL;
- ri->pending_commands = 0;
- }
- if (ri->pc == c) ri->pc = NULL;
- c->data = NULL;
- ri->flags |= SRI_DISCONNECTED;
- redisAsyncFree(c);
-}
-
-/* This function takes a hiredis context that is in an error condition
- * and make sure to mark the instance as disconnected performing the
- * cleanup needed.
- *
- * Note: we don't free the hiredis context as hiredis will do it for us
- * for async connections. */
-void sentinelDisconnectInstanceFromContext(const redisAsyncContext *c) {
- sentinelRedisInstance *ri = c->data;
- int pubsub;
-
- if (ri == NULL) return; /* The instance no longer exists. */
-
- pubsub = (ri->pc == c);
- sentinelEvent(REDIS_DEBUG, pubsub ? "-pubsub-link" : "-cmd-link", ri,
- "%@ #%s", c->errstr);
- if (pubsub)
- ri->pc = NULL;
- else
- ri->cc = NULL;
- ri->flags |= SRI_DISCONNECTED;
-}
-
-void sentinelLinkEstablishedCallback(const redisAsyncContext *c, int status) {
- if (status != REDIS_OK) {
- sentinelDisconnectInstanceFromContext(c);
- } else {
- sentinelRedisInstance *ri = c->data;
- int pubsub = (ri->pc == c);
-
- sentinelEvent(REDIS_DEBUG, pubsub ? "+pubsub-link" : "+cmd-link", ri,
- "%@");
- }
-}
-
-void sentinelDisconnectCallback(const redisAsyncContext *c, int status) {
- sentinelDisconnectInstanceFromContext(c);
-}
-
/* Send the AUTH command with the specified master password if needed.
* Note that for slaves the password set for the master is used.
*
@@ -1648,8 +1875,8 @@ void sentinelSendAuthIfNeeded(sentinelRedisInstance *ri, redisAsyncContext *c) {
ri->master->auth_pass;
if (auth_pass) {
- if (redisAsyncCommand(c, sentinelDiscardReplyCallback, NULL, "AUTH %s",
- auth_pass) == REDIS_OK) ri->pending_commands++;
+ if (redisAsyncCommand(c, sentinelDiscardReplyCallback, ri, "AUTH %s",
+ auth_pass) == C_OK) ri->link->pending_commands++;
}
}
@@ -1662,77 +1889,84 @@ void sentinelSendAuthIfNeeded(sentinelRedisInstance *ri, redisAsyncContext *c) {
void sentinelSetClientName(sentinelRedisInstance *ri, redisAsyncContext *c, char *type) {
char name[64];
- snprintf(name,sizeof(name),"sentinel-%.8s-%s",server.runid,type);
- if (redisAsyncCommand(c, sentinelDiscardReplyCallback, NULL,
- "CLIENT SETNAME %s", name) == REDIS_OK)
+ snprintf(name,sizeof(name),"sentinel-%.8s-%s",sentinel.myid,type);
+ if (redisAsyncCommand(c, sentinelDiscardReplyCallback, ri,
+ "CLIENT SETNAME %s", name) == C_OK)
{
- ri->pending_commands++;
+ ri->link->pending_commands++;
}
}
-/* Create the async connections for the specified instance if the instance
- * is disconnected. Note that the SRI_DISCONNECTED flag is set even if just
+/* Create the async connections for the instance link if the link
+ * is disconnected. Note that link->disconnected is true even if just
* one of the two links (commands and pub/sub) is missing. */
void sentinelReconnectInstance(sentinelRedisInstance *ri) {
- if (!(ri->flags & SRI_DISCONNECTED)) return;
+ if (ri->link->disconnected == 0) return;
+ if (ri->addr->port == 0) return; /* port == 0 means invalid address. */
+ instanceLink *link = ri->link;
+ mstime_t now = mstime();
+
+ if (now - ri->link->last_reconn_time < SENTINEL_PING_PERIOD) return;
+ ri->link->last_reconn_time = now;
/* Commands connection. */
- if (ri->cc == NULL) {
- ri->cc = redisAsyncConnectBind(ri->addr->ip,ri->addr->port,REDIS_BIND_ADDR);
- if (ri->cc->err) {
- sentinelEvent(REDIS_DEBUG,"-cmd-link-reconnection",ri,"%@ #%s",
- ri->cc->errstr);
- sentinelKillLink(ri,ri->cc);
+ if (link->cc == NULL) {
+ link->cc = redisAsyncConnectBind(ri->addr->ip,ri->addr->port,NET_FIRST_BIND_ADDR);
+ if (link->cc->err) {
+ sentinelEvent(LL_DEBUG,"-cmd-link-reconnection",ri,"%@ #%s",
+ link->cc->errstr);
+ instanceLinkCloseConnection(link,link->cc);
} else {
- ri->cc_conn_time = mstime();
- ri->cc->data = ri;
- redisAeAttach(server.el,ri->cc);
- redisAsyncSetConnectCallback(ri->cc,
- sentinelLinkEstablishedCallback);
- redisAsyncSetDisconnectCallback(ri->cc,
- sentinelDisconnectCallback);
- sentinelSendAuthIfNeeded(ri,ri->cc);
- sentinelSetClientName(ri,ri->cc,"cmd");
+ link->pending_commands = 0;
+ link->cc_conn_time = mstime();
+ link->cc->data = link;
+ redisAeAttach(server.el,link->cc);
+ redisAsyncSetConnectCallback(link->cc,
+ sentinelLinkEstablishedCallback);
+ redisAsyncSetDisconnectCallback(link->cc,
+ sentinelDisconnectCallback);
+ sentinelSendAuthIfNeeded(ri,link->cc);
+ sentinelSetClientName(ri,link->cc,"cmd");
/* Send a PING ASAP when reconnecting. */
sentinelSendPing(ri);
}
}
/* Pub / Sub */
- if ((ri->flags & (SRI_MASTER|SRI_SLAVE)) && ri->pc == NULL) {
- ri->pc = redisAsyncConnectBind(ri->addr->ip,ri->addr->port,REDIS_BIND_ADDR);
- if (ri->pc->err) {
- sentinelEvent(REDIS_DEBUG,"-pubsub-link-reconnection",ri,"%@ #%s",
- ri->pc->errstr);
- sentinelKillLink(ri,ri->pc);
+ if ((ri->flags & (SRI_MASTER|SRI_SLAVE)) && link->pc == NULL) {
+ link->pc = redisAsyncConnectBind(ri->addr->ip,ri->addr->port,NET_FIRST_BIND_ADDR);
+ if (link->pc->err) {
+ sentinelEvent(LL_DEBUG,"-pubsub-link-reconnection",ri,"%@ #%s",
+ link->pc->errstr);
+ instanceLinkCloseConnection(link,link->pc);
} else {
int retval;
- ri->pc_conn_time = mstime();
- ri->pc->data = ri;
- redisAeAttach(server.el,ri->pc);
- redisAsyncSetConnectCallback(ri->pc,
- sentinelLinkEstablishedCallback);
- redisAsyncSetDisconnectCallback(ri->pc,
- sentinelDisconnectCallback);
- sentinelSendAuthIfNeeded(ri,ri->pc);
- sentinelSetClientName(ri,ri->pc,"pubsub");
+ link->pc_conn_time = mstime();
+ link->pc->data = link;
+ redisAeAttach(server.el,link->pc);
+ redisAsyncSetConnectCallback(link->pc,
+ sentinelLinkEstablishedCallback);
+ redisAsyncSetDisconnectCallback(link->pc,
+ sentinelDisconnectCallback);
+ sentinelSendAuthIfNeeded(ri,link->pc);
+ sentinelSetClientName(ri,link->pc,"pubsub");
/* Now we subscribe to the Sentinels "Hello" channel. */
- retval = redisAsyncCommand(ri->pc,
- sentinelReceiveHelloMessages, NULL, "SUBSCRIBE %s",
+ retval = redisAsyncCommand(link->pc,
+ sentinelReceiveHelloMessages, ri, "SUBSCRIBE %s",
SENTINEL_HELLO_CHANNEL);
- if (retval != REDIS_OK) {
+ if (retval != C_OK) {
/* If we can't subscribe, the Pub/Sub connection is useless
* and we can simply disconnect it and try again. */
- sentinelKillLink(ri,ri->pc);
+ instanceLinkCloseConnection(link,link->pc);
return;
}
}
}
- /* Clear the DISCONNECTED flags only if we have both the connections
+ /* Clear the disconnected status only if we have both the connections
* (or just the commands connection if this is a sentinel instance). */
- if (ri->cc && (ri->flags & SRI_SENTINEL || ri->pc))
- ri->flags &= ~SRI_DISCONNECTED;
+ if (link->cc && (ri->flags & SRI_SENTINEL || link->pc))
+ link->disconnected = 0;
}
/* ======================== Redis instances pinging ======================== */
@@ -1756,6 +1990,10 @@ void sentinelRefreshInstanceInfo(sentinelRedisInstance *ri, const char *info) {
int numlines, j;
int role = 0;
+ /* cache full INFO output for instance */
+ sdsfree(ri->info);
+ ri->info = sdsnew(info);
+
/* The following fields must be reset to a given value in the case they
* are not found at all in the INFO output. */
ri->master_link_down_time = 0;
@@ -1772,7 +2010,7 @@ void sentinelRefreshInstanceInfo(sentinelRedisInstance *ri, const char *info) {
ri->runid = sdsnewlen(l+7,40);
} else {
if (strncmp(ri->runid,l+7,40) != 0) {
- sentinelEvent(REDIS_NOTICE,"+reboot",ri,"%@");
+ sentinelEvent(LL_NOTICE,"+reboot",ri,"%@");
sdsfree(ri->runid);
ri->runid = sdsnewlen(l+7,40);
}
@@ -1813,7 +2051,8 @@ void sentinelRefreshInstanceInfo(sentinelRedisInstance *ri, const char *info) {
if ((slave = createSentinelRedisInstance(NULL,SRI_SLAVE,ip,
atoi(port), ri->quorum, ri)) != NULL)
{
- sentinelEvent(REDIS_NOTICE,"+slave",slave,"%@");
+ sentinelEvent(LL_NOTICE,"+slave",slave,"%@");
+ sentinelFlushConfig();
}
}
}
@@ -1882,7 +2121,7 @@ void sentinelRefreshInstanceInfo(sentinelRedisInstance *ri, const char *info) {
if (role == SRI_SLAVE) ri->slave_conf_change_time = mstime();
/* Log the event with +role-change if the new role is coherent or
* with -role-change if there is a mismatch with the current config. */
- sentinelEvent(REDIS_VERBOSE,
+ sentinelEvent(LL_VERBOSE,
((ri->flags & (SRI_MASTER|SRI_SLAVE)) == role) ?
"+role-change" : "-role-change",
ri, "%@ new reported role is %s",
@@ -1919,8 +2158,11 @@ void sentinelRefreshInstanceInfo(sentinelRedisInstance *ri, const char *info) {
ri->master->failover_state = SENTINEL_FAILOVER_STATE_RECONF_SLAVES;
ri->master->failover_state_change_time = mstime();
sentinelFlushConfig();
- sentinelEvent(REDIS_WARNING,"+promoted-slave",ri,"%@");
- sentinelEvent(REDIS_WARNING,"+failover-state-reconf-slaves",
+ sentinelEvent(LL_WARNING,"+promoted-slave",ri,"%@");
+ if (sentinel.simfailure_flags &
+ SENTINEL_SIMFAILURE_CRASH_AFTER_PROMOTION)
+ sentinelSimFailureCrash();
+ sentinelEvent(LL_WARNING,"+failover-state-reconf-slaves",
ri->master,"%@");
sentinelCallClientReconfScript(ri->master,SENTINEL_LEADER,
"start",ri->master->addr,ri->addr);
@@ -1939,8 +2181,8 @@ void sentinelRefreshInstanceInfo(sentinelRedisInstance *ri, const char *info) {
int retval = sentinelSendSlaveOf(ri,
ri->master->addr->ip,
ri->master->addr->port);
- if (retval == REDIS_OK)
- sentinelEvent(REDIS_NOTICE,"+convert-to-slave",ri,"%@");
+ if (retval == C_OK)
+ sentinelEvent(LL_NOTICE,"+convert-to-slave",ri,"%@");
}
}
}
@@ -1962,8 +2204,8 @@ void sentinelRefreshInstanceInfo(sentinelRedisInstance *ri, const char *info) {
int retval = sentinelSendSlaveOf(ri,
ri->master->addr->ip,
ri->master->addr->port);
- if (retval == REDIS_OK)
- sentinelEvent(REDIS_NOTICE,"+fix-slave-config",ri,"%@");
+ if (retval == C_OK)
+ sentinelEvent(LL_NOTICE,"+fix-slave-config",ri,"%@");
}
}
@@ -1981,7 +2223,7 @@ void sentinelRefreshInstanceInfo(sentinelRedisInstance *ri, const char *info) {
{
ri->flags &= ~SRI_RECONF_SENT;
ri->flags |= SRI_RECONF_INPROG;
- sentinelEvent(REDIS_NOTICE,"+slave-reconf-inprog",ri,"%@");
+ sentinelEvent(LL_NOTICE,"+slave-reconf-inprog",ri,"%@");
}
/* SRI_RECONF_INPROG -> SRI_RECONF_DONE */
@@ -1990,38 +2232,41 @@ void sentinelRefreshInstanceInfo(sentinelRedisInstance *ri, const char *info) {
{
ri->flags &= ~SRI_RECONF_INPROG;
ri->flags |= SRI_RECONF_DONE;
- sentinelEvent(REDIS_NOTICE,"+slave-reconf-done",ri,"%@");
+ sentinelEvent(LL_NOTICE,"+slave-reconf-done",ri,"%@");
}
}
}
void sentinelInfoReplyCallback(redisAsyncContext *c, void *reply, void *privdata) {
- sentinelRedisInstance *ri = c->data;
+ sentinelRedisInstance *ri = privdata;
+ instanceLink *link = c->data;
redisReply *r;
- if (ri) ri->pending_commands--;
- if (!reply || !ri) return;
+ if (!reply || !link) return;
+ link->pending_commands--;
r = reply;
- if (r->type == REDIS_REPLY_STRING) {
+ if (r->type == REDIS_REPLY_STRING)
sentinelRefreshInstanceInfo(ri,r->str);
- }
}
/* Just discard the reply. We use this when we are not monitoring the return
* value of the command but its effects directly. */
void sentinelDiscardReplyCallback(redisAsyncContext *c, void *reply, void *privdata) {
- sentinelRedisInstance *ri = c->data;
+ instanceLink *link = c->data;
+ UNUSED(reply);
+ UNUSED(privdata);
- if (ri) ri->pending_commands--;
+ if (link) link->pending_commands--;
}
void sentinelPingReplyCallback(redisAsyncContext *c, void *reply, void *privdata) {
- sentinelRedisInstance *ri = c->data;
+ sentinelRedisInstance *ri = privdata;
+ instanceLink *link = c->data;
redisReply *r;
- if (ri) ri->pending_commands--;
- if (!reply || !ri) return;
+ if (!reply || !link) return;
+ link->pending_commands--;
r = reply;
if (r->type == REDIS_REPLY_STATUS ||
@@ -2032,8 +2277,8 @@ void sentinelPingReplyCallback(redisAsyncContext *c, void *reply, void *privdata
strncmp(r->str,"LOADING",7) == 0 ||
strncmp(r->str,"MASTERDOWN",10) == 0)
{
- ri->last_avail_time = mstime();
- ri->last_ping_time = 0; /* Flag the pong as received. */
+ link->last_avail_time = mstime();
+ link->act_ping_time = 0; /* Flag the pong as received. */
} else {
/* Send a SCRIPT KILL command if the instance appears to be
* down because of a busy script. */
@@ -2041,25 +2286,26 @@ void sentinelPingReplyCallback(redisAsyncContext *c, void *reply, void *privdata
(ri->flags & SRI_S_DOWN) &&
!(ri->flags & SRI_SCRIPT_KILL_SENT))
{
- if (redisAsyncCommand(ri->cc,
- sentinelDiscardReplyCallback, NULL,
- "SCRIPT KILL") == REDIS_OK)
- ri->pending_commands++;
+ if (redisAsyncCommand(ri->link->cc,
+ sentinelDiscardReplyCallback, ri,
+ "SCRIPT KILL") == C_OK)
+ ri->link->pending_commands++;
ri->flags |= SRI_SCRIPT_KILL_SENT;
}
}
}
- ri->last_pong_time = mstime();
+ link->last_pong_time = mstime();
}
/* This is called when we get the reply about the PUBLISH command we send
* to the master to advertise this sentinel. */
void sentinelPublishReplyCallback(redisAsyncContext *c, void *reply, void *privdata) {
- sentinelRedisInstance *ri = c->data;
+ sentinelRedisInstance *ri = privdata;
+ instanceLink *link = c->data;
redisReply *r;
- if (ri) ri->pending_commands--;
- if (!reply || !ri) return;
+ if (!reply || !link) return;
+ link->pending_commands--;
r = reply;
/* Only update pub_time if we actually published our message. Otherwise
@@ -2072,7 +2318,7 @@ void sentinelPublishReplyCallback(redisAsyncContext *c, void *reply, void *privd
* or sent directly to this sentinel via the (fake) PUBLISH command of Sentinel.
*
* If the master name specified in the message is not known, the message is
- * discareded. */
+ * discarded. */
void sentinelProcessHelloMessage(char *hello, int hello_len) {
/* Format is composed of 8 tokens:
* 0=ip,1=port,2=runid,3=current_epoch,4=master_name,
@@ -2097,25 +2343,39 @@ void sentinelProcessHelloMessage(char *hello, int hello_len) {
if (!si) {
/* If not, remove all the sentinels that have the same runid
- * OR the same ip/port, because it's either a restart or a
- * network topology change. */
- removed = removeMatchingSentinelsFromMaster(master,token[0],port,
- token[2]);
+ * because there was an address change, and add the same Sentinel
+ * with the new address back. */
+ removed = removeMatchingSentinelFromMaster(master,token[2]);
if (removed) {
- sentinelEvent(REDIS_NOTICE,"-dup-sentinel",master,
- "%@ #duplicate of %s:%d or %s",
- token[0],port,token[2]);
+ sentinelEvent(LL_NOTICE,"+sentinel-address-switch",master,
+ "%@ ip %s port %d for %s", token[0],port,token[2]);
+ } else {
+ /* Check if there is another Sentinel with the same address this
+ * new one is reporting. What we do if this happens is to set its
+ * port to 0, to signal the address is invalid. We'll update it
+ * later if we get an HELLO message. */
+ sentinelRedisInstance *other =
+ getSentinelRedisInstanceByAddrAndRunID(
+ master->sentinels, token[0],port,NULL);
+ if (other) {
+ sentinelEvent(LL_NOTICE,"+sentinel-invalid-addr",other,"%@");
+ other->addr->port = 0; /* It means: invalid address. */
+ sentinelUpdateSentinelAddressInAllMasters(other);
+ }
}
/* Add the new sentinel. */
- si = createSentinelRedisInstance(NULL,SRI_SENTINEL,
+ si = createSentinelRedisInstance(token[2],SRI_SENTINEL,
token[0],port,master->quorum,master);
+
if (si) {
- sentinelEvent(REDIS_NOTICE,"+sentinel",si,"%@");
+ if (!removed) sentinelEvent(LL_NOTICE,"+sentinel",si,"%@");
/* The runid is NULL after a new instance creation and
* for Sentinels we don't have a later chance to fill it,
* so do it now. */
si->runid = sdsnew(token[2]);
+ sentinelTryConnectionSharing(si);
+ if (removed) sentinelUpdateSentinelAddressInAllMasters(si);
sentinelFlushConfig();
}
}
@@ -2124,20 +2384,20 @@ void sentinelProcessHelloMessage(char *hello, int hello_len) {
if (current_epoch > sentinel.current_epoch) {
sentinel.current_epoch = current_epoch;
sentinelFlushConfig();
- sentinelEvent(REDIS_WARNING,"+new-epoch",master,"%llu",
+ sentinelEvent(LL_WARNING,"+new-epoch",master,"%llu",
(unsigned long long) sentinel.current_epoch);
}
/* Update master info if received configuration is newer. */
- if (master->config_epoch < master_config_epoch) {
+ if (si && master->config_epoch < master_config_epoch) {
master->config_epoch = master_config_epoch;
if (master_port != master->addr->port ||
strcmp(master->addr->ip, token[5]))
{
sentinelAddr *old_addr;
- sentinelEvent(REDIS_WARNING,"+config-update-from",si,"%@");
- sentinelEvent(REDIS_WARNING,"+switch-master",
+ sentinelEvent(LL_WARNING,"+config-update-from",si,"%@");
+ sentinelEvent(LL_WARNING,"+switch-master",
master,"%s %s %d %s %d",
master->name,
master->addr->ip, master->addr->port,
@@ -2164,8 +2424,9 @@ cleanup:
/* This is our Pub/Sub callback for the Hello channel. It's useful in order
* to discover other sentinels attached at the same master. */
void sentinelReceiveHelloMessages(redisAsyncContext *c, void *reply, void *privdata) {
- sentinelRedisInstance *ri = c->data;
+ sentinelRedisInstance *ri = privdata;
redisReply *r;
+ UNUSED(c);
if (!reply || !ri) return;
r = reply;
@@ -2173,7 +2434,7 @@ void sentinelReceiveHelloMessages(redisAsyncContext *c, void *reply, void *privd
/* Update the last activity in the pubsub channel. Note that since we
* receive our messages as well this timestamp can be used to detect
* if the link is probably disconnected even if it seems otherwise. */
- ri->pc_last_activity = mstime();
+ ri->link->pc_last_activity = mstime();
/* Sanity check in the reply we expect, so that the code that follows
* can avoid to check for details. */
@@ -2185,7 +2446,7 @@ void sentinelReceiveHelloMessages(redisAsyncContext *c, void *reply, void *privd
strcmp(r->element[0]->str,"message") != 0) return;
/* We are not interested in meeting ourselves */
- if (strstr(r->element[2]->str,server.runid) != NULL) return;
+ if (strstr(r->element[2]->str,sentinel.myid) != NULL) return;
sentinelProcessHelloMessage(r->element[2]->str, r->element[2]->len);
}
@@ -2199,34 +2460,46 @@ void sentinelReceiveHelloMessages(redisAsyncContext *c, void *reply, void *privd
* sentinel_ip,sentinel_port,sentinel_runid,current_epoch,
* master_name,master_ip,master_port,master_config_epoch.
*
- * Returns REDIS_OK if the PUBLISH was queued correctly, otherwise
- * REDIS_ERR is returned. */
+ * Returns C_OK if the PUBLISH was queued correctly, otherwise
+ * C_ERR is returned. */
int sentinelSendHello(sentinelRedisInstance *ri) {
- char ip[REDIS_IP_STR_LEN];
- char payload[REDIS_IP_STR_LEN+1024];
+ char ip[NET_IP_STR_LEN];
+ char payload[NET_IP_STR_LEN+1024];
int retval;
+ char *announce_ip;
+ int announce_port;
sentinelRedisInstance *master = (ri->flags & SRI_MASTER) ? ri : ri->master;
sentinelAddr *master_addr = sentinelGetCurrentMasterAddress(master);
- /* Try to obtain our own IP address. */
- if (anetSockName(ri->cc->c.fd,ip,sizeof(ip),NULL) == -1) return REDIS_ERR;
- if (ri->flags & SRI_DISCONNECTED) return REDIS_ERR;
+ if (ri->link->disconnected) return C_ERR;
+
+ /* Use the specified announce address if specified, otherwise try to
+ * obtain our own IP address. */
+ if (sentinel.announce_ip) {
+ announce_ip = sentinel.announce_ip;
+ } else {
+ if (anetSockName(ri->link->cc->c.fd,ip,sizeof(ip),NULL) == -1)
+ return C_ERR;
+ announce_ip = ip;
+ }
+ announce_port = sentinel.announce_port ?
+ sentinel.announce_port : server.port;
/* Format and send the Hello message. */
snprintf(payload,sizeof(payload),
"%s,%d,%s,%llu," /* Info about this sentinel. */
"%s,%s,%d,%llu", /* Info about current master. */
- ip, server.port, server.runid,
+ announce_ip, announce_port, sentinel.myid,
(unsigned long long) sentinel.current_epoch,
/* --- */
master->name,master_addr->ip,master_addr->port,
(unsigned long long) master->config_epoch);
- retval = redisAsyncCommand(ri->cc,
- sentinelPublishReplyCallback, NULL, "PUBLISH %s %s",
+ retval = redisAsyncCommand(ri->link->cc,
+ sentinelPublishReplyCallback, ri, "PUBLISH %s %s",
SENTINEL_HELLO_CHANNEL,payload);
- if (retval != REDIS_OK) return REDIS_ERR;
- ri->pending_commands++;
- return REDIS_OK;
+ if (retval != C_OK) return C_ERR;
+ ri->link->pending_commands++;
+ return C_OK;
}
/* Reset last_pub_time in all the instances in the specified dictionary
@@ -2253,28 +2526,30 @@ void sentinelForceHelloUpdateDictOfRedisInstances(dict *instances) {
* Sentinel upgrades a configuration it is a good idea to deliever an update
* to the other Sentinels ASAP. */
int sentinelForceHelloUpdateForMaster(sentinelRedisInstance *master) {
- if (!(master->flags & SRI_MASTER)) return REDIS_ERR;
+ if (!(master->flags & SRI_MASTER)) return C_ERR;
if (master->last_pub_time >= (SENTINEL_PUBLISH_PERIOD+1))
master->last_pub_time -= (SENTINEL_PUBLISH_PERIOD+1);
sentinelForceHelloUpdateDictOfRedisInstances(master->sentinels);
sentinelForceHelloUpdateDictOfRedisInstances(master->slaves);
- return REDIS_OK;
+ return C_OK;
}
-/* Send a PING to the specified instance and refresh the last_ping_time
+/* Send a PING to the specified instance and refresh the act_ping_time
* if it is zero (that is, if we received a pong for the previous ping).
*
* On error zero is returned, and we can't consider the PING command
* queued in the connection. */
int sentinelSendPing(sentinelRedisInstance *ri) {
- int retval = redisAsyncCommand(ri->cc,
- sentinelPingReplyCallback, NULL, "PING");
- if (retval == REDIS_OK) {
- ri->pending_commands++;
- /* We update the ping time only if we received the pong for
- * the previous ping, otherwise we are technically waiting
- * since the first ping that did not received a reply. */
- if (ri->last_ping_time == 0) ri->last_ping_time = mstime();
+ int retval = redisAsyncCommand(ri->link->cc,
+ sentinelPingReplyCallback, ri, "PING");
+ if (retval == C_OK) {
+ ri->link->pending_commands++;
+ ri->link->last_ping_time = mstime();
+ /* We update the active ping time only if we received the pong for
+ * the previous ping, otherwise we are technically waiting since the
+ * first ping that did not received a reply. */
+ if (ri->link->act_ping_time == 0)
+ ri->link->act_ping_time = ri->link->last_ping_time;
return 1;
} else {
return 0;
@@ -2290,7 +2565,7 @@ void sentinelSendPeriodicCommands(sentinelRedisInstance *ri) {
/* Return ASAP if we have already a PING or INFO already pending, or
* in the case the instance is not properly connected. */
- if (ri->flags & SRI_DISCONNECTED) return;
+ if (ri->link->disconnected) return;
/* For INFO, PING, PUBLISH that are not critical commands to send we
* also have a limit of SENTINEL_MAX_PENDING_COMMANDS. We don't
@@ -2298,14 +2573,21 @@ void sentinelSendPeriodicCommands(sentinelRedisInstance *ri) {
* properly (note that anyway there is a redundant protection about this,
* that is, the link will be disconnected and reconnected if a long
* timeout condition is detected. */
- if (ri->pending_commands >= SENTINEL_MAX_PENDING_COMMANDS) return;
+ if (ri->link->pending_commands >=
+ SENTINEL_MAX_PENDING_COMMANDS * ri->link->refcount) return;
/* If this is a slave of a master in O_DOWN condition we start sending
* it INFO every second, instead of the usual SENTINEL_INFO_PERIOD
* period. In this state we want to closely monitor slaves in case they
- * are turned into masters by another Sentinel, or by the sysadmin. */
+ * are turned into masters by another Sentinel, or by the sysadmin.
+ *
+ * Similarly we monitor the INFO output more often if the slave reports
+ * to be disconnected from the master, so that we can have a fresh
+ * disconnection time figure. */
if ((ri->flags & SRI_SLAVE) &&
- (ri->master->flags & (SRI_O_DOWN|SRI_FAILOVER_IN_PROGRESS))) {
+ ((ri->master->flags & (SRI_O_DOWN|SRI_FAILOVER_IN_PROGRESS)) ||
+ (ri->master_link_down_time != 0)))
+ {
info_period = 1000;
} else {
info_period = SENTINEL_INFO_PERIOD;
@@ -2322,10 +2604,11 @@ void sentinelSendPeriodicCommands(sentinelRedisInstance *ri) {
(now - ri->info_refresh) > info_period))
{
/* Send INFO to masters and slaves, not sentinels. */
- retval = redisAsyncCommand(ri->cc,
- sentinelInfoReplyCallback, NULL, "INFO");
- if (retval == REDIS_OK) ri->pending_commands++;
- } else if ((now - ri->last_pong_time) > ping_period) {
+ retval = redisAsyncCommand(ri->link->cc,
+ sentinelInfoReplyCallback, ri, "INFO");
+ if (retval == C_OK) ri->link->pending_commands++;
+ } else if ((now - ri->link->last_pong_time) > ping_period &&
+ (now - ri->link->last_ping_time) > ping_period/2) {
/* Send PING to all the three kinds of instances. */
sentinelSendPing(ri);
} else if ((now - ri->last_pub_time) > SENTINEL_PUBLISH_PERIOD) {
@@ -2350,7 +2633,7 @@ const char *sentinelFailoverStateStr(int state) {
}
/* Redis instance to Redis protocol representation. */
-void addReplySentinelRedisInstance(redisClient *c, sentinelRedisInstance *ri) {
+void addReplySentinelRedisInstance(client *c, sentinelRedisInstance *ri) {
char *flags = sdsempty();
void *mbl;
int fields = 0;
@@ -2379,7 +2662,7 @@ void addReplySentinelRedisInstance(redisClient *c, sentinelRedisInstance *ri) {
if (ri->flags & SRI_MASTER) flags = sdscat(flags,"master,");
if (ri->flags & SRI_SLAVE) flags = sdscat(flags,"slave,");
if (ri->flags & SRI_SENTINEL) flags = sdscat(flags,"sentinel,");
- if (ri->flags & SRI_DISCONNECTED) flags = sdscat(flags,"disconnected,");
+ if (ri->link->disconnected) flags = sdscat(flags,"disconnected,");
if (ri->flags & SRI_MASTER_DOWN) flags = sdscat(flags,"master_down,");
if (ri->flags & SRI_FAILOVER_IN_PROGRESS)
flags = sdscat(flags,"failover_in_progress,");
@@ -2393,8 +2676,12 @@ void addReplySentinelRedisInstance(redisClient *c, sentinelRedisInstance *ri) {
sdsfree(flags);
fields++;
- addReplyBulkCString(c,"pending-commands");
- addReplyBulkLongLong(c,ri->pending_commands);
+ addReplyBulkCString(c,"link-pending-commands");
+ addReplyBulkLongLong(c,ri->link->pending_commands);
+ fields++;
+
+ addReplyBulkCString(c,"link-refcount");
+ addReplyBulkLongLong(c,ri->link->refcount);
fields++;
if (ri->flags & SRI_FAILOVER_IN_PROGRESS) {
@@ -2405,15 +2692,15 @@ void addReplySentinelRedisInstance(redisClient *c, sentinelRedisInstance *ri) {
addReplyBulkCString(c,"last-ping-sent");
addReplyBulkLongLong(c,
- ri->last_ping_time ? (mstime() - ri->last_ping_time) : 0);
+ ri->link->act_ping_time ? (mstime() - ri->link->act_ping_time) : 0);
fields++;
addReplyBulkCString(c,"last-ok-ping-reply");
- addReplyBulkLongLong(c,mstime() - ri->last_avail_time);
+ addReplyBulkLongLong(c,mstime() - ri->link->last_avail_time);
fields++;
addReplyBulkCString(c,"last-ping-reply");
- addReplyBulkLongLong(c,mstime() - ri->last_pong_time);
+ addReplyBulkLongLong(c,mstime() - ri->link->last_pong_time);
fields++;
if (ri->flags & SRI_S_DOWN) {
@@ -2537,7 +2824,7 @@ void addReplySentinelRedisInstance(redisClient *c, sentinelRedisInstance *ri) {
/* Output a number of instances contained inside a dictionary as
* Redis protocol. */
-void addReplyDictOfRedisInstances(redisClient *c, dict *instances) {
+void addReplyDictOfRedisInstances(client *c, dict *instances) {
dictIterator *di;
dictEntry *de;
@@ -2554,12 +2841,12 @@ void addReplyDictOfRedisInstances(redisClient *c, dict *instances) {
/* Lookup the named master into sentinel.masters.
* If the master is not found reply to the client with an error and returns
* NULL. */
-sentinelRedisInstance *sentinelGetMasterByNameOrReplyError(redisClient *c,
+sentinelRedisInstance *sentinelGetMasterByNameOrReplyError(client *c,
robj *name)
{
sentinelRedisInstance *ri;
- ri = dictFetchValue(sentinel.masters,c->argv[2]->ptr);
+ ri = dictFetchValue(sentinel.masters,name->ptr);
if (!ri) {
addReplyError(c,"No such master with that name");
return NULL;
@@ -2567,7 +2854,32 @@ sentinelRedisInstance *sentinelGetMasterByNameOrReplyError(redisClient *c,
return ri;
}
-void sentinelCommand(redisClient *c) {
+#define SENTINEL_ISQR_OK 0
+#define SENTINEL_ISQR_NOQUORUM (1<<0)
+#define SENTINEL_ISQR_NOAUTH (1<<1)
+int sentinelIsQuorumReachable(sentinelRedisInstance *master, int *usableptr) {
+ dictIterator *di;
+ dictEntry *de;
+ int usable = 1; /* Number of usable Sentinels. Init to 1 to count myself. */
+ int result = SENTINEL_ISQR_OK;
+ int voters = dictSize(master->sentinels)+1; /* Known Sentinels + myself. */
+
+ di = dictGetIterator(master->sentinels);
+ while((de = dictNext(di)) != NULL) {
+ sentinelRedisInstance *ri = dictGetVal(de);
+
+ if (ri->flags & (SRI_S_DOWN|SRI_O_DOWN)) continue;
+ usable++;
+ }
+ dictReleaseIterator(di);
+
+ if (usable < (int)master->quorum) result |= SENTINEL_ISQR_NOQUORUM;
+ if (usable < voters/2+1) result |= SENTINEL_ISQR_NOAUTH;
+ if (usableptr) *usableptr = usable;
+ return result;
+}
+
+void sentinelCommand(client *c) {
if (!strcasecmp(c->argv[1]->ptr,"masters")) {
/* SENTINEL MASTERS */
if (c->argc != 2) goto numargserr;
@@ -2597,7 +2909,23 @@ void sentinelCommand(redisClient *c) {
return;
addReplyDictOfRedisInstances(c,ri->sentinels);
} else if (!strcasecmp(c->argv[1]->ptr,"is-master-down-by-addr")) {
- /* SENTINEL IS-MASTER-DOWN-BY-ADDR <ip> <port> <current-epoch> <runid>*/
+ /* SENTINEL IS-MASTER-DOWN-BY-ADDR <ip> <port> <current-epoch> <runid>
+ *
+ * Arguments:
+ *
+ * ip and port are the ip and port of the master we want to be
+ * checked by Sentinel. Note that the command will not check by
+ * name but just by master, in theory different Sentinels may monitor
+ * differnet masters with the same name.
+ *
+ * current-epoch is needed in order to understand if we are allowed
+ * to vote for a failover leader or not. Each Sentinel can vote just
+ * one time per epoch.
+ *
+ * runid is "*" if we are not seeking for a vote from the Sentinel
+ * in order to elect the failover leader. Otherwise it is set to the
+ * runid we want the Sentinel to vote if it did not already voted.
+ */
sentinelRedisInstance *ri;
long long req_epoch;
uint64_t leader_epoch = 0;
@@ -2606,9 +2934,9 @@ void sentinelCommand(redisClient *c) {
int isdown = 0;
if (c->argc != 6) goto numargserr;
- if (getLongFromObjectOrReply(c,c->argv[3],&port,NULL) != REDIS_OK ||
+ if (getLongFromObjectOrReply(c,c->argv[3],&port,NULL) != C_OK ||
getLongLongFromObjectOrReply(c,c->argv[4],&req_epoch,NULL)
- != REDIS_OK)
+ != C_OK)
return;
ri = getSentinelRedisInstanceByAddrAndRunID(sentinel.masters,
c->argv[2]->ptr,port,NULL);
@@ -2668,7 +2996,7 @@ void sentinelCommand(redisClient *c) {
addReplySds(c,sdsnew("-NOGOODSLAVE No suitable slave to promote\r\n"));
return;
}
- redisLog(REDIS_WARNING,"Executing user requested FAILOVER of '%s'",
+ serverLog(LL_WARNING,"Executing user requested FAILOVER of '%s'",
ri->name);
sentinelStartFailover(ri);
ri->flags |= SRI_FORCE_FAILOVER;
@@ -2682,17 +3010,23 @@ void sentinelCommand(redisClient *c) {
/* SENTINEL MONITOR <name> <ip> <port> <quorum> */
sentinelRedisInstance *ri;
long quorum, port;
- char buf[32];
+ char ip[NET_IP_STR_LEN];
if (c->argc != 6) goto numargserr;
if (getLongFromObjectOrReply(c,c->argv[5],&quorum,"Invalid quorum")
- != REDIS_OK) return;
+ != C_OK) return;
if (getLongFromObjectOrReply(c,c->argv[4],&port,"Invalid port")
- != REDIS_OK) return;
+ != C_OK) return;
+
+ if (quorum <= 0) {
+ addReplyError(c, "Quorum must be 1 or greater.");
+ return;
+ }
+
/* Make sure the IP field is actually a valid IP before passing it
* to createSentinelRedisInstance(), otherwise we may trigger a
* DNS lookup at runtime. */
- if (anetResolveIP(NULL,c->argv[3]->ptr,buf,sizeof(buf)) == ANET_ERR) {
+ if (anetResolveIP(NULL,c->argv[3]->ptr,ip,sizeof(ip)) == ANET_ERR) {
addReplyError(c,"Invalid IP address specified");
return;
}
@@ -2714,22 +3048,144 @@ void sentinelCommand(redisClient *c) {
}
} else {
sentinelFlushConfig();
- sentinelEvent(REDIS_WARNING,"+monitor",ri,"%@ quorum %d",ri->quorum);
+ sentinelEvent(LL_WARNING,"+monitor",ri,"%@ quorum %d",ri->quorum);
addReply(c,shared.ok);
}
+ } else if (!strcasecmp(c->argv[1]->ptr,"flushconfig")) {
+ if (c->argc != 2) goto numargserr;
+ sentinelFlushConfig();
+ addReply(c,shared.ok);
+ return;
} else if (!strcasecmp(c->argv[1]->ptr,"remove")) {
/* SENTINEL REMOVE <name> */
sentinelRedisInstance *ri;
+ if (c->argc != 3) goto numargserr;
if ((ri = sentinelGetMasterByNameOrReplyError(c,c->argv[2]))
== NULL) return;
- sentinelEvent(REDIS_WARNING,"-monitor",ri,"%@");
+ sentinelEvent(LL_WARNING,"-monitor",ri,"%@");
dictDelete(sentinel.masters,c->argv[2]->ptr);
sentinelFlushConfig();
addReply(c,shared.ok);
+ } else if (!strcasecmp(c->argv[1]->ptr,"ckquorum")) {
+ /* SENTINEL CKQUORUM <name> */
+ sentinelRedisInstance *ri;
+ int usable;
+
+ if (c->argc != 3) goto numargserr;
+ if ((ri = sentinelGetMasterByNameOrReplyError(c,c->argv[2]))
+ == NULL) return;
+ int result = sentinelIsQuorumReachable(ri,&usable);
+ if (result == SENTINEL_ISQR_OK) {
+ addReplySds(c, sdscatfmt(sdsempty(),
+ "+OK %i usable Sentinels. Quorum and failover authorization "
+ "can be reached\r\n",usable));
+ } else {
+ sds e = sdscatfmt(sdsempty(),
+ "-NOQUORUM %i usable Sentinels. ",usable);
+ if (result & SENTINEL_ISQR_NOQUORUM)
+ e = sdscat(e,"Not enough available Sentinels to reach the"
+ " specified quorum for this master");
+ if (result & SENTINEL_ISQR_NOAUTH) {
+ if (result & SENTINEL_ISQR_NOQUORUM) e = sdscat(e,". ");
+ e = sdscat(e, "Not enough available Sentinels to reach the"
+ " majority and authorize a failover");
+ }
+ e = sdscat(e,"\r\n");
+ addReplySds(c,e);
+ }
} else if (!strcasecmp(c->argv[1]->ptr,"set")) {
if (c->argc < 3 || c->argc % 2 == 0) goto numargserr;
sentinelSetCommand(c);
+ } else if (!strcasecmp(c->argv[1]->ptr,"info-cache")) {
+ /* SENTINEL INFO-CACHE <name> */
+ if (c->argc < 2) goto numargserr;
+ mstime_t now = mstime();
+
+ /* Create an ad-hoc dictionary type so that we can iterate
+ * a dictionary composed of just the master groups the user
+ * requested. */
+ dictType copy_keeper = instancesDictType;
+ copy_keeper.valDestructor = NULL;
+ dict *masters_local = sentinel.masters;
+ if (c->argc > 2) {
+ masters_local = dictCreate(&copy_keeper, NULL);
+
+ for (int i = 2; i < c->argc; i++) {
+ sentinelRedisInstance *ri;
+ ri = sentinelGetMasterByName(c->argv[i]->ptr);
+ if (!ri) continue; /* ignore non-existing names */
+ dictAdd(masters_local, ri->name, ri);
+ }
+ }
+
+ /* Reply format:
+ * 1.) master name
+ * 2.) 1.) info from master
+ * 2.) info from replica
+ * ...
+ * 3.) other master name
+ * ...
+ */
+ addReplyMultiBulkLen(c,dictSize(masters_local) * 2);
+
+ dictIterator *di;
+ dictEntry *de;
+ di = dictGetIterator(masters_local);
+ while ((de = dictNext(di)) != NULL) {
+ sentinelRedisInstance *ri = dictGetVal(de);
+ addReplyBulkCBuffer(c,ri->name,strlen(ri->name));
+ addReplyMultiBulkLen(c,dictSize(ri->slaves) + 1); /* +1 for self */
+ addReplyMultiBulkLen(c,2);
+ addReplyLongLong(c, now - ri->info_refresh);
+ if (ri->info)
+ addReplyBulkCBuffer(c,ri->info,sdslen(ri->info));
+ else
+ addReply(c,shared.nullbulk);
+
+ dictIterator *sdi;
+ dictEntry *sde;
+ sdi = dictGetIterator(ri->slaves);
+ while ((sde = dictNext(sdi)) != NULL) {
+ sentinelRedisInstance *sri = dictGetVal(sde);
+ addReplyMultiBulkLen(c,2);
+ addReplyLongLong(c, now - sri->info_refresh);
+ if (sri->info)
+ addReplyBulkCBuffer(c,sri->info,sdslen(sri->info));
+ else
+ addReply(c,shared.nullbulk);
+ }
+ dictReleaseIterator(sdi);
+ }
+ dictReleaseIterator(di);
+ if (masters_local != sentinel.masters) dictRelease(masters_local);
+ } else if (!strcasecmp(c->argv[1]->ptr,"simulate-failure")) {
+ /* SENTINEL SIMULATE-FAILURE <flag> <flag> ... <flag> */
+ int j;
+
+ sentinel.simfailure_flags = SENTINEL_SIMFAILURE_NONE;
+ for (j = 2; j < c->argc; j++) {
+ if (!strcasecmp(c->argv[j]->ptr,"crash-after-election")) {
+ sentinel.simfailure_flags |=
+ SENTINEL_SIMFAILURE_CRASH_AFTER_ELECTION;
+ serverLog(LL_WARNING,"Failure simulation: this Sentinel "
+ "will crash after being successfully elected as failover "
+ "leader");
+ } else if (!strcasecmp(c->argv[j]->ptr,"crash-after-promotion")) {
+ sentinel.simfailure_flags |=
+ SENTINEL_SIMFAILURE_CRASH_AFTER_PROMOTION;
+ serverLog(LL_WARNING,"Failure simulation: this Sentinel "
+ "will crash after promoting the selected slave to master");
+ } else if (!strcasecmp(c->argv[j]->ptr,"help")) {
+ addReplyMultiBulkLen(c,2);
+ addReplyBulkCString(c,"crash-after-election");
+ addReplyBulkCString(c,"crash-after-promotion");
+ } else {
+ addReplyError(c,"Unknown failure simulation specified");
+ return;
+ }
+ }
+ addReply(c,shared.ok);
} else {
addReplyErrorFormat(c,"Unknown sentinel subcommand '%s'",
(char*)c->argv[1]->ptr);
@@ -2741,26 +3197,41 @@ numargserr:
(char*)c->argv[1]->ptr);
}
-/* SENTINEL INFO [section] */
-void sentinelInfoCommand(redisClient *c) {
- char *section = c->argc == 2 ? c->argv[1]->ptr : "default";
- sds info = sdsempty();
- int defsections = !strcasecmp(section,"default");
- int sections = 0;
+#define info_section_from_redis(section_name) do { \
+ if (defsections || allsections || !strcasecmp(section,section_name)) { \
+ sds redissection; \
+ if (sections++) info = sdscat(info,"\r\n"); \
+ redissection = genRedisInfoString(section_name); \
+ info = sdscatlen(info,redissection,sdslen(redissection)); \
+ sdsfree(redissection); \
+ } \
+} while(0)
+/* SENTINEL INFO [section] */
+void sentinelInfoCommand(client *c) {
if (c->argc > 2) {
addReply(c,shared.syntaxerr);
return;
}
- if (!strcasecmp(section,"server") || defsections) {
- if (sections++) info = sdscat(info,"\r\n");
- sds serversection = genRedisInfoString("server");
- info = sdscatlen(info,serversection,sdslen(serversection));
- sdsfree(serversection);
+ int defsections = 0, allsections = 0;
+ char *section = c->argc == 2 ? c->argv[1]->ptr : NULL;
+ if (section) {
+ allsections = !strcasecmp(section,"all");
+ defsections = !strcasecmp(section,"default");
+ } else {
+ defsections = 1;
}
- if (!strcasecmp(section,"sentinel") || defsections) {
+ int sections = 0;
+ sds info = sdsempty();
+
+ info_section_from_redis("server");
+ info_section_from_redis("clients");
+ info_section_from_redis("cpu");
+ info_section_from_redis("stats");
+
+ if (defsections || allsections || !strcasecmp(section,"sentinel")) {
dictIterator *di;
dictEntry *de;
int master_id = 0;
@@ -2771,11 +3242,13 @@ void sentinelInfoCommand(redisClient *c) {
"sentinel_masters:%lu\r\n"
"sentinel_tilt:%d\r\n"
"sentinel_running_scripts:%d\r\n"
- "sentinel_scripts_queue_length:%ld\r\n",
+ "sentinel_scripts_queue_length:%ld\r\n"
+ "sentinel_simulate_failure_flags:%lu\r\n",
dictSize(sentinel.masters),
sentinel.tilt,
sentinel.running_scripts,
- listLength(sentinel.scripts_queue));
+ listLength(sentinel.scripts_queue),
+ sentinel.simfailure_flags);
di = dictGetIterator(sentinel.masters);
while((de = dictNext(di)) != NULL) {
@@ -2795,15 +3268,12 @@ void sentinelInfoCommand(redisClient *c) {
dictReleaseIterator(di);
}
- addReplySds(c,sdscatprintf(sdsempty(),"$%lu\r\n",
- (unsigned long)sdslen(info)));
- addReplySds(c,info);
- addReply(c,shared.crlf);
+ addReplyBulkSds(c, info);
}
/* Implements Sentinel verison of the ROLE command. The output is
* "sentinel" and the list of currently monitored master names. */
-void sentinelRoleCommand(redisClient *c) {
+void sentinelRoleCommand(client *c) {
dictIterator *di;
dictEntry *de;
@@ -2821,7 +3291,7 @@ void sentinelRoleCommand(redisClient *c) {
}
/* SENTINEL SET <mastername> [<option> <value> ...] */
-void sentinelSetCommand(redisClient *c) {
+void sentinelSetCommand(client *c) {
sentinelRedisInstance *ri;
int j, changes = 0;
char *option, *value;
@@ -2838,20 +3308,20 @@ void sentinelSetCommand(redisClient *c) {
if (!strcasecmp(option,"down-after-milliseconds")) {
/* down-after-millisecodns <milliseconds> */
- if (getLongLongFromObject(o,&ll) == REDIS_ERR || ll <= 0)
+ if (getLongLongFromObject(o,&ll) == C_ERR || ll <= 0)
goto badfmt;
ri->down_after_period = ll;
sentinelPropagateDownAfterPeriod(ri);
changes++;
} else if (!strcasecmp(option,"failover-timeout")) {
/* failover-timeout <milliseconds> */
- if (getLongLongFromObject(o,&ll) == REDIS_ERR || ll <= 0)
+ if (getLongLongFromObject(o,&ll) == C_ERR || ll <= 0)
goto badfmt;
ri->failover_timeout = ll;
changes++;
} else if (!strcasecmp(option,"parallel-syncs")) {
/* parallel-syncs <milliseconds> */
- if (getLongLongFromObject(o,&ll) == REDIS_ERR || ll <= 0)
+ if (getLongLongFromObject(o,&ll) == C_ERR || ll <= 0)
goto badfmt;
ri->parallel_syncs = ll;
changes++;
@@ -2885,7 +3355,7 @@ void sentinelSetCommand(redisClient *c) {
changes++;
} else if (!strcasecmp(option,"quorum")) {
/* quorum <count> */
- if (getLongLongFromObject(o,&ll) == REDIS_ERR || ll <= 0)
+ if (getLongLongFromObject(o,&ll) == C_ERR || ll <= 0)
goto badfmt;
ri->quorum = ll;
changes++;
@@ -2895,7 +3365,7 @@ void sentinelSetCommand(redisClient *c) {
if (changes) sentinelFlushConfig();
return;
}
- sentinelEvent(REDIS_WARNING,"+set",ri,"%@ %s %s",option,value);
+ sentinelEvent(LL_WARNING,"+set",ri,"%@ %s %s",option,value);
}
if (changes) sentinelFlushConfig();
@@ -2914,7 +3384,7 @@ badfmt: /* Bad format errors */
*
* Because we have a Sentinel PUBLISH, the code to send hello messages is the same
* for all the three kind of instances: masters, slaves, sentinels. */
-void sentinelPublishCommand(redisClient *c) {
+void sentinelPublishCommand(client *c) {
if (strcmp(c->argv[1]->ptr,SENTINEL_HELLO_CHANNEL)) {
addReplyError(c, "Only HELLO messages are accepted by Sentinel instances.");
return;
@@ -2929,8 +3399,10 @@ void sentinelPublishCommand(redisClient *c) {
void sentinelCheckSubjectivelyDown(sentinelRedisInstance *ri) {
mstime_t elapsed = 0;
- if (ri->last_ping_time)
- elapsed = mstime() - ri->last_ping_time;
+ if (ri->link->act_ping_time)
+ elapsed = mstime() - ri->link->act_ping_time;
+ else if (ri->link->disconnected)
+ elapsed = mstime() - ri->link->last_avail_time;
/* Check if we are in need for a reconnection of one of the
* links, because we are detecting low activity.
@@ -2938,15 +3410,16 @@ void sentinelCheckSubjectivelyDown(sentinelRedisInstance *ri) {
* 1) Check if the command link seems connected, was connected not less
* than SENTINEL_MIN_LINK_RECONNECT_PERIOD, but still we have a
* pending ping for more than half the timeout. */
- if (ri->cc &&
- (mstime() - ri->cc_conn_time) > SENTINEL_MIN_LINK_RECONNECT_PERIOD &&
- ri->last_ping_time != 0 && /* Ther is a pending ping... */
+ if (ri->link->cc &&
+ (mstime() - ri->link->cc_conn_time) >
+ SENTINEL_MIN_LINK_RECONNECT_PERIOD &&
+ ri->link->act_ping_time != 0 && /* Ther is a pending ping... */
/* The pending ping is delayed, and we did not received
* error replies as well. */
- (mstime() - ri->last_ping_time) > (ri->down_after_period/2) &&
- (mstime() - ri->last_pong_time) > (ri->down_after_period/2))
+ (mstime() - ri->link->act_ping_time) > (ri->down_after_period/2) &&
+ (mstime() - ri->link->last_pong_time) > (ri->down_after_period/2))
{
- sentinelKillLink(ri,ri->cc);
+ instanceLinkCloseConnection(ri->link,ri->link->cc);
}
/* 2) Check if the pubsub link seems connected, was connected not less
@@ -2954,11 +3427,12 @@ void sentinelCheckSubjectivelyDown(sentinelRedisInstance *ri) {
* activity in the Pub/Sub channel for more than
* SENTINEL_PUBLISH_PERIOD * 3.
*/
- if (ri->pc &&
- (mstime() - ri->pc_conn_time) > SENTINEL_MIN_LINK_RECONNECT_PERIOD &&
- (mstime() - ri->pc_last_activity) > (SENTINEL_PUBLISH_PERIOD*3))
+ if (ri->link->pc &&
+ (mstime() - ri->link->pc_conn_time) >
+ SENTINEL_MIN_LINK_RECONNECT_PERIOD &&
+ (mstime() - ri->link->pc_last_activity) > (SENTINEL_PUBLISH_PERIOD*3))
{
- sentinelKillLink(ri,ri->pc);
+ instanceLinkCloseConnection(ri->link,ri->link->pc);
}
/* Update the SDOWN flag. We believe the instance is SDOWN if:
@@ -2975,14 +3449,14 @@ void sentinelCheckSubjectivelyDown(sentinelRedisInstance *ri) {
{
/* Is subjectively down */
if ((ri->flags & SRI_S_DOWN) == 0) {
- sentinelEvent(REDIS_WARNING,"+sdown",ri,"%@");
+ sentinelEvent(LL_WARNING,"+sdown",ri,"%@");
ri->s_down_since_time = mstime();
ri->flags |= SRI_S_DOWN;
}
} else {
/* Is subjectively up */
if (ri->flags & SRI_S_DOWN) {
- sentinelEvent(REDIS_WARNING,"-sdown",ri,"%@");
+ sentinelEvent(LL_WARNING,"-sdown",ri,"%@");
ri->flags &= ~(SRI_S_DOWN|SRI_SCRIPT_KILL_SENT);
}
}
@@ -2997,7 +3471,7 @@ void sentinelCheckSubjectivelyDown(sentinelRedisInstance *ri) {
void sentinelCheckObjectivelyDown(sentinelRedisInstance *master) {
dictIterator *di;
dictEntry *de;
- int quorum = 0, odown = 0;
+ unsigned int quorum = 0, odown = 0;
if (master->flags & SRI_S_DOWN) {
/* Is down for enough sentinels? */
@@ -3016,14 +3490,14 @@ void sentinelCheckObjectivelyDown(sentinelRedisInstance *master) {
/* Set the flag accordingly to the outcome. */
if (odown) {
if ((master->flags & SRI_O_DOWN) == 0) {
- sentinelEvent(REDIS_WARNING,"+odown",master,"%@ #quorum %d/%d",
+ sentinelEvent(LL_WARNING,"+odown",master,"%@ #quorum %d/%d",
quorum, master->quorum);
master->flags |= SRI_O_DOWN;
master->o_down_since_time = mstime();
}
} else {
if (master->flags & SRI_O_DOWN) {
- sentinelEvent(REDIS_WARNING,"-odown",master,"%@");
+ sentinelEvent(LL_WARNING,"-odown",master,"%@");
master->flags &= ~SRI_O_DOWN;
}
}
@@ -3032,11 +3506,12 @@ void sentinelCheckObjectivelyDown(sentinelRedisInstance *master) {
/* Receive the SENTINEL is-master-down-by-addr reply, see the
* sentinelAskMasterStateToOtherSentinels() function for more information. */
void sentinelReceiveIsMasterDownReply(redisAsyncContext *c, void *reply, void *privdata) {
- sentinelRedisInstance *ri = c->data;
+ sentinelRedisInstance *ri = privdata;
+ instanceLink *link = c->data;
redisReply *r;
- if (ri) ri->pending_commands--;
- if (!reply || !ri) return;
+ if (!reply || !link) return;
+ link->pending_commands--;
r = reply;
/* Ignore every error or unexpected reply.
@@ -3057,8 +3532,8 @@ void sentinelReceiveIsMasterDownReply(redisAsyncContext *c, void *reply, void *p
/* If the runid in the reply is not "*" the Sentinel actually
* replied with a vote. */
sdsfree(ri->leader);
- if (ri->leader_epoch != r->element[2]->integer)
- redisLog(REDIS_WARNING,
+ if ((long long)ri->leader_epoch != r->element[2]->integer)
+ serverLog(LL_WARNING,
"%s voted for %s %llu", ri->name,
r->element[1]->str,
(unsigned long long) r->element[2]->integer);
@@ -3097,27 +3572,34 @@ void sentinelAskMasterStateToOtherSentinels(sentinelRedisInstance *master, int f
* 2) Sentinel is connected.
* 3) We did not received the info within SENTINEL_ASK_PERIOD ms. */
if ((master->flags & SRI_S_DOWN) == 0) continue;
- if (ri->flags & SRI_DISCONNECTED) continue;
+ if (ri->link->disconnected) continue;
if (!(flags & SENTINEL_ASK_FORCED) &&
mstime() - ri->last_master_down_reply_time < SENTINEL_ASK_PERIOD)
continue;
/* Ask */
ll2string(port,sizeof(port),master->addr->port);
- retval = redisAsyncCommand(ri->cc,
- sentinelReceiveIsMasterDownReply, NULL,
+ retval = redisAsyncCommand(ri->link->cc,
+ sentinelReceiveIsMasterDownReply, ri,
"SENTINEL is-master-down-by-addr %s %s %llu %s",
master->addr->ip, port,
sentinel.current_epoch,
(master->failover_state > SENTINEL_FAILOVER_STATE_NONE) ?
- server.runid : "*");
- if (retval == REDIS_OK) ri->pending_commands++;
+ sentinel.myid : "*");
+ if (retval == C_OK) ri->link->pending_commands++;
}
dictReleaseIterator(di);
}
/* =============================== FAILOVER ================================= */
+/* Crash because of user request via SENTINEL simulate-failure command. */
+void sentinelSimFailureCrash(void) {
+ serverLog(LL_WARNING,
+ "Sentinel CRASH because of SENTINEL simulate-failure");
+ exit(99);
+}
+
/* Vote for the sentinel with 'req_runid' or return the old vote if already
* voted for the specifed 'req_epoch' or one greater.
*
@@ -3127,7 +3609,7 @@ char *sentinelVoteLeader(sentinelRedisInstance *master, uint64_t req_epoch, char
if (req_epoch > sentinel.current_epoch) {
sentinel.current_epoch = req_epoch;
sentinelFlushConfig();
- sentinelEvent(REDIS_WARNING,"+new-epoch",master,"%llu",
+ sentinelEvent(LL_WARNING,"+new-epoch",master,"%llu",
(unsigned long long) sentinel.current_epoch);
}
@@ -3137,12 +3619,12 @@ char *sentinelVoteLeader(sentinelRedisInstance *master, uint64_t req_epoch, char
master->leader = sdsnew(req_runid);
master->leader_epoch = sentinel.current_epoch;
sentinelFlushConfig();
- sentinelEvent(REDIS_WARNING,"+vote-for-leader",master,"%s %llu",
+ sentinelEvent(LL_WARNING,"+vote-for-leader",master,"%s %llu",
master->leader, (unsigned long long) master->leader_epoch);
/* If we did not voted for ourselves, set the master failover start
* time to now, in order to force a delay before we can start a
* failover for the same master. */
- if (strcasecmp(master->leader,server.runid))
+ if (strcasecmp(master->leader,sentinel.myid))
master->failover_start_time = mstime()+rand()%SENTINEL_MAX_DESYNC;
}
@@ -3158,16 +3640,16 @@ struct sentinelLeader {
/* Helper function for sentinelGetLeader, increment the counter
* relative to the specified runid. */
int sentinelLeaderIncr(dict *counters, char *runid) {
- dictEntry *de = dictFind(counters,runid);
+ dictEntry *existing, *de;
uint64_t oldval;
- if (de) {
- oldval = dictGetUnsignedIntegerVal(de);
- dictSetUnsignedIntegerVal(de,oldval+1);
+ de = dictAddRaw(counters,runid,&existing);
+ if (existing) {
+ oldval = dictGetUnsignedIntegerVal(existing);
+ dictSetUnsignedIntegerVal(existing,oldval+1);
return oldval+1;
} else {
- de = dictAddRaw(counters,runid);
- redisAssert(de != NULL);
+ serverAssert(de != NULL);
dictSetUnsignedIntegerVal(de,1);
return 1;
}
@@ -3176,9 +3658,9 @@ int sentinelLeaderIncr(dict *counters, char *runid) {
/* Scan all the Sentinels attached to this master to check if there
* is a leader for the specified epoch.
*
- * To be a leader for a given epoch, we should have the majorify of
- * the Sentinels we know that reported the same instance as
- * leader for the same epoch. */
+ * To be a leader for a given epoch, we should have the majority of
+ * the Sentinels we know (ever seen since the last SENTINEL RESET) that
+ * reported the same instance as leader for the same epoch. */
char *sentinelGetLeader(sentinelRedisInstance *master, uint64_t epoch) {
dict *counters;
dictIterator *di;
@@ -3189,16 +3671,17 @@ char *sentinelGetLeader(sentinelRedisInstance *master, uint64_t epoch) {
uint64_t leader_epoch;
uint64_t max_votes = 0;
- redisAssert(master->flags & (SRI_O_DOWN|SRI_FAILOVER_IN_PROGRESS));
+ serverAssert(master->flags & (SRI_O_DOWN|SRI_FAILOVER_IN_PROGRESS));
counters = dictCreate(&leaderVotesDictType,NULL);
+ voters = dictSize(master->sentinels)+1; /* All the other sentinels and me.*/
+
/* Count other sentinels votes */
di = dictGetIterator(master->sentinels);
while((de = dictNext(di)) != NULL) {
sentinelRedisInstance *ri = dictGetVal(de);
if (ri->leader != NULL && ri->leader_epoch == sentinel.current_epoch)
sentinelLeaderIncr(counters,ri->leader);
- voters++;
}
dictReleaseIterator(di);
@@ -3222,7 +3705,7 @@ char *sentinelGetLeader(sentinelRedisInstance *master, uint64_t epoch) {
if (winner)
myvote = sentinelVoteLeader(master,epoch,winner,&leader_epoch);
else
- myvote = sentinelVoteLeader(master,epoch,server.runid,&leader_epoch);
+ myvote = sentinelVoteLeader(master,epoch,sentinel.myid,&leader_epoch);
if (myvote && leader_epoch == epoch) {
uint64_t votes = sentinelLeaderIncr(counters,myvote);
@@ -3232,7 +3715,6 @@ char *sentinelGetLeader(sentinelRedisInstance *master, uint64_t epoch) {
winner = myvote;
}
}
- voters++; /* Anyway, count me as one of the voters. */
voters_quorum = voters/2+1;
if (winner && (max_votes < voters_quorum || max_votes < master->quorum))
@@ -3251,8 +3733,8 @@ char *sentinelGetLeader(sentinelRedisInstance *master, uint64_t epoch) {
*
* If Host is NULL the function sends "SLAVEOF NO ONE".
*
- * The command returns REDIS_OK if the SLAVEOF command was accepted for
- * (later) delivery otherwise REDIS_ERR. The command replies are just
+ * The command returns C_OK if the SLAVEOF command was accepted for
+ * (later) delivery otherwise C_ERR. The command replies are just
* discarded. */
int sentinelSendSlaveOf(sentinelRedisInstance *ri, char *host, int port) {
char portstr[32];
@@ -3277,49 +3759,49 @@ int sentinelSendSlaveOf(sentinelRedisInstance *ri, char *host, int port) {
*
* Note that we don't check the replies returned by commands, since we
* will observe instead the effects in the next INFO output. */
- retval = redisAsyncCommand(ri->cc,
- sentinelDiscardReplyCallback, NULL, "MULTI");
- if (retval == REDIS_ERR) return retval;
- ri->pending_commands++;
+ retval = redisAsyncCommand(ri->link->cc,
+ sentinelDiscardReplyCallback, ri, "MULTI");
+ if (retval == C_ERR) return retval;
+ ri->link->pending_commands++;
- retval = redisAsyncCommand(ri->cc,
- sentinelDiscardReplyCallback, NULL, "SLAVEOF %s %s", host, portstr);
- if (retval == REDIS_ERR) return retval;
- ri->pending_commands++;
+ retval = redisAsyncCommand(ri->link->cc,
+ sentinelDiscardReplyCallback, ri, "SLAVEOF %s %s", host, portstr);
+ if (retval == C_ERR) return retval;
+ ri->link->pending_commands++;
- retval = redisAsyncCommand(ri->cc,
- sentinelDiscardReplyCallback, NULL, "CONFIG REWRITE");
- if (retval == REDIS_ERR) return retval;
- ri->pending_commands++;
+ retval = redisAsyncCommand(ri->link->cc,
+ sentinelDiscardReplyCallback, ri, "CONFIG REWRITE");
+ if (retval == C_ERR) return retval;
+ ri->link->pending_commands++;
/* CLIENT KILL TYPE <type> is only supported starting from Redis 2.8.12,
* however sending it to an instance not understanding this command is not
* an issue because CLIENT is variadic command, so Redis will not
* recognized as a syntax error, and the transaction will not fail (but
* only the unsupported command will fail). */
- retval = redisAsyncCommand(ri->cc,
- sentinelDiscardReplyCallback, NULL, "CLIENT KILL TYPE normal");
- if (retval == REDIS_ERR) return retval;
- ri->pending_commands++;
+ retval = redisAsyncCommand(ri->link->cc,
+ sentinelDiscardReplyCallback, ri, "CLIENT KILL TYPE normal");
+ if (retval == C_ERR) return retval;
+ ri->link->pending_commands++;
- retval = redisAsyncCommand(ri->cc,
- sentinelDiscardReplyCallback, NULL, "EXEC");
- if (retval == REDIS_ERR) return retval;
- ri->pending_commands++;
+ retval = redisAsyncCommand(ri->link->cc,
+ sentinelDiscardReplyCallback, ri, "EXEC");
+ if (retval == C_ERR) return retval;
+ ri->link->pending_commands++;
- return REDIS_OK;
+ return C_OK;
}
/* Setup the master state to start a failover. */
void sentinelStartFailover(sentinelRedisInstance *master) {
- redisAssert(master->flags & SRI_MASTER);
+ serverAssert(master->flags & SRI_MASTER);
master->failover_state = SENTINEL_FAILOVER_STATE_WAIT_START;
master->flags |= SRI_FAILOVER_IN_PROGRESS;
master->failover_epoch = ++sentinel.current_epoch;
- sentinelEvent(REDIS_WARNING,"+new-epoch",master,"%llu",
+ sentinelEvent(LL_WARNING,"+new-epoch",master,"%llu",
(unsigned long long) sentinel.current_epoch);
- sentinelEvent(REDIS_WARNING,"+try-failover",master,"%@");
+ sentinelEvent(LL_WARNING,"+try-failover",master,"%@");
master->failover_start_time = mstime()+rand()%SENTINEL_MAX_DESYNC;
master->failover_state_change_time = mstime();
}
@@ -3354,7 +3836,7 @@ int sentinelStartFailoverIfNeeded(sentinelRedisInstance *master) {
ctime_r(&clock,ctimebuf);
ctimebuf[24] = '\0'; /* Remove newline. */
master->failover_delay_logged = master->failover_start_time;
- redisLog(REDIS_WARNING,
+ serverLog(LL_WARNING,
"Next failover delay: I will not start a failover before %s",
ctimebuf);
}
@@ -3406,11 +3888,11 @@ int compareSlavesForPromotion(const void *a, const void *b) {
return (*sa)->slave_priority - (*sb)->slave_priority;
/* If priority is the same, select the slave with greater replication
- * offset (processed more data frmo the master). */
+ * offset (processed more data from the master). */
if ((*sa)->slave_repl_offset > (*sb)->slave_repl_offset) {
return -1; /* a < b */
} else if ((*sa)->slave_repl_offset < (*sb)->slave_repl_offset) {
- return 1; /* b > a */
+ return 1; /* a > b */
}
/* If the replication offset is the same select the slave with that has
@@ -3443,8 +3925,9 @@ sentinelRedisInstance *sentinelSelectSlave(sentinelRedisInstance *master) {
sentinelRedisInstance *slave = dictGetVal(de);
mstime_t info_validity_time;
- if (slave->flags & (SRI_S_DOWN|SRI_O_DOWN|SRI_DISCONNECTED)) continue;
- if (mstime() - slave->last_avail_time > SENTINEL_PING_PERIOD*5) continue;
+ if (slave->flags & (SRI_S_DOWN|SRI_O_DOWN)) continue;
+ if (slave->link->disconnected) continue;
+ if (mstime() - slave->link->last_avail_time > SENTINEL_PING_PERIOD*5) continue;
if (slave->slave_priority == 0) continue;
/* If the master is in SDOWN state we get INFO for slaves every second.
@@ -3475,7 +3958,7 @@ void sentinelFailoverWaitStart(sentinelRedisInstance *ri) {
/* Check if we are the leader for the failover epoch. */
leader = sentinelGetLeader(ri, ri->failover_epoch);
- isleader = leader && strcasecmp(leader,server.runid) == 0;
+ isleader = leader && strcasecmp(leader,sentinel.myid) == 0;
sdsfree(leader);
/* If I'm not the leader, and it is not a forced failover via
@@ -3489,15 +3972,17 @@ void sentinelFailoverWaitStart(sentinelRedisInstance *ri) {
election_timeout = ri->failover_timeout;
/* Abort the failover if I'm not the leader after some time. */
if (mstime() - ri->failover_start_time > election_timeout) {
- sentinelEvent(REDIS_WARNING,"-failover-abort-not-elected",ri,"%@");
+ sentinelEvent(LL_WARNING,"-failover-abort-not-elected",ri,"%@");
sentinelAbortFailover(ri);
}
return;
}
- sentinelEvent(REDIS_WARNING,"+elected-leader",ri,"%@");
+ sentinelEvent(LL_WARNING,"+elected-leader",ri,"%@");
+ if (sentinel.simfailure_flags & SENTINEL_SIMFAILURE_CRASH_AFTER_ELECTION)
+ sentinelSimFailureCrash();
ri->failover_state = SENTINEL_FAILOVER_STATE_SELECT_SLAVE;
ri->failover_state_change_time = mstime();
- sentinelEvent(REDIS_WARNING,"+failover-state-select-slave",ri,"%@");
+ sentinelEvent(LL_WARNING,"+failover-state-select-slave",ri,"%@");
}
void sentinelFailoverSelectSlave(sentinelRedisInstance *ri) {
@@ -3506,15 +3991,15 @@ void sentinelFailoverSelectSlave(sentinelRedisInstance *ri) {
/* We don't handle the timeout in this state as the function aborts
* the failover or go forward in the next state. */
if (slave == NULL) {
- sentinelEvent(REDIS_WARNING,"-failover-abort-no-good-slave",ri,"%@");
+ sentinelEvent(LL_WARNING,"-failover-abort-no-good-slave",ri,"%@");
sentinelAbortFailover(ri);
} else {
- sentinelEvent(REDIS_WARNING,"+selected-slave",slave,"%@");
+ sentinelEvent(LL_WARNING,"+selected-slave",slave,"%@");
slave->flags |= SRI_PROMOTED;
ri->promoted_slave = slave;
ri->failover_state = SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE;
ri->failover_state_change_time = mstime();
- sentinelEvent(REDIS_NOTICE,"+failover-state-send-slaveof-noone",
+ sentinelEvent(LL_NOTICE,"+failover-state-send-slaveof-noone",
slave, "%@");
}
}
@@ -3525,9 +4010,9 @@ void sentinelFailoverSendSlaveOfNoOne(sentinelRedisInstance *ri) {
/* We can't send the command to the promoted slave if it is now
* disconnected. Retry again and again with this state until the timeout
* is reached, then abort the failover. */
- if (ri->promoted_slave->flags & SRI_DISCONNECTED) {
+ if (ri->promoted_slave->link->disconnected) {
if (mstime() - ri->failover_state_change_time > ri->failover_timeout) {
- sentinelEvent(REDIS_WARNING,"-failover-abort-slave-timeout",ri,"%@");
+ sentinelEvent(LL_WARNING,"-failover-abort-slave-timeout",ri,"%@");
sentinelAbortFailover(ri);
}
return;
@@ -3538,8 +4023,8 @@ void sentinelFailoverSendSlaveOfNoOne(sentinelRedisInstance *ri) {
* really care about the reply. We check if it worked indirectly observing
* if INFO returns a different role (master instead of slave). */
retval = sentinelSendSlaveOf(ri->promoted_slave,NULL,0);
- if (retval != REDIS_OK) return;
- sentinelEvent(REDIS_NOTICE, "+failover-state-wait-promotion",
+ if (retval != C_OK) return;
+ sentinelEvent(LL_NOTICE, "+failover-state-wait-promotion",
ri->promoted_slave,"%@");
ri->failover_state = SENTINEL_FAILOVER_STATE_WAIT_PROMOTION;
ri->failover_state_change_time = mstime();
@@ -3551,7 +4036,7 @@ void sentinelFailoverWaitPromotion(sentinelRedisInstance *ri) {
/* Just handle the timeout. Switching to the next state is handled
* by the function parsing the INFO command of the promoted slave. */
if (mstime() - ri->failover_state_change_time > ri->failover_timeout) {
- sentinelEvent(REDIS_WARNING,"-failover-abort-slave-timeout",ri,"%@");
+ sentinelEvent(LL_WARNING,"-failover-abort-slave-timeout",ri,"%@");
sentinelAbortFailover(ri);
}
}
@@ -3583,11 +4068,11 @@ void sentinelFailoverDetectEnd(sentinelRedisInstance *master) {
if (elapsed > master->failover_timeout) {
not_reconfigured = 0;
timeout = 1;
- sentinelEvent(REDIS_WARNING,"+failover-end-for-timeout",master,"%@");
+ sentinelEvent(LL_WARNING,"+failover-end-for-timeout",master,"%@");
}
if (not_reconfigured == 0) {
- sentinelEvent(REDIS_WARNING,"+failover-end",master,"%@");
+ sentinelEvent(LL_WARNING,"+failover-end",master,"%@");
master->failover_state = SENTINEL_FAILOVER_STATE_UPDATE_CONFIG;
master->failover_state_change_time = mstime();
}
@@ -3604,14 +4089,14 @@ void sentinelFailoverDetectEnd(sentinelRedisInstance *master) {
sentinelRedisInstance *slave = dictGetVal(de);
int retval;
- if (slave->flags &
- (SRI_RECONF_DONE|SRI_RECONF_SENT|SRI_DISCONNECTED)) continue;
+ if (slave->flags & (SRI_RECONF_DONE|SRI_RECONF_SENT)) continue;
+ if (slave->link->disconnected) continue;
retval = sentinelSendSlaveOf(slave,
master->promoted_slave->addr->ip,
master->promoted_slave->addr->port);
- if (retval == REDIS_OK) {
- sentinelEvent(REDIS_NOTICE,"+slave-reconf-sent-be",slave,"%@");
+ if (retval == C_OK) {
+ sentinelEvent(LL_NOTICE,"+slave-reconf-sent-be",slave,"%@");
slave->flags |= SRI_RECONF_SENT;
}
}
@@ -3653,24 +4138,24 @@ void sentinelFailoverReconfNextSlave(sentinelRedisInstance *master) {
(mstime() - slave->slave_reconf_sent_time) >
SENTINEL_SLAVE_RECONF_TIMEOUT)
{
- sentinelEvent(REDIS_NOTICE,"-slave-reconf-sent-timeout",slave,"%@");
+ sentinelEvent(LL_NOTICE,"-slave-reconf-sent-timeout",slave,"%@");
slave->flags &= ~SRI_RECONF_SENT;
slave->flags |= SRI_RECONF_DONE;
}
/* Nothing to do for instances that are disconnected or already
* in RECONF_SENT state. */
- if (slave->flags & (SRI_DISCONNECTED|SRI_RECONF_SENT|SRI_RECONF_INPROG))
- continue;
+ if (slave->flags & (SRI_RECONF_SENT|SRI_RECONF_INPROG)) continue;
+ if (slave->link->disconnected) continue;
/* Send SLAVEOF <new master>. */
retval = sentinelSendSlaveOf(slave,
master->promoted_slave->addr->ip,
master->promoted_slave->addr->port);
- if (retval == REDIS_OK) {
+ if (retval == C_OK) {
slave->flags |= SRI_RECONF_SENT;
slave->slave_reconf_sent_time = mstime();
- sentinelEvent(REDIS_NOTICE,"+slave-reconf-sent",slave,"%@");
+ sentinelEvent(LL_NOTICE,"+slave-reconf-sent",slave,"%@");
in_progress++;
}
}
@@ -3687,7 +4172,7 @@ void sentinelFailoverSwitchToPromotedSlave(sentinelRedisInstance *master) {
sentinelRedisInstance *ref = master->promoted_slave ?
master->promoted_slave : master;
- sentinelEvent(REDIS_WARNING,"+switch-master",master,"%s %s %d %s %d",
+ sentinelEvent(LL_WARNING,"+switch-master",master,"%s %s %d %s %d",
master->name, master->addr->ip, master->addr->port,
ref->addr->ip, ref->addr->port);
@@ -3695,7 +4180,7 @@ void sentinelFailoverSwitchToPromotedSlave(sentinelRedisInstance *master) {
}
void sentinelFailoverStateMachine(sentinelRedisInstance *ri) {
- redisAssert(ri->flags & SRI_MASTER);
+ serverAssert(ri->flags & SRI_MASTER);
if (!(ri->flags & SRI_FAILOVER_IN_PROGRESS)) return;
@@ -3724,8 +4209,8 @@ void sentinelFailoverStateMachine(sentinelRedisInstance *ri) {
* the slave -> master switch. Otherwise the failover can't be aborted and
* will reach its end (possibly by timeout). */
void sentinelAbortFailover(sentinelRedisInstance *ri) {
- redisAssert(ri->flags & SRI_FAILOVER_IN_PROGRESS);
- redisAssert(ri->failover_state <= SENTINEL_FAILOVER_STATE_WAIT_PROMOTION);
+ serverAssert(ri->flags & SRI_FAILOVER_IN_PROGRESS);
+ serverAssert(ri->failover_state <= SENTINEL_FAILOVER_STATE_WAIT_PROMOTION);
ri->flags &= ~(SRI_FAILOVER_IN_PROGRESS|SRI_FORCE_FAILOVER);
ri->failover_state = SENTINEL_FAILOVER_STATE_NONE;
@@ -3755,7 +4240,7 @@ void sentinelHandleRedisInstance(sentinelRedisInstance *ri) {
if (sentinel.tilt) {
if (mstime()-sentinel.tilt_start_time < SENTINEL_TILT_PERIOD) return;
sentinel.tilt = 0;
- sentinelEvent(REDIS_WARNING,"-tilt",NULL,"#tilt mode exited");
+ sentinelEvent(LL_WARNING,"-tilt",NULL,"#tilt mode exited");
}
/* Every kind of instance */
@@ -3828,7 +4313,7 @@ void sentinelCheckTiltCondition(void) {
if (delta < 0 || delta > SENTINEL_TILT_TRIGGER) {
sentinel.tilt = 1;
sentinel.tilt_start_time = mstime();
- sentinelEvent(REDIS_WARNING,"+tilt",NULL,"#tilt mode entered");
+ sentinelEvent(LL_WARNING,"+tilt",NULL,"#tilt mode entered");
}
sentinel.previous_time = mstime();
}
@@ -3846,6 +4331,6 @@ void sentinelTimer(void) {
* exactly continue to stay synchronized asking to be voted at the
* same time again and again (resulting in nobody likely winning the
* election because of split brain voting). */
- server.hz = REDIS_DEFAULT_HZ + rand() % REDIS_DEFAULT_HZ;
+ server.hz = CONFIG_DEFAULT_HZ + rand() % CONFIG_DEFAULT_HZ;
}