diff options
Diffstat (limited to 'src/sentinel.c')
-rw-r--r-- | src/sentinel.c | 1427 |
1 files changed, 956 insertions, 471 deletions
diff --git a/src/sentinel.c b/src/sentinel.c index 48e1de8dd..6c6a3a0cd 100644 --- a/src/sentinel.c +++ b/src/sentinel.c @@ -28,7 +28,7 @@ * POSSIBILITY OF SUCH DAMAGE. */ -#include "redis.h" +#include "server.h" #include "hiredis.h" #include "async.h" @@ -54,19 +54,18 @@ typedef struct sentinelAddr { #define SRI_MASTER (1<<0) #define SRI_SLAVE (1<<1) #define SRI_SENTINEL (1<<2) -#define SRI_DISCONNECTED (1<<3) -#define SRI_S_DOWN (1<<4) /* Subjectively down (no quorum). */ -#define SRI_O_DOWN (1<<5) /* Objectively down (confirmed by others). */ -#define SRI_MASTER_DOWN (1<<6) /* A Sentinel with this flag set thinks that +#define SRI_S_DOWN (1<<3) /* Subjectively down (no quorum). */ +#define SRI_O_DOWN (1<<4) /* Objectively down (confirmed by others). */ +#define SRI_MASTER_DOWN (1<<5) /* A Sentinel with this flag set thinks that its master is down. */ -#define SRI_FAILOVER_IN_PROGRESS (1<<7) /* Failover is in progress for +#define SRI_FAILOVER_IN_PROGRESS (1<<6) /* Failover is in progress for this master. */ -#define SRI_PROMOTED (1<<8) /* Slave selected for promotion. */ -#define SRI_RECONF_SENT (1<<9) /* SLAVEOF <newmaster> sent. */ -#define SRI_RECONF_INPROG (1<<10) /* Slave synchronization in progress. */ -#define SRI_RECONF_DONE (1<<11) /* Slave synchronized with new master. */ -#define SRI_FORCE_FAILOVER (1<<12) /* Force failover with master up. */ -#define SRI_SCRIPT_KILL_SENT (1<<13) /* SCRIPT KILL already sent on -BUSY */ +#define SRI_PROMOTED (1<<7) /* Slave selected for promotion. */ +#define SRI_RECONF_SENT (1<<8) /* SLAVEOF <newmaster> sent. */ +#define SRI_RECONF_INPROG (1<<9) /* Slave synchronization in progress. */ +#define SRI_RECONF_DONE (1<<10) /* Slave synchronized with new master. */ +#define SRI_FORCE_FAILOVER (1<<11) /* Force failover with master up. */ +#define SRI_SCRIPT_KILL_SENT (1<<12) /* SCRIPT KILL already sent on -BUSY */ /* Note: times are in milliseconds. */ #define SENTINEL_INFO_PERIOD 10000 @@ -115,27 +114,59 @@ typedef struct sentinelAddr { #define SENTINEL_SCRIPT_MAX_RETRY 10 #define SENTINEL_SCRIPT_RETRY_DELAY 30000 /* 30 seconds between retries. */ -typedef struct sentinelRedisInstance { - int flags; /* See SRI_... defines */ - char *name; /* Master name from the point of view of this sentinel. */ - char *runid; /* run ID of this instance. */ - uint64_t config_epoch; /* Configuration epoch. */ - sentinelAddr *addr; /* Master host. */ +/* SENTINEL SIMULATE-FAILURE command flags. */ +#define SENTINEL_SIMFAILURE_NONE 0 +#define SENTINEL_SIMFAILURE_CRASH_AFTER_ELECTION (1<<0) +#define SENTINEL_SIMFAILURE_CRASH_AFTER_PROMOTION (1<<1) + +/* The link to a sentinelRedisInstance. When we have the same set of Sentinels + * monitoring many masters, we have different instances representing the + * same Sentinels, one per master, and we need to share the hiredis connections + * among them. Oherwise if 5 Sentinels are monitoring 100 masters we create + * 500 outgoing connections instead of 5. + * + * So this structure represents a reference counted link in terms of the two + * hiredis connections for commands and Pub/Sub, and the fields needed for + * failure detection, since the ping/pong time are now local to the link: if + * the link is available, the instance is avaialbe. This way we don't just + * have 5 connections instead of 500, we also send 5 pings instead of 500. + * + * Links are shared only for Sentinels: master and slave instances have + * a link with refcount = 1, always. */ +typedef struct instanceLink { + int refcount; /* Number of sentinelRedisInstance owners. */ + int disconnected; /* Non-zero if we need to reconnect cc or pc. */ + int pending_commands; /* Number of commands sent waiting for a reply. */ redisAsyncContext *cc; /* Hiredis context for commands. */ redisAsyncContext *pc; /* Hiredis context for Pub / Sub. */ - int pending_commands; /* Number of commands sent waiting for a reply. */ mstime_t cc_conn_time; /* cc connection time. */ mstime_t pc_conn_time; /* pc connection time. */ mstime_t pc_last_activity; /* Last time we received any message. */ mstime_t last_avail_time; /* Last time the instance replied to ping with a reply we consider valid. */ - mstime_t last_ping_time; /* Last time a pending ping was sent in the - context of the current command connection - with the instance. 0 if still not sent or - if pong already received. */ + mstime_t act_ping_time; /* Time at which the last pending ping (no pong + received after it) was sent. This field is + set to 0 when a pong is received, and set again + to the current time if the value is 0 and a new + ping is sent. */ + mstime_t last_ping_time; /* Time at which we sent the last ping. This is + only used to avoid sending too many pings + during failure. Idle time is computed using + the act_ping_time field. */ mstime_t last_pong_time; /* Last time the instance replied to ping, whatever the reply was. That's used to check if the link is idle and must be reconnected. */ + mstime_t last_reconn_time; /* Last reconnection attempt performed when + the link was down. */ +} instanceLink; + +typedef struct sentinelRedisInstance { + int flags; /* See SRI_... defines */ + char *name; /* Master name from the point of view of this sentinel. */ + char *runid; /* Run ID of this instance, or unique ID if is a Sentinel.*/ + uint64_t config_epoch; /* Configuration epoch. */ + sentinelAddr *addr; /* Master host. */ + instanceLink *link; /* Link to the instance, may be shared for Sentinels. */ mstime_t last_pub_time; /* Last time we sent hello via Pub/Sub. */ mstime_t last_hello_time; /* Only used if SRI_SENTINEL is set. Last time we received a hello from this Sentinel @@ -159,7 +190,7 @@ typedef struct sentinelRedisInstance { /* Master specific. */ dict *sentinels; /* Other sentinels monitoring the same master. */ dict *slaves; /* Slaves for this master instance. */ - int quorum; /* Number of sentinels that need to agree on failure. */ + unsigned int quorum;/* Number of sentinels that need to agree on failure. */ int parallel_syncs; /* How many slaves to reconfigure at same time. */ char *auth_pass; /* Password to use for AUTH against master & slaves. */ @@ -190,19 +221,26 @@ typedef struct sentinelRedisInstance { * are set to NULL no script is executed. */ char *notification_script; char *client_reconfig_script; + sds info; /* cached INFO output */ } sentinelRedisInstance; /* Main state. */ struct sentinelState { - uint64_t current_epoch; /* Current epoch. */ + char myid[CONFIG_RUN_ID_SIZE+1]; /* This sentinel ID. */ + uint64_t current_epoch; /* Current epoch. */ dict *masters; /* Dictionary of master sentinelRedisInstances. Key is the instance name, value is the sentinelRedisInstance structure pointer. */ int tilt; /* Are we in TILT mode? */ int running_scripts; /* Number of scripts in execution right now. */ - mstime_t tilt_start_time; /* When TITL started. */ - mstime_t previous_time; /* Last time we ran the time handler. */ - list *scripts_queue; /* Queue of user scripts to execute. */ + mstime_t tilt_start_time; /* When TITL started. */ + mstime_t previous_time; /* Last time we ran the time handler. */ + list *scripts_queue; /* Queue of user scripts to execute. */ + char *announce_ip; /* IP addr that is gossiped to other sentinels if + not NULL. */ + int announce_port; /* Port that is gossiped to other sentinels if + non zero. */ + unsigned long simfailure_flags; /* Failures simulation. */ } sentinel; /* A script execution job. */ @@ -293,7 +331,7 @@ static int redisAeAttach(aeEventLoop *loop, redisAsyncContext *ac) { /* Nothing should be attached when something is already attached */ if (ac->ev.data != NULL) - return REDIS_ERR; + return C_ERR; /* Create container for context and r/w events */ e = (redisAeEvents*)zmalloc(sizeof(*e)); @@ -310,7 +348,7 @@ static int redisAeAttach(aeEventLoop *loop, redisAsyncContext *ac) { ac->ev.cleanup = redisAeCleanup; ac->ev.data = e; - return REDIS_OK; + return C_OK; } /* ============================= Prototypes ================================= */ @@ -322,8 +360,7 @@ sentinelRedisInstance *sentinelGetMasterByName(char *name); char *sentinelGetSubjectiveLeader(sentinelRedisInstance *master); char *sentinelGetObjectiveLeader(sentinelRedisInstance *master); int yesnotoi(char *s); -void sentinelDisconnectInstanceFromContext(const redisAsyncContext *c); -void sentinelKillLink(sentinelRedisInstance *ri, redisAsyncContext *c); +void instanceLinkConnectionError(const redisAsyncContext *c); const char *sentinelRedisInstanceTypeStr(sentinelRedisInstance *ri); void sentinelAbortFailover(sentinelRedisInstance *ri); void sentinelEvent(int level, char *type, sentinelRedisInstance *ri, const char *fmt, ...); @@ -337,14 +374,17 @@ void sentinelFlushConfig(void); void sentinelGenerateInitialMonitorEvents(void); int sentinelSendPing(sentinelRedisInstance *ri); int sentinelForceHelloUpdateForMaster(sentinelRedisInstance *master); +sentinelRedisInstance *getSentinelRedisInstanceByAddrAndRunID(dict *instances, char *ip, int port, char *runid); +void sentinelSimFailureCrash(void); /* ========================= Dictionary types =============================== */ -unsigned int dictSdsHash(const void *key); +uint64_t dictSdsHash(const void *key); int dictSdsKeyCompare(void *privdata, const void *key1, const void *key2); void releaseSentinelRedisInstance(sentinelRedisInstance *ri); void dictInstancesValDestructor (void *privdata, void *obj) { + UNUSED(privdata); releaseSentinelRedisInstance(obj); } @@ -376,11 +416,11 @@ dictType leaderVotesDictType = { /* =========================== Initialization =============================== */ -void sentinelCommand(redisClient *c); -void sentinelInfoCommand(redisClient *c); -void sentinelSetCommand(redisClient *c); -void sentinelPublishCommand(redisClient *c); -void sentinelRoleCommand(redisClient *c); +void sentinelCommand(client *c); +void sentinelInfoCommand(client *c); +void sentinelSetCommand(client *c); +void sentinelPublishCommand(client *c); +void sentinelRoleCommand(client *c); struct redisCommand sentinelcmds[] = { {"ping",pingCommand,1,"",0,NULL,0,0,0,0,0}, @@ -392,6 +432,7 @@ struct redisCommand sentinelcmds[] = { {"publish",sentinelPublishCommand,3,"",0,NULL,0,0,0,0,0}, {"info",sentinelInfoCommand,-1,"",0,NULL,0,0,0,0,0}, {"role",sentinelRoleCommand,1,"l",0,NULL,0,0,0,0,0}, + {"client",clientCommand,-2,"rs",0,NULL,0,0,0,0,0}, {"shutdown",shutdownCommand,-1,"",0,NULL,0,0,0,0,0} }; @@ -403,7 +444,7 @@ void initSentinelConfig(void) { /* Perform the Sentinel mode initialization. */ void initSentinel(void) { - int j; + unsigned int j; /* Remove usual Redis commands from the command table, then just add * the SENTINEL command. */ @@ -413,7 +454,7 @@ void initSentinel(void) { struct redisCommand *cmd = sentinelcmds+j; retval = dictAdd(server.commands, sdsnew(cmd->name), cmd); - redisAssert(retval == DICT_OK); + serverAssert(retval == DICT_OK); } /* Initialize various data structures. */ @@ -424,24 +465,43 @@ void initSentinel(void) { sentinel.previous_time = mstime(); sentinel.running_scripts = 0; sentinel.scripts_queue = listCreate(); + sentinel.announce_ip = NULL; + sentinel.announce_port = 0; + sentinel.simfailure_flags = SENTINEL_SIMFAILURE_NONE; + memset(sentinel.myid,0,sizeof(sentinel.myid)); } /* This function gets called when the server is in Sentinel mode, started, * loaded the configuration, and is ready for normal operations. */ void sentinelIsRunning(void) { - redisLog(REDIS_WARNING,"Sentinel runid is %s", server.runid); + int j; if (server.configfile == NULL) { - redisLog(REDIS_WARNING, + serverLog(LL_WARNING, "Sentinel started without a config file. Exiting..."); exit(1); } else if (access(server.configfile,W_OK) == -1) { - redisLog(REDIS_WARNING, + serverLog(LL_WARNING, "Sentinel config file %s is not writable: %s. Exiting...", server.configfile,strerror(errno)); exit(1); } + /* If this Sentinel has yet no ID set in the configuration file, we + * pick a random one and persist the config on disk. From now on this + * will be this Sentinel ID across restarts. */ + for (j = 0; j < CONFIG_RUN_ID_SIZE; j++) + if (sentinel.myid[j] != 0) break; + + if (j == CONFIG_RUN_ID_SIZE) { + /* Pick ID and presist the config. */ + getRandomHexChars(sentinel.myid,CONFIG_RUN_ID_SIZE); + sentinelFlushConfig(); + } + + /* Log its ID to make debugging of issues simpler. */ + serverLog(LL_WARNING,"Sentinel ID is %s", sentinel.myid); + /* We want to generate a +monitor event for every configured master * at startup. */ sentinelGenerateInitialMonitorEvents(); @@ -455,19 +515,19 @@ void sentinelIsRunning(void) { * EINVAL: Invalid port number. */ sentinelAddr *createSentinelAddr(char *hostname, int port) { - char buf[32]; + char ip[NET_IP_STR_LEN]; sentinelAddr *sa; - if (port <= 0 || port > 65535) { + if (port < 0 || port > 65535) { errno = EINVAL; return NULL; } - if (anetResolve(NULL,hostname,buf,sizeof(buf)) == ANET_ERR) { + if (anetResolve(NULL,hostname,ip,sizeof(ip)) == ANET_ERR) { errno = ENOENT; return NULL; } sa = zmalloc(sizeof(*sa)); - sa->ip = sdsnew(buf); + sa->ip = sdsnew(ip); sa->port = port; return sa; } @@ -497,7 +557,7 @@ int sentinelAddrIsEqual(sentinelAddr *a, sentinelAddr *b) { /* Send an event to log, pub/sub, user notification script. * - * 'level' is the log level for logging. Only REDIS_WARNING events will trigger + * 'level' is the log level for logging. Only LL_WARNING events will trigger * the execution of the user notification script. * * 'type' is the message type, also used as a pub/sub channel name. @@ -522,7 +582,7 @@ int sentinelAddrIsEqual(sentinelAddr *a, sentinelAddr *b) { void sentinelEvent(int level, char *type, sentinelRedisInstance *ri, const char *fmt, ...) { va_list ap; - char msg[REDIS_MAX_LOGMSG_LEN]; + char msg[LOG_MAX_LEN]; robj *channel, *payload; /* Handle %@ */ @@ -554,10 +614,10 @@ void sentinelEvent(int level, char *type, sentinelRedisInstance *ri, /* Log the message if the log level allows it to be logged. */ if (level >= server.verbosity) - redisLog(level,"%s %s",type,msg); + serverLog(level,"%s %s",type,msg); /* Publish the message via Pub/Sub if it's not a debugging one. */ - if (level != REDIS_DEBUG) { + if (level != LL_DEBUG) { channel = createStringObject(type,strlen(type)); payload = createStringObject(msg,strlen(msg)); pubsubPublishMessage(channel,payload); @@ -566,10 +626,10 @@ void sentinelEvent(int level, char *type, sentinelRedisInstance *ri, } /* Call the notification script if applicable. */ - if (level == REDIS_WARNING && ri != NULL) { + if (level == LL_WARNING && ri != NULL) { sentinelRedisInstance *master = (ri->flags & SRI_MASTER) ? ri : ri->master; - if (master->notification_script) { + if (master && master->notification_script) { sentinelScheduleScriptExecution(master->notification_script, type,msg,NULL); } @@ -587,7 +647,7 @@ void sentinelGenerateInitialMonitorEvents(void) { di = dictGetIterator(sentinel.masters); while((de = dictNext(di)) != NULL) { sentinelRedisInstance *ri = dictGetVal(de); - sentinelEvent(REDIS_WARNING,"+monitor",ri,"%@ quorum %d",ri->quorum); + sentinelEvent(LL_WARNING,"+monitor",ri,"%@ quorum %d",ri->quorum); } dictReleaseIterator(di); } @@ -645,7 +705,7 @@ void sentinelScheduleScriptExecution(char *path, ...) { sentinelReleaseScriptJob(sj); break; } - redisAssert(listLength(sentinel.scripts_queue) <= + serverAssert(listLength(sentinel.scripts_queue) <= SENTINEL_SCRIPT_MAX_QUEUE); } } @@ -697,7 +757,7 @@ void sentinelRunPendingScripts(void) { /* Parent (fork error). * We report fork errors as signal 99, in order to unify the * reporting with other kind of errors. */ - sentinelEvent(REDIS_WARNING,"-script-error",NULL, + sentinelEvent(LL_WARNING,"-script-error",NULL, "%s %d %d", sj->argv[0], 99, 0); sj->flags &= ~SENTINEL_SCRIPT_RUNNING; sj->pid = 0; @@ -709,7 +769,7 @@ void sentinelRunPendingScripts(void) { } else { sentinel.running_scripts++; sj->pid = pid; - sentinelEvent(REDIS_DEBUG,"+script-child",NULL,"%ld",(long)pid); + sentinelEvent(LL_DEBUG,"+script-child",NULL,"%ld",(long)pid); } } } @@ -743,12 +803,12 @@ void sentinelCollectTerminatedScripts(void) { sentinelScriptJob *sj; if (WIFSIGNALED(statloc)) bysignal = WTERMSIG(statloc); - sentinelEvent(REDIS_DEBUG,"-script-child",NULL,"%ld %d %d", + sentinelEvent(LL_DEBUG,"-script-child",NULL,"%ld %d %d", (long)pid, exitcode, bysignal); ln = sentinelGetScriptListNodeByPid(pid); if (ln == NULL) { - redisLog(REDIS_WARNING,"wait3() returned a pid (%ld) we can't find in our scripts execution queue!", (long)pid); + serverLog(LL_WARNING,"wait3() returned a pid (%ld) we can't find in our scripts execution queue!", (long)pid); continue; } sj = ln->value; @@ -767,7 +827,7 @@ void sentinelCollectTerminatedScripts(void) { /* Otherwise let's remove the script, but log the event if the * execution did not terminated in the best of the ways. */ if (bysignal || exitcode != 0) { - sentinelEvent(REDIS_WARNING,"-script-error",NULL, + sentinelEvent(LL_WARNING,"-script-error",NULL, "%s %d %d", sj->argv[0], bysignal, exitcode); } listDelNode(sentinel.scripts_queue,ln); @@ -791,7 +851,7 @@ void sentinelKillTimedoutScripts(void) { if (sj->flags & SENTINEL_SCRIPT_RUNNING && (now - sj->start_time) > SENTINEL_SCRIPT_MAX_RUNTIME) { - sentinelEvent(REDIS_WARNING,"-script-timeout",NULL,"%s %ld", + sentinelEvent(LL_WARNING,"-script-timeout",NULL,"%s %ld", sj->argv[0], (long)sj->pid); kill(sj->pid,SIGKILL); } @@ -799,7 +859,7 @@ void sentinelKillTimedoutScripts(void) { } /* Implements SENTINEL PENDING-SCRIPTS command. */ -void sentinelPendingScriptsCommand(redisClient *c) { +void sentinelPendingScriptsCommand(client *c) { listNode *ln; listIter li; @@ -863,6 +923,201 @@ void sentinelCallClientReconfScript(sentinelRedisInstance *master, int role, cha state, from->ip, fromport, to->ip, toport, NULL); } +/* =============================== instanceLink ============================= */ + +/* Create a not yet connected link object. */ +instanceLink *createInstanceLink(void) { + instanceLink *link = zmalloc(sizeof(*link)); + + link->refcount = 1; + link->disconnected = 1; + link->pending_commands = 0; + link->cc = NULL; + link->pc = NULL; + link->cc_conn_time = 0; + link->pc_conn_time = 0; + link->last_reconn_time = 0; + link->pc_last_activity = 0; + /* We set the act_ping_time to "now" even if we actually don't have yet + * a connection with the node, nor we sent a ping. + * This is useful to detect a timeout in case we'll not be able to connect + * with the node at all. */ + link->act_ping_time = mstime(); + link->last_ping_time = 0; + link->last_avail_time = mstime(); + link->last_pong_time = mstime(); + return link; +} + +/* Disconnect an hiredis connection in the context of an instance link. */ +void instanceLinkCloseConnection(instanceLink *link, redisAsyncContext *c) { + if (c == NULL) return; + + if (link->cc == c) { + link->cc = NULL; + link->pending_commands = 0; + } + if (link->pc == c) link->pc = NULL; + c->data = NULL; + link->disconnected = 1; + redisAsyncFree(c); +} + +/* Decrement the refcount of a link object, if it drops to zero, actually + * free it and return NULL. Otherwise don't do anything and return the pointer + * to the object. + * + * If we are not going to free the link and ri is not NULL, we rebind all the + * pending requests in link->cc (hiredis connection for commands) to a + * callback that will just ignore them. This is useful to avoid processing + * replies for an instance that no longer exists. */ +instanceLink *releaseInstanceLink(instanceLink *link, sentinelRedisInstance *ri) +{ + serverAssert(link->refcount > 0); + link->refcount--; + if (link->refcount != 0) { + if (ri && ri->link->cc) { + /* This instance may have pending callbacks in the hiredis async + * context, having as 'privdata' the instance that we are going to + * free. Let's rewrite the callback list, directly exploiting + * hiredis internal data structures, in order to bind them with + * a callback that will ignore the reply at all. */ + redisCallback *cb; + redisCallbackList *callbacks = &link->cc->replies; + + cb = callbacks->head; + while(cb) { + if (cb->privdata == ri) { + cb->fn = sentinelDiscardReplyCallback; + cb->privdata = NULL; /* Not strictly needed. */ + } + cb = cb->next; + } + } + return link; /* Other active users. */ + } + + instanceLinkCloseConnection(link,link->cc); + instanceLinkCloseConnection(link,link->pc); + zfree(link); + return NULL; +} + +/* This function will attempt to share the instance link we already have + * for the same Sentinel in the context of a different master, with the + * instance we are passing as argument. + * + * This way multiple Sentinel objects that refer all to the same physical + * Sentinel instance but in the context of different masters will use + * a single connection, will send a single PING per second for failure + * detection and so forth. + * + * Return C_OK if a matching Sentinel was found in the context of a + * different master and sharing was performed. Otherwise C_ERR + * is returned. */ +int sentinelTryConnectionSharing(sentinelRedisInstance *ri) { + serverAssert(ri->flags & SRI_SENTINEL); + dictIterator *di; + dictEntry *de; + + if (ri->runid == NULL) return C_ERR; /* No way to identify it. */ + if (ri->link->refcount > 1) return C_ERR; /* Already shared. */ + + di = dictGetIterator(sentinel.masters); + while((de = dictNext(di)) != NULL) { + sentinelRedisInstance *master = dictGetVal(de), *match; + /* We want to share with the same physical Sentinel referenced + * in other masters, so skip our master. */ + if (master == ri->master) continue; + match = getSentinelRedisInstanceByAddrAndRunID(master->sentinels, + NULL,0,ri->runid); + if (match == NULL) continue; /* No match. */ + if (match == ri) continue; /* Should never happen but... safer. */ + + /* We identified a matching Sentinel, great! Let's free our link + * and use the one of the matching Sentinel. */ + releaseInstanceLink(ri->link,NULL); + ri->link = match->link; + match->link->refcount++; + return C_OK; + } + dictReleaseIterator(di); + return C_ERR; +} + +/* When we detect a Sentinel to switch address (reporting a different IP/port + * pair in Hello messages), let's update all the matching Sentinels in the + * context of other masters as well and disconnect the links, so that everybody + * will be updated. + * + * Return the number of updated Sentinel addresses. */ +int sentinelUpdateSentinelAddressInAllMasters(sentinelRedisInstance *ri) { + serverAssert(ri->flags & SRI_SENTINEL); + dictIterator *di; + dictEntry *de; + int reconfigured = 0; + + di = dictGetIterator(sentinel.masters); + while((de = dictNext(di)) != NULL) { + sentinelRedisInstance *master = dictGetVal(de), *match; + match = getSentinelRedisInstanceByAddrAndRunID(master->sentinels, + NULL,0,ri->runid); + /* If there is no match, this master does not know about this + * Sentinel, try with the next one. */ + if (match == NULL) continue; + + /* Disconnect the old links if connected. */ + if (match->link->cc != NULL) + instanceLinkCloseConnection(match->link,match->link->cc); + if (match->link->pc != NULL) + instanceLinkCloseConnection(match->link,match->link->pc); + + if (match == ri) continue; /* Address already updated for it. */ + + /* Update the address of the matching Sentinel by copying the address + * of the Sentinel object that received the address update. */ + releaseSentinelAddr(match->addr); + match->addr = dupSentinelAddr(ri->addr); + reconfigured++; + } + dictReleaseIterator(di); + if (reconfigured) + sentinelEvent(LL_NOTICE,"+sentinel-address-update", ri, + "%@ %d additional matching instances", reconfigured); + return reconfigured; +} + +/* This function is called when an hiredis connection reported an error. + * We set it to NULL and mark the link as disconnected so that it will be + * reconnected again. + * + * Note: we don't free the hiredis context as hiredis will do it for us + * for async connections. */ +void instanceLinkConnectionError(const redisAsyncContext *c) { + instanceLink *link = c->data; + int pubsub; + + if (!link) return; + + pubsub = (link->pc == c); + if (pubsub) + link->pc = NULL; + else + link->cc = NULL; + link->disconnected = 1; +} + +/* Hiredis connection established / disconnected callbacks. We need them + * just to cleanup our link state. */ +void sentinelLinkEstablishedCallback(const redisAsyncContext *c, int status) { + if (status != C_OK) instanceLinkConnectionError(c); +} + +void sentinelDisconnectCallback(const redisAsyncContext *c, int status) { + UNUSED(status); + instanceLinkConnectionError(c); +} + /* ========================== sentinelRedisInstance ========================= */ /* Create a redis instance, the following fields must be populated by the @@ -884,25 +1139,25 @@ void sentinelCallClientReconfScript(sentinelRedisInstance *master, int role, cha * createSentinelAddr() function. * * The function may also fail and return NULL with errno set to EBUSY if - * a master or slave with the same name already exists. */ + * a master with the same name, a slave with the same address, or a sentinel + * with the same ID already exists. */ + sentinelRedisInstance *createSentinelRedisInstance(char *name, int flags, char *hostname, int port, int quorum, sentinelRedisInstance *master) { sentinelRedisInstance *ri; sentinelAddr *addr; dict *table = NULL; - char slavename[128], *sdsname; + char slavename[NET_PEER_ID_LEN], *sdsname; - redisAssert(flags & (SRI_MASTER|SRI_SLAVE|SRI_SENTINEL)); - redisAssert((flags & SRI_MASTER) || master != NULL); + serverAssert(flags & (SRI_MASTER|SRI_SLAVE|SRI_SENTINEL)); + serverAssert((flags & SRI_MASTER) || master != NULL); /* Check address validity. */ addr = createSentinelAddr(hostname,port); if (addr == NULL) return NULL; - /* For slaves and sentinel we use ip:port as name. */ - if (flags & (SRI_SLAVE|SRI_SENTINEL)) { - snprintf(slavename,sizeof(slavename), - strchr(hostname,':') ? "[%s]:%d" : "%s:%d", - hostname,port); + /* For slaves use ip:port as name. */ + if (flags & SRI_SLAVE) { + anetFormatAddr(slavename, sizeof(slavename), hostname, port); name = slavename; } @@ -915,6 +1170,7 @@ sentinelRedisInstance *createSentinelRedisInstance(char *name, int flags, char * else if (flags & SRI_SENTINEL) table = master->sentinels; sdsname = sdsnew(name); if (dictFind(table,sdsname)) { + releaseSentinelAddr(addr); sdsfree(sdsname); errno = EBUSY; return NULL; @@ -924,24 +1180,12 @@ sentinelRedisInstance *createSentinelRedisInstance(char *name, int flags, char * ri = zmalloc(sizeof(*ri)); /* Note that all the instances are started in the disconnected state, * the event loop will take care of connecting them. */ - ri->flags = flags | SRI_DISCONNECTED; + ri->flags = flags; ri->name = sdsname; ri->runid = NULL; ri->config_epoch = 0; ri->addr = addr; - ri->cc = NULL; - ri->pc = NULL; - ri->pending_commands = 0; - ri->cc_conn_time = 0; - ri->pc_conn_time = 0; - ri->pc_last_activity = 0; - /* We set the last_ping_time to "now" even if we actually don't have yet - * a connection with the node, nor we sent a ping. - * This is useful to detect a timeout in case we'll not be able to connect - * with the node at all. */ - ri->last_ping_time = mstime(); - ri->last_avail_time = mstime(); - ri->last_pong_time = mstime(); + ri->link = createInstanceLink(); ri->last_pub_time = mstime(); ri->last_hello_time = mstime(); ri->last_master_down_reply_time = mstime(); @@ -976,6 +1220,7 @@ sentinelRedisInstance *createSentinelRedisInstance(char *name, int flags, char * ri->promoted_slave = NULL; ri->notification_script = NULL; ri->client_reconfig_script = NULL; + ri->info = NULL; /* Role */ ri->role_reported = ri->flags & (SRI_MASTER|SRI_SLAVE); @@ -996,9 +1241,8 @@ void releaseSentinelRedisInstance(sentinelRedisInstance *ri) { dictRelease(ri->sentinels); dictRelease(ri->slaves); - /* Release hiredis connections. */ - if (ri->cc) sentinelKillLink(ri,ri->cc); - if (ri->pc) sentinelKillLink(ri,ri->pc); + /* Disconnect the instance. */ + releaseInstanceLink(ri->link,ri); /* Free other resources. */ sdsfree(ri->name); @@ -1008,6 +1252,7 @@ void releaseSentinelRedisInstance(sentinelRedisInstance *ri) { sdsfree(ri->slave_master_host); sdsfree(ri->leader); sdsfree(ri->auth_pass); + sdsfree(ri->info); releaseSentinelAddr(ri->addr); /* Clear state into the master if needed. */ @@ -1023,11 +1268,11 @@ sentinelRedisInstance *sentinelRedisInstanceLookupSlave( { sds key; sentinelRedisInstance *slave; + char buf[NET_PEER_ID_LEN]; - redisAssert(ri->flags & SRI_MASTER); - key = sdscatprintf(sdsempty(), - strchr(ip,':') ? "[%s]:%d" : "%s:%d", - ip,port); + serverAssert(ri->flags & SRI_MASTER); + anetFormatAddr(buf,sizeof(buf),ip,port); + key = sdsnew(buf); slave = dictFetchValue(ri->slaves,key); sdsfree(key); return slave; @@ -1041,35 +1286,29 @@ const char *sentinelRedisInstanceTypeStr(sentinelRedisInstance *ri) { else return "unknown"; } -/* This function removes all the instances found in the dictionary of - * sentinels in the specified 'master', having either: +/* This function remove the Sentinel with the specified ID from the + * specified master. * - * 1) The same ip/port as specified. - * 2) The same runid. + * If "runid" is NULL the function returns ASAP. * - * "1" and "2" don't need to verify at the same time, just one is enough. - * If "runid" is NULL it is not checked. - * Similarly if "ip" is NULL it is not checked. + * This function is useful because on Sentinels address switch, we want to + * remove our old entry and add a new one for the same ID but with the new + * address. * - * This function is useful because every time we add a new Sentinel into - * a master's Sentinels dictionary, we want to be very sure about not - * having duplicated instances for any reason. This is important because - * other sentinels are needed to reach ODOWN quorum, and later to get - * voted for a given configuration epoch in order to perform the failover. - * - * The function returns the number of Sentinels removed. */ -int removeMatchingSentinelsFromMaster(sentinelRedisInstance *master, char *ip, int port, char *runid) { + * The function returns 1 if the matching Sentinel was removed, otherwise + * 0 if there was no Sentinel with this ID. */ +int removeMatchingSentinelFromMaster(sentinelRedisInstance *master, char *runid) { dictIterator *di; dictEntry *de; int removed = 0; + if (runid == NULL) return 0; + di = dictGetSafeIterator(master->sentinels); while((de = dictNext(di)) != NULL) { sentinelRedisInstance *ri = dictGetVal(de); - if ((ri->runid && runid && strcmp(ri->runid,runid) == 0) || - (ip && strcmp(ri->addr->ip,ip) == 0 && port == ri->addr->port)) - { + if (ri->runid && strcmp(ri->runid,runid) == 0) { dictDelete(master->sentinels,ri->name); removed++; } @@ -1089,7 +1328,7 @@ sentinelRedisInstance *getSentinelRedisInstanceByAddrAndRunID(dict *instances, c dictEntry *de; sentinelRedisInstance *instance = NULL; - redisAssert(ip || runid); /* User must pass at least one search param. */ + serverAssert(ip || runid); /* User must pass at least one search param. */ di = dictGetIterator(instances); while((de = dictNext(di)) != NULL) { sentinelRedisInstance *ri = dictGetVal(de); @@ -1148,42 +1387,45 @@ void sentinelDelFlagsToDictOfRedisInstances(dict *instances, int flags) { * 1) Remove all slaves. * 2) Remove all sentinels. * 3) Remove most of the flags resulting from runtime operations. - * 4) Reset timers to their default value. + * 4) Reset timers to their default value. For example after a reset it will be + * possible to failover again the same master ASAP, without waiting the + * failover timeout delay. * 5) In the process of doing this undo the failover if in progress. * 6) Disconnect the connections with the master (will reconnect automatically). */ #define SENTINEL_RESET_NO_SENTINELS (1<<0) void sentinelResetMaster(sentinelRedisInstance *ri, int flags) { - redisAssert(ri->flags & SRI_MASTER); + serverAssert(ri->flags & SRI_MASTER); dictRelease(ri->slaves); ri->slaves = dictCreate(&instancesDictType,NULL); if (!(flags & SENTINEL_RESET_NO_SENTINELS)) { dictRelease(ri->sentinels); ri->sentinels = dictCreate(&instancesDictType,NULL); } - if (ri->cc) sentinelKillLink(ri,ri->cc); - if (ri->pc) sentinelKillLink(ri,ri->pc); - ri->flags &= SRI_MASTER|SRI_DISCONNECTED; + instanceLinkCloseConnection(ri->link,ri->link->cc); + instanceLinkCloseConnection(ri->link,ri->link->pc); + ri->flags &= SRI_MASTER; if (ri->leader) { sdsfree(ri->leader); ri->leader = NULL; } ri->failover_state = SENTINEL_FAILOVER_STATE_NONE; ri->failover_state_change_time = 0; - ri->failover_start_time = 0; + ri->failover_start_time = 0; /* We can failover again ASAP. */ ri->promoted_slave = NULL; sdsfree(ri->runid); sdsfree(ri->slave_master_host); ri->runid = NULL; ri->slave_master_host = NULL; - ri->last_ping_time = mstime(); - ri->last_avail_time = mstime(); - ri->last_pong_time = mstime(); + ri->link->act_ping_time = mstime(); + ri->link->last_ping_time = 0; + ri->link->last_avail_time = mstime(); + ri->link->last_pong_time = mstime(); ri->role_reported_time = mstime(); ri->role_reported = SRI_MASTER; if (flags & SENTINEL_GENERATE_EVENT) - sentinelEvent(REDIS_WARNING,"+reset-master",ri,"%@"); + sentinelEvent(LL_WARNING,"+reset-master",ri,"%@"); } /* Call sentinelResetMaster() on every master with a name matching the specified @@ -1213,8 +1455,8 @@ int sentinelResetMastersByPattern(char *pattern, int flags) { * * This is used to handle the +switch-master event. * - * The function returns REDIS_ERR if the address can't be resolved for some - * reason. Otherwise REDIS_OK is returned. */ + * The function returns C_ERR if the address can't be resolved for some + * reason. Otherwise C_OK is returned. */ int sentinelResetMasterAndChangeAddress(sentinelRedisInstance *master, char *ip, int port) { sentinelAddr *oldaddr, *newaddr; sentinelAddr **slaves = NULL; @@ -1223,7 +1465,7 @@ int sentinelResetMasterAndChangeAddress(sentinelRedisInstance *master, char *ip, dictEntry *de; newaddr = createSentinelAddr(ip,port); - if (newaddr == NULL) return REDIS_ERR; + if (newaddr == NULL) return C_ERR; /* Make a list of slaves to add back after the reset. * Don't include the one having the address we are switching to. */ @@ -1261,10 +1503,7 @@ int sentinelResetMasterAndChangeAddress(sentinelRedisInstance *master, char *ip, slave = createSentinelRedisInstance(NULL,SRI_SLAVE,slaves[j]->ip, slaves[j]->port, master->quorum, master); releaseSentinelAddr(slaves[j]); - if (slave) { - sentinelEvent(REDIS_NOTICE,"+slave",slave,"%@"); - sentinelFlushConfig(); - } + if (slave) sentinelEvent(LL_NOTICE,"+slave",slave,"%@"); } zfree(slaves); @@ -1272,7 +1511,7 @@ int sentinelResetMasterAndChangeAddress(sentinelRedisInstance *master, char *ip, * gets the master->addr->ip and master->addr->port as arguments. */ releaseSentinelAddr(oldaddr); sentinelFlushConfig(); - return REDIS_OK; + return C_OK; } /* Return non-zero if there was no SDOWN or ODOWN error associated to this @@ -1322,6 +1561,13 @@ void sentinelPropagateDownAfterPeriod(sentinelRedisInstance *master) { } } +char *sentinelGetInstanceTypeString(sentinelRedisInstance *ri) { + if (ri->flags & SRI_MASTER) return "master"; + else if (ri->flags & SRI_SLAVE) return "slave"; + else if (ri->flags & SRI_SENTINEL) return "sentinel"; + else return "unknown"; +} + /* ============================ Config handling ============================= */ char *sentinelHandleConfiguration(char **argv, int argc) { sentinelRedisInstance *ri; @@ -1385,6 +1631,10 @@ char *sentinelHandleConfiguration(char **argv, int argc) { unsigned long long current_epoch = strtoull(argv[1],NULL,10); if (current_epoch > sentinel.current_epoch) sentinel.current_epoch = current_epoch; + } else if (!strcasecmp(argv[0],"myid") && argc == 2) { + if (strlen(argv[1]) != CONFIG_RUN_ID_SIZE) + return "Malformed Sentinel id in myid option."; + memcpy(sentinel.myid,argv[1],CONFIG_RUN_ID_SIZE); } else if (!strcasecmp(argv[0],"config-epoch") && argc == 3) { /* config-epoch <name> <epoch> */ ri = sentinelGetMasterByName(argv[1]); @@ -1415,15 +1665,25 @@ char *sentinelHandleConfiguration(char **argv, int argc) { (argc == 4 || argc == 5)) { sentinelRedisInstance *si; - /* known-sentinel <name> <ip> <port> [runid] */ - ri = sentinelGetMasterByName(argv[1]); - if (!ri) return "No such master with specified name."; - if ((si = createSentinelRedisInstance(NULL,SRI_SENTINEL,argv[2], - atoi(argv[3]), ri->quorum, ri)) == NULL) - { - return "Wrong hostname or port for sentinel."; + if (argc == 5) { /* Ignore the old form without runid. */ + /* known-sentinel <name> <ip> <port> [runid] */ + ri = sentinelGetMasterByName(argv[1]); + if (!ri) return "No such master with specified name."; + if ((si = createSentinelRedisInstance(argv[4],SRI_SENTINEL,argv[2], + atoi(argv[3]), ri->quorum, ri)) == NULL) + { + return "Wrong hostname or port for sentinel."; + } + si->runid = sdsnew(argv[4]); + sentinelTryConnectionSharing(si); } - if (argc == 5) si->runid = sdsnew(argv[4]); + } else if (!strcasecmp(argv[0],"announce-ip") && argc == 2) { + /* announce-ip <ip-address> */ + if (strlen(argv[1])) + sentinel.announce_ip = sdsnew(argv[1]); + } else if (!strcasecmp(argv[0],"announce-port") && argc == 2) { + /* announce-port <port> */ + sentinel.announce_port = atoi(argv[1]); } else { return "Unrecognized sentinel configuration statement."; } @@ -1440,6 +1700,10 @@ void rewriteConfigSentinelOption(struct rewriteConfigState *state) { dictEntry *de; sds line; + /* sentinel unique ID. */ + line = sdscatprintf(sdsempty(), "sentinel myid %s", sentinel.myid); + rewriteConfigRewriteLine(state,"sentinel",line,1); + /* For every master emit a "sentinel monitor" config entry. */ di = dictGetIterator(sentinel.masters); while((de = dictNext(di)) != NULL) { @@ -1531,7 +1795,7 @@ void rewriteConfigSentinelOption(struct rewriteConfigState *state) { slave_addr = master->addr; line = sdscatprintf(sdsempty(), "sentinel known-slave %s %s %d", - master->name, ri->addr->ip, ri->addr->port); + master->name, slave_addr->ip, slave_addr->port); rewriteConfigRewriteLine(state,"sentinel",line,1); } dictReleaseIterator(di2); @@ -1540,11 +1804,10 @@ void rewriteConfigSentinelOption(struct rewriteConfigState *state) { di2 = dictGetIterator(master->sentinels); while((de = dictNext(di2)) != NULL) { ri = dictGetVal(de); + if (ri->runid == NULL) continue; line = sdscatprintf(sdsempty(), - "sentinel known-sentinel %s %s %d%s%s", - master->name, ri->addr->ip, ri->addr->port, - ri->runid ? " " : "", - ri->runid ? ri->runid : ""); + "sentinel known-sentinel %s %s %d %s", + master->name, ri->addr->ip, ri->addr->port, ri->runid); rewriteConfigRewriteLine(state,"sentinel",line,1); } dictReleaseIterator(di2); @@ -1555,6 +1818,20 @@ void rewriteConfigSentinelOption(struct rewriteConfigState *state) { "sentinel current-epoch %llu", (unsigned long long) sentinel.current_epoch); rewriteConfigRewriteLine(state,"sentinel",line,1); + /* sentinel announce-ip. */ + if (sentinel.announce_ip) { + line = sdsnew("sentinel announce-ip "); + line = sdscatrepr(line, sentinel.announce_ip, sdslen(sentinel.announce_ip)); + rewriteConfigRewriteLine(state,"sentinel",line,1); + } + + /* sentinel announce-port. */ + if (sentinel.announce_port) { + line = sdscatprintf(sdsempty(),"sentinel announce-port %d", + sentinel.announce_port); + rewriteConfigRewriteLine(state,"sentinel",line,1); + } + dictReleaseIterator(di); } @@ -1570,7 +1847,7 @@ void sentinelFlushConfig(void) { int saved_hz = server.hz; int rewrite_status; - server.hz = REDIS_DEFAULT_HZ; + server.hz = CONFIG_DEFAULT_HZ; rewrite_status = rewriteConfig(server.configfile); server.hz = saved_hz; @@ -1582,61 +1859,11 @@ void sentinelFlushConfig(void) { werr: if (fd != -1) close(fd); - redisLog(REDIS_WARNING,"WARNING: Sentinel was not able to save the new configuration on disk!!!: %s", strerror(errno)); + serverLog(LL_WARNING,"WARNING: Sentinel was not able to save the new configuration on disk!!!: %s", strerror(errno)); } /* ====================== hiredis connection handling ======================= */ -/* Completely disconnect a hiredis link from an instance. */ -void sentinelKillLink(sentinelRedisInstance *ri, redisAsyncContext *c) { - if (ri->cc == c) { - ri->cc = NULL; - ri->pending_commands = 0; - } - if (ri->pc == c) ri->pc = NULL; - c->data = NULL; - ri->flags |= SRI_DISCONNECTED; - redisAsyncFree(c); -} - -/* This function takes a hiredis context that is in an error condition - * and make sure to mark the instance as disconnected performing the - * cleanup needed. - * - * Note: we don't free the hiredis context as hiredis will do it for us - * for async connections. */ -void sentinelDisconnectInstanceFromContext(const redisAsyncContext *c) { - sentinelRedisInstance *ri = c->data; - int pubsub; - - if (ri == NULL) return; /* The instance no longer exists. */ - - pubsub = (ri->pc == c); - sentinelEvent(REDIS_DEBUG, pubsub ? "-pubsub-link" : "-cmd-link", ri, - "%@ #%s", c->errstr); - if (pubsub) - ri->pc = NULL; - else - ri->cc = NULL; - ri->flags |= SRI_DISCONNECTED; -} - -void sentinelLinkEstablishedCallback(const redisAsyncContext *c, int status) { - if (status != REDIS_OK) { - sentinelDisconnectInstanceFromContext(c); - } else { - sentinelRedisInstance *ri = c->data; - int pubsub = (ri->pc == c); - - sentinelEvent(REDIS_DEBUG, pubsub ? "+pubsub-link" : "+cmd-link", ri, - "%@"); - } -} - -void sentinelDisconnectCallback(const redisAsyncContext *c, int status) { - sentinelDisconnectInstanceFromContext(c); -} - /* Send the AUTH command with the specified master password if needed. * Note that for slaves the password set for the master is used. * @@ -1648,8 +1875,8 @@ void sentinelSendAuthIfNeeded(sentinelRedisInstance *ri, redisAsyncContext *c) { ri->master->auth_pass; if (auth_pass) { - if (redisAsyncCommand(c, sentinelDiscardReplyCallback, NULL, "AUTH %s", - auth_pass) == REDIS_OK) ri->pending_commands++; + if (redisAsyncCommand(c, sentinelDiscardReplyCallback, ri, "AUTH %s", + auth_pass) == C_OK) ri->link->pending_commands++; } } @@ -1662,77 +1889,84 @@ void sentinelSendAuthIfNeeded(sentinelRedisInstance *ri, redisAsyncContext *c) { void sentinelSetClientName(sentinelRedisInstance *ri, redisAsyncContext *c, char *type) { char name[64]; - snprintf(name,sizeof(name),"sentinel-%.8s-%s",server.runid,type); - if (redisAsyncCommand(c, sentinelDiscardReplyCallback, NULL, - "CLIENT SETNAME %s", name) == REDIS_OK) + snprintf(name,sizeof(name),"sentinel-%.8s-%s",sentinel.myid,type); + if (redisAsyncCommand(c, sentinelDiscardReplyCallback, ri, + "CLIENT SETNAME %s", name) == C_OK) { - ri->pending_commands++; + ri->link->pending_commands++; } } -/* Create the async connections for the specified instance if the instance - * is disconnected. Note that the SRI_DISCONNECTED flag is set even if just +/* Create the async connections for the instance link if the link + * is disconnected. Note that link->disconnected is true even if just * one of the two links (commands and pub/sub) is missing. */ void sentinelReconnectInstance(sentinelRedisInstance *ri) { - if (!(ri->flags & SRI_DISCONNECTED)) return; + if (ri->link->disconnected == 0) return; + if (ri->addr->port == 0) return; /* port == 0 means invalid address. */ + instanceLink *link = ri->link; + mstime_t now = mstime(); + + if (now - ri->link->last_reconn_time < SENTINEL_PING_PERIOD) return; + ri->link->last_reconn_time = now; /* Commands connection. */ - if (ri->cc == NULL) { - ri->cc = redisAsyncConnectBind(ri->addr->ip,ri->addr->port,REDIS_BIND_ADDR); - if (ri->cc->err) { - sentinelEvent(REDIS_DEBUG,"-cmd-link-reconnection",ri,"%@ #%s", - ri->cc->errstr); - sentinelKillLink(ri,ri->cc); + if (link->cc == NULL) { + link->cc = redisAsyncConnectBind(ri->addr->ip,ri->addr->port,NET_FIRST_BIND_ADDR); + if (link->cc->err) { + sentinelEvent(LL_DEBUG,"-cmd-link-reconnection",ri,"%@ #%s", + link->cc->errstr); + instanceLinkCloseConnection(link,link->cc); } else { - ri->cc_conn_time = mstime(); - ri->cc->data = ri; - redisAeAttach(server.el,ri->cc); - redisAsyncSetConnectCallback(ri->cc, - sentinelLinkEstablishedCallback); - redisAsyncSetDisconnectCallback(ri->cc, - sentinelDisconnectCallback); - sentinelSendAuthIfNeeded(ri,ri->cc); - sentinelSetClientName(ri,ri->cc,"cmd"); + link->pending_commands = 0; + link->cc_conn_time = mstime(); + link->cc->data = link; + redisAeAttach(server.el,link->cc); + redisAsyncSetConnectCallback(link->cc, + sentinelLinkEstablishedCallback); + redisAsyncSetDisconnectCallback(link->cc, + sentinelDisconnectCallback); + sentinelSendAuthIfNeeded(ri,link->cc); + sentinelSetClientName(ri,link->cc,"cmd"); /* Send a PING ASAP when reconnecting. */ sentinelSendPing(ri); } } /* Pub / Sub */ - if ((ri->flags & (SRI_MASTER|SRI_SLAVE)) && ri->pc == NULL) { - ri->pc = redisAsyncConnectBind(ri->addr->ip,ri->addr->port,REDIS_BIND_ADDR); - if (ri->pc->err) { - sentinelEvent(REDIS_DEBUG,"-pubsub-link-reconnection",ri,"%@ #%s", - ri->pc->errstr); - sentinelKillLink(ri,ri->pc); + if ((ri->flags & (SRI_MASTER|SRI_SLAVE)) && link->pc == NULL) { + link->pc = redisAsyncConnectBind(ri->addr->ip,ri->addr->port,NET_FIRST_BIND_ADDR); + if (link->pc->err) { + sentinelEvent(LL_DEBUG,"-pubsub-link-reconnection",ri,"%@ #%s", + link->pc->errstr); + instanceLinkCloseConnection(link,link->pc); } else { int retval; - ri->pc_conn_time = mstime(); - ri->pc->data = ri; - redisAeAttach(server.el,ri->pc); - redisAsyncSetConnectCallback(ri->pc, - sentinelLinkEstablishedCallback); - redisAsyncSetDisconnectCallback(ri->pc, - sentinelDisconnectCallback); - sentinelSendAuthIfNeeded(ri,ri->pc); - sentinelSetClientName(ri,ri->pc,"pubsub"); + link->pc_conn_time = mstime(); + link->pc->data = link; + redisAeAttach(server.el,link->pc); + redisAsyncSetConnectCallback(link->pc, + sentinelLinkEstablishedCallback); + redisAsyncSetDisconnectCallback(link->pc, + sentinelDisconnectCallback); + sentinelSendAuthIfNeeded(ri,link->pc); + sentinelSetClientName(ri,link->pc,"pubsub"); /* Now we subscribe to the Sentinels "Hello" channel. */ - retval = redisAsyncCommand(ri->pc, - sentinelReceiveHelloMessages, NULL, "SUBSCRIBE %s", + retval = redisAsyncCommand(link->pc, + sentinelReceiveHelloMessages, ri, "SUBSCRIBE %s", SENTINEL_HELLO_CHANNEL); - if (retval != REDIS_OK) { + if (retval != C_OK) { /* If we can't subscribe, the Pub/Sub connection is useless * and we can simply disconnect it and try again. */ - sentinelKillLink(ri,ri->pc); + instanceLinkCloseConnection(link,link->pc); return; } } } - /* Clear the DISCONNECTED flags only if we have both the connections + /* Clear the disconnected status only if we have both the connections * (or just the commands connection if this is a sentinel instance). */ - if (ri->cc && (ri->flags & SRI_SENTINEL || ri->pc)) - ri->flags &= ~SRI_DISCONNECTED; + if (link->cc && (ri->flags & SRI_SENTINEL || link->pc)) + link->disconnected = 0; } /* ======================== Redis instances pinging ======================== */ @@ -1756,6 +1990,10 @@ void sentinelRefreshInstanceInfo(sentinelRedisInstance *ri, const char *info) { int numlines, j; int role = 0; + /* cache full INFO output for instance */ + sdsfree(ri->info); + ri->info = sdsnew(info); + /* The following fields must be reset to a given value in the case they * are not found at all in the INFO output. */ ri->master_link_down_time = 0; @@ -1772,7 +2010,7 @@ void sentinelRefreshInstanceInfo(sentinelRedisInstance *ri, const char *info) { ri->runid = sdsnewlen(l+7,40); } else { if (strncmp(ri->runid,l+7,40) != 0) { - sentinelEvent(REDIS_NOTICE,"+reboot",ri,"%@"); + sentinelEvent(LL_NOTICE,"+reboot",ri,"%@"); sdsfree(ri->runid); ri->runid = sdsnewlen(l+7,40); } @@ -1813,7 +2051,8 @@ void sentinelRefreshInstanceInfo(sentinelRedisInstance *ri, const char *info) { if ((slave = createSentinelRedisInstance(NULL,SRI_SLAVE,ip, atoi(port), ri->quorum, ri)) != NULL) { - sentinelEvent(REDIS_NOTICE,"+slave",slave,"%@"); + sentinelEvent(LL_NOTICE,"+slave",slave,"%@"); + sentinelFlushConfig(); } } } @@ -1882,7 +2121,7 @@ void sentinelRefreshInstanceInfo(sentinelRedisInstance *ri, const char *info) { if (role == SRI_SLAVE) ri->slave_conf_change_time = mstime(); /* Log the event with +role-change if the new role is coherent or * with -role-change if there is a mismatch with the current config. */ - sentinelEvent(REDIS_VERBOSE, + sentinelEvent(LL_VERBOSE, ((ri->flags & (SRI_MASTER|SRI_SLAVE)) == role) ? "+role-change" : "-role-change", ri, "%@ new reported role is %s", @@ -1919,8 +2158,11 @@ void sentinelRefreshInstanceInfo(sentinelRedisInstance *ri, const char *info) { ri->master->failover_state = SENTINEL_FAILOVER_STATE_RECONF_SLAVES; ri->master->failover_state_change_time = mstime(); sentinelFlushConfig(); - sentinelEvent(REDIS_WARNING,"+promoted-slave",ri,"%@"); - sentinelEvent(REDIS_WARNING,"+failover-state-reconf-slaves", + sentinelEvent(LL_WARNING,"+promoted-slave",ri,"%@"); + if (sentinel.simfailure_flags & + SENTINEL_SIMFAILURE_CRASH_AFTER_PROMOTION) + sentinelSimFailureCrash(); + sentinelEvent(LL_WARNING,"+failover-state-reconf-slaves", ri->master,"%@"); sentinelCallClientReconfScript(ri->master,SENTINEL_LEADER, "start",ri->master->addr,ri->addr); @@ -1939,8 +2181,8 @@ void sentinelRefreshInstanceInfo(sentinelRedisInstance *ri, const char *info) { int retval = sentinelSendSlaveOf(ri, ri->master->addr->ip, ri->master->addr->port); - if (retval == REDIS_OK) - sentinelEvent(REDIS_NOTICE,"+convert-to-slave",ri,"%@"); + if (retval == C_OK) + sentinelEvent(LL_NOTICE,"+convert-to-slave",ri,"%@"); } } } @@ -1962,8 +2204,8 @@ void sentinelRefreshInstanceInfo(sentinelRedisInstance *ri, const char *info) { int retval = sentinelSendSlaveOf(ri, ri->master->addr->ip, ri->master->addr->port); - if (retval == REDIS_OK) - sentinelEvent(REDIS_NOTICE,"+fix-slave-config",ri,"%@"); + if (retval == C_OK) + sentinelEvent(LL_NOTICE,"+fix-slave-config",ri,"%@"); } } @@ -1981,7 +2223,7 @@ void sentinelRefreshInstanceInfo(sentinelRedisInstance *ri, const char *info) { { ri->flags &= ~SRI_RECONF_SENT; ri->flags |= SRI_RECONF_INPROG; - sentinelEvent(REDIS_NOTICE,"+slave-reconf-inprog",ri,"%@"); + sentinelEvent(LL_NOTICE,"+slave-reconf-inprog",ri,"%@"); } /* SRI_RECONF_INPROG -> SRI_RECONF_DONE */ @@ -1990,38 +2232,41 @@ void sentinelRefreshInstanceInfo(sentinelRedisInstance *ri, const char *info) { { ri->flags &= ~SRI_RECONF_INPROG; ri->flags |= SRI_RECONF_DONE; - sentinelEvent(REDIS_NOTICE,"+slave-reconf-done",ri,"%@"); + sentinelEvent(LL_NOTICE,"+slave-reconf-done",ri,"%@"); } } } void sentinelInfoReplyCallback(redisAsyncContext *c, void *reply, void *privdata) { - sentinelRedisInstance *ri = c->data; + sentinelRedisInstance *ri = privdata; + instanceLink *link = c->data; redisReply *r; - if (ri) ri->pending_commands--; - if (!reply || !ri) return; + if (!reply || !link) return; + link->pending_commands--; r = reply; - if (r->type == REDIS_REPLY_STRING) { + if (r->type == REDIS_REPLY_STRING) sentinelRefreshInstanceInfo(ri,r->str); - } } /* Just discard the reply. We use this when we are not monitoring the return * value of the command but its effects directly. */ void sentinelDiscardReplyCallback(redisAsyncContext *c, void *reply, void *privdata) { - sentinelRedisInstance *ri = c->data; + instanceLink *link = c->data; + UNUSED(reply); + UNUSED(privdata); - if (ri) ri->pending_commands--; + if (link) link->pending_commands--; } void sentinelPingReplyCallback(redisAsyncContext *c, void *reply, void *privdata) { - sentinelRedisInstance *ri = c->data; + sentinelRedisInstance *ri = privdata; + instanceLink *link = c->data; redisReply *r; - if (ri) ri->pending_commands--; - if (!reply || !ri) return; + if (!reply || !link) return; + link->pending_commands--; r = reply; if (r->type == REDIS_REPLY_STATUS || @@ -2032,8 +2277,8 @@ void sentinelPingReplyCallback(redisAsyncContext *c, void *reply, void *privdata strncmp(r->str,"LOADING",7) == 0 || strncmp(r->str,"MASTERDOWN",10) == 0) { - ri->last_avail_time = mstime(); - ri->last_ping_time = 0; /* Flag the pong as received. */ + link->last_avail_time = mstime(); + link->act_ping_time = 0; /* Flag the pong as received. */ } else { /* Send a SCRIPT KILL command if the instance appears to be * down because of a busy script. */ @@ -2041,25 +2286,26 @@ void sentinelPingReplyCallback(redisAsyncContext *c, void *reply, void *privdata (ri->flags & SRI_S_DOWN) && !(ri->flags & SRI_SCRIPT_KILL_SENT)) { - if (redisAsyncCommand(ri->cc, - sentinelDiscardReplyCallback, NULL, - "SCRIPT KILL") == REDIS_OK) - ri->pending_commands++; + if (redisAsyncCommand(ri->link->cc, + sentinelDiscardReplyCallback, ri, + "SCRIPT KILL") == C_OK) + ri->link->pending_commands++; ri->flags |= SRI_SCRIPT_KILL_SENT; } } } - ri->last_pong_time = mstime(); + link->last_pong_time = mstime(); } /* This is called when we get the reply about the PUBLISH command we send * to the master to advertise this sentinel. */ void sentinelPublishReplyCallback(redisAsyncContext *c, void *reply, void *privdata) { - sentinelRedisInstance *ri = c->data; + sentinelRedisInstance *ri = privdata; + instanceLink *link = c->data; redisReply *r; - if (ri) ri->pending_commands--; - if (!reply || !ri) return; + if (!reply || !link) return; + link->pending_commands--; r = reply; /* Only update pub_time if we actually published our message. Otherwise @@ -2072,7 +2318,7 @@ void sentinelPublishReplyCallback(redisAsyncContext *c, void *reply, void *privd * or sent directly to this sentinel via the (fake) PUBLISH command of Sentinel. * * If the master name specified in the message is not known, the message is - * discareded. */ + * discarded. */ void sentinelProcessHelloMessage(char *hello, int hello_len) { /* Format is composed of 8 tokens: * 0=ip,1=port,2=runid,3=current_epoch,4=master_name, @@ -2097,25 +2343,39 @@ void sentinelProcessHelloMessage(char *hello, int hello_len) { if (!si) { /* If not, remove all the sentinels that have the same runid - * OR the same ip/port, because it's either a restart or a - * network topology change. */ - removed = removeMatchingSentinelsFromMaster(master,token[0],port, - token[2]); + * because there was an address change, and add the same Sentinel + * with the new address back. */ + removed = removeMatchingSentinelFromMaster(master,token[2]); if (removed) { - sentinelEvent(REDIS_NOTICE,"-dup-sentinel",master, - "%@ #duplicate of %s:%d or %s", - token[0],port,token[2]); + sentinelEvent(LL_NOTICE,"+sentinel-address-switch",master, + "%@ ip %s port %d for %s", token[0],port,token[2]); + } else { + /* Check if there is another Sentinel with the same address this + * new one is reporting. What we do if this happens is to set its + * port to 0, to signal the address is invalid. We'll update it + * later if we get an HELLO message. */ + sentinelRedisInstance *other = + getSentinelRedisInstanceByAddrAndRunID( + master->sentinels, token[0],port,NULL); + if (other) { + sentinelEvent(LL_NOTICE,"+sentinel-invalid-addr",other,"%@"); + other->addr->port = 0; /* It means: invalid address. */ + sentinelUpdateSentinelAddressInAllMasters(other); + } } /* Add the new sentinel. */ - si = createSentinelRedisInstance(NULL,SRI_SENTINEL, + si = createSentinelRedisInstance(token[2],SRI_SENTINEL, token[0],port,master->quorum,master); + if (si) { - sentinelEvent(REDIS_NOTICE,"+sentinel",si,"%@"); + if (!removed) sentinelEvent(LL_NOTICE,"+sentinel",si,"%@"); /* The runid is NULL after a new instance creation and * for Sentinels we don't have a later chance to fill it, * so do it now. */ si->runid = sdsnew(token[2]); + sentinelTryConnectionSharing(si); + if (removed) sentinelUpdateSentinelAddressInAllMasters(si); sentinelFlushConfig(); } } @@ -2124,20 +2384,20 @@ void sentinelProcessHelloMessage(char *hello, int hello_len) { if (current_epoch > sentinel.current_epoch) { sentinel.current_epoch = current_epoch; sentinelFlushConfig(); - sentinelEvent(REDIS_WARNING,"+new-epoch",master,"%llu", + sentinelEvent(LL_WARNING,"+new-epoch",master,"%llu", (unsigned long long) sentinel.current_epoch); } /* Update master info if received configuration is newer. */ - if (master->config_epoch < master_config_epoch) { + if (si && master->config_epoch < master_config_epoch) { master->config_epoch = master_config_epoch; if (master_port != master->addr->port || strcmp(master->addr->ip, token[5])) { sentinelAddr *old_addr; - sentinelEvent(REDIS_WARNING,"+config-update-from",si,"%@"); - sentinelEvent(REDIS_WARNING,"+switch-master", + sentinelEvent(LL_WARNING,"+config-update-from",si,"%@"); + sentinelEvent(LL_WARNING,"+switch-master", master,"%s %s %d %s %d", master->name, master->addr->ip, master->addr->port, @@ -2164,8 +2424,9 @@ cleanup: /* This is our Pub/Sub callback for the Hello channel. It's useful in order * to discover other sentinels attached at the same master. */ void sentinelReceiveHelloMessages(redisAsyncContext *c, void *reply, void *privdata) { - sentinelRedisInstance *ri = c->data; + sentinelRedisInstance *ri = privdata; redisReply *r; + UNUSED(c); if (!reply || !ri) return; r = reply; @@ -2173,7 +2434,7 @@ void sentinelReceiveHelloMessages(redisAsyncContext *c, void *reply, void *privd /* Update the last activity in the pubsub channel. Note that since we * receive our messages as well this timestamp can be used to detect * if the link is probably disconnected even if it seems otherwise. */ - ri->pc_last_activity = mstime(); + ri->link->pc_last_activity = mstime(); /* Sanity check in the reply we expect, so that the code that follows * can avoid to check for details. */ @@ -2185,7 +2446,7 @@ void sentinelReceiveHelloMessages(redisAsyncContext *c, void *reply, void *privd strcmp(r->element[0]->str,"message") != 0) return; /* We are not interested in meeting ourselves */ - if (strstr(r->element[2]->str,server.runid) != NULL) return; + if (strstr(r->element[2]->str,sentinel.myid) != NULL) return; sentinelProcessHelloMessage(r->element[2]->str, r->element[2]->len); } @@ -2199,34 +2460,46 @@ void sentinelReceiveHelloMessages(redisAsyncContext *c, void *reply, void *privd * sentinel_ip,sentinel_port,sentinel_runid,current_epoch, * master_name,master_ip,master_port,master_config_epoch. * - * Returns REDIS_OK if the PUBLISH was queued correctly, otherwise - * REDIS_ERR is returned. */ + * Returns C_OK if the PUBLISH was queued correctly, otherwise + * C_ERR is returned. */ int sentinelSendHello(sentinelRedisInstance *ri) { - char ip[REDIS_IP_STR_LEN]; - char payload[REDIS_IP_STR_LEN+1024]; + char ip[NET_IP_STR_LEN]; + char payload[NET_IP_STR_LEN+1024]; int retval; + char *announce_ip; + int announce_port; sentinelRedisInstance *master = (ri->flags & SRI_MASTER) ? ri : ri->master; sentinelAddr *master_addr = sentinelGetCurrentMasterAddress(master); - /* Try to obtain our own IP address. */ - if (anetSockName(ri->cc->c.fd,ip,sizeof(ip),NULL) == -1) return REDIS_ERR; - if (ri->flags & SRI_DISCONNECTED) return REDIS_ERR; + if (ri->link->disconnected) return C_ERR; + + /* Use the specified announce address if specified, otherwise try to + * obtain our own IP address. */ + if (sentinel.announce_ip) { + announce_ip = sentinel.announce_ip; + } else { + if (anetSockName(ri->link->cc->c.fd,ip,sizeof(ip),NULL) == -1) + return C_ERR; + announce_ip = ip; + } + announce_port = sentinel.announce_port ? + sentinel.announce_port : server.port; /* Format and send the Hello message. */ snprintf(payload,sizeof(payload), "%s,%d,%s,%llu," /* Info about this sentinel. */ "%s,%s,%d,%llu", /* Info about current master. */ - ip, server.port, server.runid, + announce_ip, announce_port, sentinel.myid, (unsigned long long) sentinel.current_epoch, /* --- */ master->name,master_addr->ip,master_addr->port, (unsigned long long) master->config_epoch); - retval = redisAsyncCommand(ri->cc, - sentinelPublishReplyCallback, NULL, "PUBLISH %s %s", + retval = redisAsyncCommand(ri->link->cc, + sentinelPublishReplyCallback, ri, "PUBLISH %s %s", SENTINEL_HELLO_CHANNEL,payload); - if (retval != REDIS_OK) return REDIS_ERR; - ri->pending_commands++; - return REDIS_OK; + if (retval != C_OK) return C_ERR; + ri->link->pending_commands++; + return C_OK; } /* Reset last_pub_time in all the instances in the specified dictionary @@ -2253,28 +2526,30 @@ void sentinelForceHelloUpdateDictOfRedisInstances(dict *instances) { * Sentinel upgrades a configuration it is a good idea to deliever an update * to the other Sentinels ASAP. */ int sentinelForceHelloUpdateForMaster(sentinelRedisInstance *master) { - if (!(master->flags & SRI_MASTER)) return REDIS_ERR; + if (!(master->flags & SRI_MASTER)) return C_ERR; if (master->last_pub_time >= (SENTINEL_PUBLISH_PERIOD+1)) master->last_pub_time -= (SENTINEL_PUBLISH_PERIOD+1); sentinelForceHelloUpdateDictOfRedisInstances(master->sentinels); sentinelForceHelloUpdateDictOfRedisInstances(master->slaves); - return REDIS_OK; + return C_OK; } -/* Send a PING to the specified instance and refresh the last_ping_time +/* Send a PING to the specified instance and refresh the act_ping_time * if it is zero (that is, if we received a pong for the previous ping). * * On error zero is returned, and we can't consider the PING command * queued in the connection. */ int sentinelSendPing(sentinelRedisInstance *ri) { - int retval = redisAsyncCommand(ri->cc, - sentinelPingReplyCallback, NULL, "PING"); - if (retval == REDIS_OK) { - ri->pending_commands++; - /* We update the ping time only if we received the pong for - * the previous ping, otherwise we are technically waiting - * since the first ping that did not received a reply. */ - if (ri->last_ping_time == 0) ri->last_ping_time = mstime(); + int retval = redisAsyncCommand(ri->link->cc, + sentinelPingReplyCallback, ri, "PING"); + if (retval == C_OK) { + ri->link->pending_commands++; + ri->link->last_ping_time = mstime(); + /* We update the active ping time only if we received the pong for + * the previous ping, otherwise we are technically waiting since the + * first ping that did not received a reply. */ + if (ri->link->act_ping_time == 0) + ri->link->act_ping_time = ri->link->last_ping_time; return 1; } else { return 0; @@ -2290,7 +2565,7 @@ void sentinelSendPeriodicCommands(sentinelRedisInstance *ri) { /* Return ASAP if we have already a PING or INFO already pending, or * in the case the instance is not properly connected. */ - if (ri->flags & SRI_DISCONNECTED) return; + if (ri->link->disconnected) return; /* For INFO, PING, PUBLISH that are not critical commands to send we * also have a limit of SENTINEL_MAX_PENDING_COMMANDS. We don't @@ -2298,14 +2573,21 @@ void sentinelSendPeriodicCommands(sentinelRedisInstance *ri) { * properly (note that anyway there is a redundant protection about this, * that is, the link will be disconnected and reconnected if a long * timeout condition is detected. */ - if (ri->pending_commands >= SENTINEL_MAX_PENDING_COMMANDS) return; + if (ri->link->pending_commands >= + SENTINEL_MAX_PENDING_COMMANDS * ri->link->refcount) return; /* If this is a slave of a master in O_DOWN condition we start sending * it INFO every second, instead of the usual SENTINEL_INFO_PERIOD * period. In this state we want to closely monitor slaves in case they - * are turned into masters by another Sentinel, or by the sysadmin. */ + * are turned into masters by another Sentinel, or by the sysadmin. + * + * Similarly we monitor the INFO output more often if the slave reports + * to be disconnected from the master, so that we can have a fresh + * disconnection time figure. */ if ((ri->flags & SRI_SLAVE) && - (ri->master->flags & (SRI_O_DOWN|SRI_FAILOVER_IN_PROGRESS))) { + ((ri->master->flags & (SRI_O_DOWN|SRI_FAILOVER_IN_PROGRESS)) || + (ri->master_link_down_time != 0))) + { info_period = 1000; } else { info_period = SENTINEL_INFO_PERIOD; @@ -2322,10 +2604,11 @@ void sentinelSendPeriodicCommands(sentinelRedisInstance *ri) { (now - ri->info_refresh) > info_period)) { /* Send INFO to masters and slaves, not sentinels. */ - retval = redisAsyncCommand(ri->cc, - sentinelInfoReplyCallback, NULL, "INFO"); - if (retval == REDIS_OK) ri->pending_commands++; - } else if ((now - ri->last_pong_time) > ping_period) { + retval = redisAsyncCommand(ri->link->cc, + sentinelInfoReplyCallback, ri, "INFO"); + if (retval == C_OK) ri->link->pending_commands++; + } else if ((now - ri->link->last_pong_time) > ping_period && + (now - ri->link->last_ping_time) > ping_period/2) { /* Send PING to all the three kinds of instances. */ sentinelSendPing(ri); } else if ((now - ri->last_pub_time) > SENTINEL_PUBLISH_PERIOD) { @@ -2350,7 +2633,7 @@ const char *sentinelFailoverStateStr(int state) { } /* Redis instance to Redis protocol representation. */ -void addReplySentinelRedisInstance(redisClient *c, sentinelRedisInstance *ri) { +void addReplySentinelRedisInstance(client *c, sentinelRedisInstance *ri) { char *flags = sdsempty(); void *mbl; int fields = 0; @@ -2379,7 +2662,7 @@ void addReplySentinelRedisInstance(redisClient *c, sentinelRedisInstance *ri) { if (ri->flags & SRI_MASTER) flags = sdscat(flags,"master,"); if (ri->flags & SRI_SLAVE) flags = sdscat(flags,"slave,"); if (ri->flags & SRI_SENTINEL) flags = sdscat(flags,"sentinel,"); - if (ri->flags & SRI_DISCONNECTED) flags = sdscat(flags,"disconnected,"); + if (ri->link->disconnected) flags = sdscat(flags,"disconnected,"); if (ri->flags & SRI_MASTER_DOWN) flags = sdscat(flags,"master_down,"); if (ri->flags & SRI_FAILOVER_IN_PROGRESS) flags = sdscat(flags,"failover_in_progress,"); @@ -2393,8 +2676,12 @@ void addReplySentinelRedisInstance(redisClient *c, sentinelRedisInstance *ri) { sdsfree(flags); fields++; - addReplyBulkCString(c,"pending-commands"); - addReplyBulkLongLong(c,ri->pending_commands); + addReplyBulkCString(c,"link-pending-commands"); + addReplyBulkLongLong(c,ri->link->pending_commands); + fields++; + + addReplyBulkCString(c,"link-refcount"); + addReplyBulkLongLong(c,ri->link->refcount); fields++; if (ri->flags & SRI_FAILOVER_IN_PROGRESS) { @@ -2405,15 +2692,15 @@ void addReplySentinelRedisInstance(redisClient *c, sentinelRedisInstance *ri) { addReplyBulkCString(c,"last-ping-sent"); addReplyBulkLongLong(c, - ri->last_ping_time ? (mstime() - ri->last_ping_time) : 0); + ri->link->act_ping_time ? (mstime() - ri->link->act_ping_time) : 0); fields++; addReplyBulkCString(c,"last-ok-ping-reply"); - addReplyBulkLongLong(c,mstime() - ri->last_avail_time); + addReplyBulkLongLong(c,mstime() - ri->link->last_avail_time); fields++; addReplyBulkCString(c,"last-ping-reply"); - addReplyBulkLongLong(c,mstime() - ri->last_pong_time); + addReplyBulkLongLong(c,mstime() - ri->link->last_pong_time); fields++; if (ri->flags & SRI_S_DOWN) { @@ -2537,7 +2824,7 @@ void addReplySentinelRedisInstance(redisClient *c, sentinelRedisInstance *ri) { /* Output a number of instances contained inside a dictionary as * Redis protocol. */ -void addReplyDictOfRedisInstances(redisClient *c, dict *instances) { +void addReplyDictOfRedisInstances(client *c, dict *instances) { dictIterator *di; dictEntry *de; @@ -2554,12 +2841,12 @@ void addReplyDictOfRedisInstances(redisClient *c, dict *instances) { /* Lookup the named master into sentinel.masters. * If the master is not found reply to the client with an error and returns * NULL. */ -sentinelRedisInstance *sentinelGetMasterByNameOrReplyError(redisClient *c, +sentinelRedisInstance *sentinelGetMasterByNameOrReplyError(client *c, robj *name) { sentinelRedisInstance *ri; - ri = dictFetchValue(sentinel.masters,c->argv[2]->ptr); + ri = dictFetchValue(sentinel.masters,name->ptr); if (!ri) { addReplyError(c,"No such master with that name"); return NULL; @@ -2567,7 +2854,32 @@ sentinelRedisInstance *sentinelGetMasterByNameOrReplyError(redisClient *c, return ri; } -void sentinelCommand(redisClient *c) { +#define SENTINEL_ISQR_OK 0 +#define SENTINEL_ISQR_NOQUORUM (1<<0) +#define SENTINEL_ISQR_NOAUTH (1<<1) +int sentinelIsQuorumReachable(sentinelRedisInstance *master, int *usableptr) { + dictIterator *di; + dictEntry *de; + int usable = 1; /* Number of usable Sentinels. Init to 1 to count myself. */ + int result = SENTINEL_ISQR_OK; + int voters = dictSize(master->sentinels)+1; /* Known Sentinels + myself. */ + + di = dictGetIterator(master->sentinels); + while((de = dictNext(di)) != NULL) { + sentinelRedisInstance *ri = dictGetVal(de); + + if (ri->flags & (SRI_S_DOWN|SRI_O_DOWN)) continue; + usable++; + } + dictReleaseIterator(di); + + if (usable < (int)master->quorum) result |= SENTINEL_ISQR_NOQUORUM; + if (usable < voters/2+1) result |= SENTINEL_ISQR_NOAUTH; + if (usableptr) *usableptr = usable; + return result; +} + +void sentinelCommand(client *c) { if (!strcasecmp(c->argv[1]->ptr,"masters")) { /* SENTINEL MASTERS */ if (c->argc != 2) goto numargserr; @@ -2597,7 +2909,23 @@ void sentinelCommand(redisClient *c) { return; addReplyDictOfRedisInstances(c,ri->sentinels); } else if (!strcasecmp(c->argv[1]->ptr,"is-master-down-by-addr")) { - /* SENTINEL IS-MASTER-DOWN-BY-ADDR <ip> <port> <current-epoch> <runid>*/ + /* SENTINEL IS-MASTER-DOWN-BY-ADDR <ip> <port> <current-epoch> <runid> + * + * Arguments: + * + * ip and port are the ip and port of the master we want to be + * checked by Sentinel. Note that the command will not check by + * name but just by master, in theory different Sentinels may monitor + * differnet masters with the same name. + * + * current-epoch is needed in order to understand if we are allowed + * to vote for a failover leader or not. Each Sentinel can vote just + * one time per epoch. + * + * runid is "*" if we are not seeking for a vote from the Sentinel + * in order to elect the failover leader. Otherwise it is set to the + * runid we want the Sentinel to vote if it did not already voted. + */ sentinelRedisInstance *ri; long long req_epoch; uint64_t leader_epoch = 0; @@ -2606,9 +2934,9 @@ void sentinelCommand(redisClient *c) { int isdown = 0; if (c->argc != 6) goto numargserr; - if (getLongFromObjectOrReply(c,c->argv[3],&port,NULL) != REDIS_OK || + if (getLongFromObjectOrReply(c,c->argv[3],&port,NULL) != C_OK || getLongLongFromObjectOrReply(c,c->argv[4],&req_epoch,NULL) - != REDIS_OK) + != C_OK) return; ri = getSentinelRedisInstanceByAddrAndRunID(sentinel.masters, c->argv[2]->ptr,port,NULL); @@ -2668,7 +2996,7 @@ void sentinelCommand(redisClient *c) { addReplySds(c,sdsnew("-NOGOODSLAVE No suitable slave to promote\r\n")); return; } - redisLog(REDIS_WARNING,"Executing user requested FAILOVER of '%s'", + serverLog(LL_WARNING,"Executing user requested FAILOVER of '%s'", ri->name); sentinelStartFailover(ri); ri->flags |= SRI_FORCE_FAILOVER; @@ -2682,17 +3010,23 @@ void sentinelCommand(redisClient *c) { /* SENTINEL MONITOR <name> <ip> <port> <quorum> */ sentinelRedisInstance *ri; long quorum, port; - char buf[32]; + char ip[NET_IP_STR_LEN]; if (c->argc != 6) goto numargserr; if (getLongFromObjectOrReply(c,c->argv[5],&quorum,"Invalid quorum") - != REDIS_OK) return; + != C_OK) return; if (getLongFromObjectOrReply(c,c->argv[4],&port,"Invalid port") - != REDIS_OK) return; + != C_OK) return; + + if (quorum <= 0) { + addReplyError(c, "Quorum must be 1 or greater."); + return; + } + /* Make sure the IP field is actually a valid IP before passing it * to createSentinelRedisInstance(), otherwise we may trigger a * DNS lookup at runtime. */ - if (anetResolveIP(NULL,c->argv[3]->ptr,buf,sizeof(buf)) == ANET_ERR) { + if (anetResolveIP(NULL,c->argv[3]->ptr,ip,sizeof(ip)) == ANET_ERR) { addReplyError(c,"Invalid IP address specified"); return; } @@ -2714,22 +3048,144 @@ void sentinelCommand(redisClient *c) { } } else { sentinelFlushConfig(); - sentinelEvent(REDIS_WARNING,"+monitor",ri,"%@ quorum %d",ri->quorum); + sentinelEvent(LL_WARNING,"+monitor",ri,"%@ quorum %d",ri->quorum); addReply(c,shared.ok); } + } else if (!strcasecmp(c->argv[1]->ptr,"flushconfig")) { + if (c->argc != 2) goto numargserr; + sentinelFlushConfig(); + addReply(c,shared.ok); + return; } else if (!strcasecmp(c->argv[1]->ptr,"remove")) { /* SENTINEL REMOVE <name> */ sentinelRedisInstance *ri; + if (c->argc != 3) goto numargserr; if ((ri = sentinelGetMasterByNameOrReplyError(c,c->argv[2])) == NULL) return; - sentinelEvent(REDIS_WARNING,"-monitor",ri,"%@"); + sentinelEvent(LL_WARNING,"-monitor",ri,"%@"); dictDelete(sentinel.masters,c->argv[2]->ptr); sentinelFlushConfig(); addReply(c,shared.ok); + } else if (!strcasecmp(c->argv[1]->ptr,"ckquorum")) { + /* SENTINEL CKQUORUM <name> */ + sentinelRedisInstance *ri; + int usable; + + if (c->argc != 3) goto numargserr; + if ((ri = sentinelGetMasterByNameOrReplyError(c,c->argv[2])) + == NULL) return; + int result = sentinelIsQuorumReachable(ri,&usable); + if (result == SENTINEL_ISQR_OK) { + addReplySds(c, sdscatfmt(sdsempty(), + "+OK %i usable Sentinels. Quorum and failover authorization " + "can be reached\r\n",usable)); + } else { + sds e = sdscatfmt(sdsempty(), + "-NOQUORUM %i usable Sentinels. ",usable); + if (result & SENTINEL_ISQR_NOQUORUM) + e = sdscat(e,"Not enough available Sentinels to reach the" + " specified quorum for this master"); + if (result & SENTINEL_ISQR_NOAUTH) { + if (result & SENTINEL_ISQR_NOQUORUM) e = sdscat(e,". "); + e = sdscat(e, "Not enough available Sentinels to reach the" + " majority and authorize a failover"); + } + e = sdscat(e,"\r\n"); + addReplySds(c,e); + } } else if (!strcasecmp(c->argv[1]->ptr,"set")) { if (c->argc < 3 || c->argc % 2 == 0) goto numargserr; sentinelSetCommand(c); + } else if (!strcasecmp(c->argv[1]->ptr,"info-cache")) { + /* SENTINEL INFO-CACHE <name> */ + if (c->argc < 2) goto numargserr; + mstime_t now = mstime(); + + /* Create an ad-hoc dictionary type so that we can iterate + * a dictionary composed of just the master groups the user + * requested. */ + dictType copy_keeper = instancesDictType; + copy_keeper.valDestructor = NULL; + dict *masters_local = sentinel.masters; + if (c->argc > 2) { + masters_local = dictCreate(©_keeper, NULL); + + for (int i = 2; i < c->argc; i++) { + sentinelRedisInstance *ri; + ri = sentinelGetMasterByName(c->argv[i]->ptr); + if (!ri) continue; /* ignore non-existing names */ + dictAdd(masters_local, ri->name, ri); + } + } + + /* Reply format: + * 1.) master name + * 2.) 1.) info from master + * 2.) info from replica + * ... + * 3.) other master name + * ... + */ + addReplyMultiBulkLen(c,dictSize(masters_local) * 2); + + dictIterator *di; + dictEntry *de; + di = dictGetIterator(masters_local); + while ((de = dictNext(di)) != NULL) { + sentinelRedisInstance *ri = dictGetVal(de); + addReplyBulkCBuffer(c,ri->name,strlen(ri->name)); + addReplyMultiBulkLen(c,dictSize(ri->slaves) + 1); /* +1 for self */ + addReplyMultiBulkLen(c,2); + addReplyLongLong(c, now - ri->info_refresh); + if (ri->info) + addReplyBulkCBuffer(c,ri->info,sdslen(ri->info)); + else + addReply(c,shared.nullbulk); + + dictIterator *sdi; + dictEntry *sde; + sdi = dictGetIterator(ri->slaves); + while ((sde = dictNext(sdi)) != NULL) { + sentinelRedisInstance *sri = dictGetVal(sde); + addReplyMultiBulkLen(c,2); + addReplyLongLong(c, now - sri->info_refresh); + if (sri->info) + addReplyBulkCBuffer(c,sri->info,sdslen(sri->info)); + else + addReply(c,shared.nullbulk); + } + dictReleaseIterator(sdi); + } + dictReleaseIterator(di); + if (masters_local != sentinel.masters) dictRelease(masters_local); + } else if (!strcasecmp(c->argv[1]->ptr,"simulate-failure")) { + /* SENTINEL SIMULATE-FAILURE <flag> <flag> ... <flag> */ + int j; + + sentinel.simfailure_flags = SENTINEL_SIMFAILURE_NONE; + for (j = 2; j < c->argc; j++) { + if (!strcasecmp(c->argv[j]->ptr,"crash-after-election")) { + sentinel.simfailure_flags |= + SENTINEL_SIMFAILURE_CRASH_AFTER_ELECTION; + serverLog(LL_WARNING,"Failure simulation: this Sentinel " + "will crash after being successfully elected as failover " + "leader"); + } else if (!strcasecmp(c->argv[j]->ptr,"crash-after-promotion")) { + sentinel.simfailure_flags |= + SENTINEL_SIMFAILURE_CRASH_AFTER_PROMOTION; + serverLog(LL_WARNING,"Failure simulation: this Sentinel " + "will crash after promoting the selected slave to master"); + } else if (!strcasecmp(c->argv[j]->ptr,"help")) { + addReplyMultiBulkLen(c,2); + addReplyBulkCString(c,"crash-after-election"); + addReplyBulkCString(c,"crash-after-promotion"); + } else { + addReplyError(c,"Unknown failure simulation specified"); + return; + } + } + addReply(c,shared.ok); } else { addReplyErrorFormat(c,"Unknown sentinel subcommand '%s'", (char*)c->argv[1]->ptr); @@ -2741,26 +3197,41 @@ numargserr: (char*)c->argv[1]->ptr); } -/* SENTINEL INFO [section] */ -void sentinelInfoCommand(redisClient *c) { - char *section = c->argc == 2 ? c->argv[1]->ptr : "default"; - sds info = sdsempty(); - int defsections = !strcasecmp(section,"default"); - int sections = 0; +#define info_section_from_redis(section_name) do { \ + if (defsections || allsections || !strcasecmp(section,section_name)) { \ + sds redissection; \ + if (sections++) info = sdscat(info,"\r\n"); \ + redissection = genRedisInfoString(section_name); \ + info = sdscatlen(info,redissection,sdslen(redissection)); \ + sdsfree(redissection); \ + } \ +} while(0) +/* SENTINEL INFO [section] */ +void sentinelInfoCommand(client *c) { if (c->argc > 2) { addReply(c,shared.syntaxerr); return; } - if (!strcasecmp(section,"server") || defsections) { - if (sections++) info = sdscat(info,"\r\n"); - sds serversection = genRedisInfoString("server"); - info = sdscatlen(info,serversection,sdslen(serversection)); - sdsfree(serversection); + int defsections = 0, allsections = 0; + char *section = c->argc == 2 ? c->argv[1]->ptr : NULL; + if (section) { + allsections = !strcasecmp(section,"all"); + defsections = !strcasecmp(section,"default"); + } else { + defsections = 1; } - if (!strcasecmp(section,"sentinel") || defsections) { + int sections = 0; + sds info = sdsempty(); + + info_section_from_redis("server"); + info_section_from_redis("clients"); + info_section_from_redis("cpu"); + info_section_from_redis("stats"); + + if (defsections || allsections || !strcasecmp(section,"sentinel")) { dictIterator *di; dictEntry *de; int master_id = 0; @@ -2771,11 +3242,13 @@ void sentinelInfoCommand(redisClient *c) { "sentinel_masters:%lu\r\n" "sentinel_tilt:%d\r\n" "sentinel_running_scripts:%d\r\n" - "sentinel_scripts_queue_length:%ld\r\n", + "sentinel_scripts_queue_length:%ld\r\n" + "sentinel_simulate_failure_flags:%lu\r\n", dictSize(sentinel.masters), sentinel.tilt, sentinel.running_scripts, - listLength(sentinel.scripts_queue)); + listLength(sentinel.scripts_queue), + sentinel.simfailure_flags); di = dictGetIterator(sentinel.masters); while((de = dictNext(di)) != NULL) { @@ -2795,15 +3268,12 @@ void sentinelInfoCommand(redisClient *c) { dictReleaseIterator(di); } - addReplySds(c,sdscatprintf(sdsempty(),"$%lu\r\n", - (unsigned long)sdslen(info))); - addReplySds(c,info); - addReply(c,shared.crlf); + addReplyBulkSds(c, info); } /* Implements Sentinel verison of the ROLE command. The output is * "sentinel" and the list of currently monitored master names. */ -void sentinelRoleCommand(redisClient *c) { +void sentinelRoleCommand(client *c) { dictIterator *di; dictEntry *de; @@ -2821,7 +3291,7 @@ void sentinelRoleCommand(redisClient *c) { } /* SENTINEL SET <mastername> [<option> <value> ...] */ -void sentinelSetCommand(redisClient *c) { +void sentinelSetCommand(client *c) { sentinelRedisInstance *ri; int j, changes = 0; char *option, *value; @@ -2838,20 +3308,20 @@ void sentinelSetCommand(redisClient *c) { if (!strcasecmp(option,"down-after-milliseconds")) { /* down-after-millisecodns <milliseconds> */ - if (getLongLongFromObject(o,&ll) == REDIS_ERR || ll <= 0) + if (getLongLongFromObject(o,&ll) == C_ERR || ll <= 0) goto badfmt; ri->down_after_period = ll; sentinelPropagateDownAfterPeriod(ri); changes++; } else if (!strcasecmp(option,"failover-timeout")) { /* failover-timeout <milliseconds> */ - if (getLongLongFromObject(o,&ll) == REDIS_ERR || ll <= 0) + if (getLongLongFromObject(o,&ll) == C_ERR || ll <= 0) goto badfmt; ri->failover_timeout = ll; changes++; } else if (!strcasecmp(option,"parallel-syncs")) { /* parallel-syncs <milliseconds> */ - if (getLongLongFromObject(o,&ll) == REDIS_ERR || ll <= 0) + if (getLongLongFromObject(o,&ll) == C_ERR || ll <= 0) goto badfmt; ri->parallel_syncs = ll; changes++; @@ -2885,7 +3355,7 @@ void sentinelSetCommand(redisClient *c) { changes++; } else if (!strcasecmp(option,"quorum")) { /* quorum <count> */ - if (getLongLongFromObject(o,&ll) == REDIS_ERR || ll <= 0) + if (getLongLongFromObject(o,&ll) == C_ERR || ll <= 0) goto badfmt; ri->quorum = ll; changes++; @@ -2895,7 +3365,7 @@ void sentinelSetCommand(redisClient *c) { if (changes) sentinelFlushConfig(); return; } - sentinelEvent(REDIS_WARNING,"+set",ri,"%@ %s %s",option,value); + sentinelEvent(LL_WARNING,"+set",ri,"%@ %s %s",option,value); } if (changes) sentinelFlushConfig(); @@ -2914,7 +3384,7 @@ badfmt: /* Bad format errors */ * * Because we have a Sentinel PUBLISH, the code to send hello messages is the same * for all the three kind of instances: masters, slaves, sentinels. */ -void sentinelPublishCommand(redisClient *c) { +void sentinelPublishCommand(client *c) { if (strcmp(c->argv[1]->ptr,SENTINEL_HELLO_CHANNEL)) { addReplyError(c, "Only HELLO messages are accepted by Sentinel instances."); return; @@ -2929,8 +3399,10 @@ void sentinelPublishCommand(redisClient *c) { void sentinelCheckSubjectivelyDown(sentinelRedisInstance *ri) { mstime_t elapsed = 0; - if (ri->last_ping_time) - elapsed = mstime() - ri->last_ping_time; + if (ri->link->act_ping_time) + elapsed = mstime() - ri->link->act_ping_time; + else if (ri->link->disconnected) + elapsed = mstime() - ri->link->last_avail_time; /* Check if we are in need for a reconnection of one of the * links, because we are detecting low activity. @@ -2938,15 +3410,16 @@ void sentinelCheckSubjectivelyDown(sentinelRedisInstance *ri) { * 1) Check if the command link seems connected, was connected not less * than SENTINEL_MIN_LINK_RECONNECT_PERIOD, but still we have a * pending ping for more than half the timeout. */ - if (ri->cc && - (mstime() - ri->cc_conn_time) > SENTINEL_MIN_LINK_RECONNECT_PERIOD && - ri->last_ping_time != 0 && /* Ther is a pending ping... */ + if (ri->link->cc && + (mstime() - ri->link->cc_conn_time) > + SENTINEL_MIN_LINK_RECONNECT_PERIOD && + ri->link->act_ping_time != 0 && /* Ther is a pending ping... */ /* The pending ping is delayed, and we did not received * error replies as well. */ - (mstime() - ri->last_ping_time) > (ri->down_after_period/2) && - (mstime() - ri->last_pong_time) > (ri->down_after_period/2)) + (mstime() - ri->link->act_ping_time) > (ri->down_after_period/2) && + (mstime() - ri->link->last_pong_time) > (ri->down_after_period/2)) { - sentinelKillLink(ri,ri->cc); + instanceLinkCloseConnection(ri->link,ri->link->cc); } /* 2) Check if the pubsub link seems connected, was connected not less @@ -2954,11 +3427,12 @@ void sentinelCheckSubjectivelyDown(sentinelRedisInstance *ri) { * activity in the Pub/Sub channel for more than * SENTINEL_PUBLISH_PERIOD * 3. */ - if (ri->pc && - (mstime() - ri->pc_conn_time) > SENTINEL_MIN_LINK_RECONNECT_PERIOD && - (mstime() - ri->pc_last_activity) > (SENTINEL_PUBLISH_PERIOD*3)) + if (ri->link->pc && + (mstime() - ri->link->pc_conn_time) > + SENTINEL_MIN_LINK_RECONNECT_PERIOD && + (mstime() - ri->link->pc_last_activity) > (SENTINEL_PUBLISH_PERIOD*3)) { - sentinelKillLink(ri,ri->pc); + instanceLinkCloseConnection(ri->link,ri->link->pc); } /* Update the SDOWN flag. We believe the instance is SDOWN if: @@ -2975,14 +3449,14 @@ void sentinelCheckSubjectivelyDown(sentinelRedisInstance *ri) { { /* Is subjectively down */ if ((ri->flags & SRI_S_DOWN) == 0) { - sentinelEvent(REDIS_WARNING,"+sdown",ri,"%@"); + sentinelEvent(LL_WARNING,"+sdown",ri,"%@"); ri->s_down_since_time = mstime(); ri->flags |= SRI_S_DOWN; } } else { /* Is subjectively up */ if (ri->flags & SRI_S_DOWN) { - sentinelEvent(REDIS_WARNING,"-sdown",ri,"%@"); + sentinelEvent(LL_WARNING,"-sdown",ri,"%@"); ri->flags &= ~(SRI_S_DOWN|SRI_SCRIPT_KILL_SENT); } } @@ -2997,7 +3471,7 @@ void sentinelCheckSubjectivelyDown(sentinelRedisInstance *ri) { void sentinelCheckObjectivelyDown(sentinelRedisInstance *master) { dictIterator *di; dictEntry *de; - int quorum = 0, odown = 0; + unsigned int quorum = 0, odown = 0; if (master->flags & SRI_S_DOWN) { /* Is down for enough sentinels? */ @@ -3016,14 +3490,14 @@ void sentinelCheckObjectivelyDown(sentinelRedisInstance *master) { /* Set the flag accordingly to the outcome. */ if (odown) { if ((master->flags & SRI_O_DOWN) == 0) { - sentinelEvent(REDIS_WARNING,"+odown",master,"%@ #quorum %d/%d", + sentinelEvent(LL_WARNING,"+odown",master,"%@ #quorum %d/%d", quorum, master->quorum); master->flags |= SRI_O_DOWN; master->o_down_since_time = mstime(); } } else { if (master->flags & SRI_O_DOWN) { - sentinelEvent(REDIS_WARNING,"-odown",master,"%@"); + sentinelEvent(LL_WARNING,"-odown",master,"%@"); master->flags &= ~SRI_O_DOWN; } } @@ -3032,11 +3506,12 @@ void sentinelCheckObjectivelyDown(sentinelRedisInstance *master) { /* Receive the SENTINEL is-master-down-by-addr reply, see the * sentinelAskMasterStateToOtherSentinels() function for more information. */ void sentinelReceiveIsMasterDownReply(redisAsyncContext *c, void *reply, void *privdata) { - sentinelRedisInstance *ri = c->data; + sentinelRedisInstance *ri = privdata; + instanceLink *link = c->data; redisReply *r; - if (ri) ri->pending_commands--; - if (!reply || !ri) return; + if (!reply || !link) return; + link->pending_commands--; r = reply; /* Ignore every error or unexpected reply. @@ -3057,8 +3532,8 @@ void sentinelReceiveIsMasterDownReply(redisAsyncContext *c, void *reply, void *p /* If the runid in the reply is not "*" the Sentinel actually * replied with a vote. */ sdsfree(ri->leader); - if (ri->leader_epoch != r->element[2]->integer) - redisLog(REDIS_WARNING, + if ((long long)ri->leader_epoch != r->element[2]->integer) + serverLog(LL_WARNING, "%s voted for %s %llu", ri->name, r->element[1]->str, (unsigned long long) r->element[2]->integer); @@ -3097,27 +3572,34 @@ void sentinelAskMasterStateToOtherSentinels(sentinelRedisInstance *master, int f * 2) Sentinel is connected. * 3) We did not received the info within SENTINEL_ASK_PERIOD ms. */ if ((master->flags & SRI_S_DOWN) == 0) continue; - if (ri->flags & SRI_DISCONNECTED) continue; + if (ri->link->disconnected) continue; if (!(flags & SENTINEL_ASK_FORCED) && mstime() - ri->last_master_down_reply_time < SENTINEL_ASK_PERIOD) continue; /* Ask */ ll2string(port,sizeof(port),master->addr->port); - retval = redisAsyncCommand(ri->cc, - sentinelReceiveIsMasterDownReply, NULL, + retval = redisAsyncCommand(ri->link->cc, + sentinelReceiveIsMasterDownReply, ri, "SENTINEL is-master-down-by-addr %s %s %llu %s", master->addr->ip, port, sentinel.current_epoch, (master->failover_state > SENTINEL_FAILOVER_STATE_NONE) ? - server.runid : "*"); - if (retval == REDIS_OK) ri->pending_commands++; + sentinel.myid : "*"); + if (retval == C_OK) ri->link->pending_commands++; } dictReleaseIterator(di); } /* =============================== FAILOVER ================================= */ +/* Crash because of user request via SENTINEL simulate-failure command. */ +void sentinelSimFailureCrash(void) { + serverLog(LL_WARNING, + "Sentinel CRASH because of SENTINEL simulate-failure"); + exit(99); +} + /* Vote for the sentinel with 'req_runid' or return the old vote if already * voted for the specifed 'req_epoch' or one greater. * @@ -3127,7 +3609,7 @@ char *sentinelVoteLeader(sentinelRedisInstance *master, uint64_t req_epoch, char if (req_epoch > sentinel.current_epoch) { sentinel.current_epoch = req_epoch; sentinelFlushConfig(); - sentinelEvent(REDIS_WARNING,"+new-epoch",master,"%llu", + sentinelEvent(LL_WARNING,"+new-epoch",master,"%llu", (unsigned long long) sentinel.current_epoch); } @@ -3137,12 +3619,12 @@ char *sentinelVoteLeader(sentinelRedisInstance *master, uint64_t req_epoch, char master->leader = sdsnew(req_runid); master->leader_epoch = sentinel.current_epoch; sentinelFlushConfig(); - sentinelEvent(REDIS_WARNING,"+vote-for-leader",master,"%s %llu", + sentinelEvent(LL_WARNING,"+vote-for-leader",master,"%s %llu", master->leader, (unsigned long long) master->leader_epoch); /* If we did not voted for ourselves, set the master failover start * time to now, in order to force a delay before we can start a * failover for the same master. */ - if (strcasecmp(master->leader,server.runid)) + if (strcasecmp(master->leader,sentinel.myid)) master->failover_start_time = mstime()+rand()%SENTINEL_MAX_DESYNC; } @@ -3158,16 +3640,16 @@ struct sentinelLeader { /* Helper function for sentinelGetLeader, increment the counter * relative to the specified runid. */ int sentinelLeaderIncr(dict *counters, char *runid) { - dictEntry *de = dictFind(counters,runid); + dictEntry *existing, *de; uint64_t oldval; - if (de) { - oldval = dictGetUnsignedIntegerVal(de); - dictSetUnsignedIntegerVal(de,oldval+1); + de = dictAddRaw(counters,runid,&existing); + if (existing) { + oldval = dictGetUnsignedIntegerVal(existing); + dictSetUnsignedIntegerVal(existing,oldval+1); return oldval+1; } else { - de = dictAddRaw(counters,runid); - redisAssert(de != NULL); + serverAssert(de != NULL); dictSetUnsignedIntegerVal(de,1); return 1; } @@ -3176,9 +3658,9 @@ int sentinelLeaderIncr(dict *counters, char *runid) { /* Scan all the Sentinels attached to this master to check if there * is a leader for the specified epoch. * - * To be a leader for a given epoch, we should have the majorify of - * the Sentinels we know that reported the same instance as - * leader for the same epoch. */ + * To be a leader for a given epoch, we should have the majority of + * the Sentinels we know (ever seen since the last SENTINEL RESET) that + * reported the same instance as leader for the same epoch. */ char *sentinelGetLeader(sentinelRedisInstance *master, uint64_t epoch) { dict *counters; dictIterator *di; @@ -3189,16 +3671,17 @@ char *sentinelGetLeader(sentinelRedisInstance *master, uint64_t epoch) { uint64_t leader_epoch; uint64_t max_votes = 0; - redisAssert(master->flags & (SRI_O_DOWN|SRI_FAILOVER_IN_PROGRESS)); + serverAssert(master->flags & (SRI_O_DOWN|SRI_FAILOVER_IN_PROGRESS)); counters = dictCreate(&leaderVotesDictType,NULL); + voters = dictSize(master->sentinels)+1; /* All the other sentinels and me.*/ + /* Count other sentinels votes */ di = dictGetIterator(master->sentinels); while((de = dictNext(di)) != NULL) { sentinelRedisInstance *ri = dictGetVal(de); if (ri->leader != NULL && ri->leader_epoch == sentinel.current_epoch) sentinelLeaderIncr(counters,ri->leader); - voters++; } dictReleaseIterator(di); @@ -3222,7 +3705,7 @@ char *sentinelGetLeader(sentinelRedisInstance *master, uint64_t epoch) { if (winner) myvote = sentinelVoteLeader(master,epoch,winner,&leader_epoch); else - myvote = sentinelVoteLeader(master,epoch,server.runid,&leader_epoch); + myvote = sentinelVoteLeader(master,epoch,sentinel.myid,&leader_epoch); if (myvote && leader_epoch == epoch) { uint64_t votes = sentinelLeaderIncr(counters,myvote); @@ -3232,7 +3715,6 @@ char *sentinelGetLeader(sentinelRedisInstance *master, uint64_t epoch) { winner = myvote; } } - voters++; /* Anyway, count me as one of the voters. */ voters_quorum = voters/2+1; if (winner && (max_votes < voters_quorum || max_votes < master->quorum)) @@ -3251,8 +3733,8 @@ char *sentinelGetLeader(sentinelRedisInstance *master, uint64_t epoch) { * * If Host is NULL the function sends "SLAVEOF NO ONE". * - * The command returns REDIS_OK if the SLAVEOF command was accepted for - * (later) delivery otherwise REDIS_ERR. The command replies are just + * The command returns C_OK if the SLAVEOF command was accepted for + * (later) delivery otherwise C_ERR. The command replies are just * discarded. */ int sentinelSendSlaveOf(sentinelRedisInstance *ri, char *host, int port) { char portstr[32]; @@ -3277,49 +3759,49 @@ int sentinelSendSlaveOf(sentinelRedisInstance *ri, char *host, int port) { * * Note that we don't check the replies returned by commands, since we * will observe instead the effects in the next INFO output. */ - retval = redisAsyncCommand(ri->cc, - sentinelDiscardReplyCallback, NULL, "MULTI"); - if (retval == REDIS_ERR) return retval; - ri->pending_commands++; + retval = redisAsyncCommand(ri->link->cc, + sentinelDiscardReplyCallback, ri, "MULTI"); + if (retval == C_ERR) return retval; + ri->link->pending_commands++; - retval = redisAsyncCommand(ri->cc, - sentinelDiscardReplyCallback, NULL, "SLAVEOF %s %s", host, portstr); - if (retval == REDIS_ERR) return retval; - ri->pending_commands++; + retval = redisAsyncCommand(ri->link->cc, + sentinelDiscardReplyCallback, ri, "SLAVEOF %s %s", host, portstr); + if (retval == C_ERR) return retval; + ri->link->pending_commands++; - retval = redisAsyncCommand(ri->cc, - sentinelDiscardReplyCallback, NULL, "CONFIG REWRITE"); - if (retval == REDIS_ERR) return retval; - ri->pending_commands++; + retval = redisAsyncCommand(ri->link->cc, + sentinelDiscardReplyCallback, ri, "CONFIG REWRITE"); + if (retval == C_ERR) return retval; + ri->link->pending_commands++; /* CLIENT KILL TYPE <type> is only supported starting from Redis 2.8.12, * however sending it to an instance not understanding this command is not * an issue because CLIENT is variadic command, so Redis will not * recognized as a syntax error, and the transaction will not fail (but * only the unsupported command will fail). */ - retval = redisAsyncCommand(ri->cc, - sentinelDiscardReplyCallback, NULL, "CLIENT KILL TYPE normal"); - if (retval == REDIS_ERR) return retval; - ri->pending_commands++; + retval = redisAsyncCommand(ri->link->cc, + sentinelDiscardReplyCallback, ri, "CLIENT KILL TYPE normal"); + if (retval == C_ERR) return retval; + ri->link->pending_commands++; - retval = redisAsyncCommand(ri->cc, - sentinelDiscardReplyCallback, NULL, "EXEC"); - if (retval == REDIS_ERR) return retval; - ri->pending_commands++; + retval = redisAsyncCommand(ri->link->cc, + sentinelDiscardReplyCallback, ri, "EXEC"); + if (retval == C_ERR) return retval; + ri->link->pending_commands++; - return REDIS_OK; + return C_OK; } /* Setup the master state to start a failover. */ void sentinelStartFailover(sentinelRedisInstance *master) { - redisAssert(master->flags & SRI_MASTER); + serverAssert(master->flags & SRI_MASTER); master->failover_state = SENTINEL_FAILOVER_STATE_WAIT_START; master->flags |= SRI_FAILOVER_IN_PROGRESS; master->failover_epoch = ++sentinel.current_epoch; - sentinelEvent(REDIS_WARNING,"+new-epoch",master,"%llu", + sentinelEvent(LL_WARNING,"+new-epoch",master,"%llu", (unsigned long long) sentinel.current_epoch); - sentinelEvent(REDIS_WARNING,"+try-failover",master,"%@"); + sentinelEvent(LL_WARNING,"+try-failover",master,"%@"); master->failover_start_time = mstime()+rand()%SENTINEL_MAX_DESYNC; master->failover_state_change_time = mstime(); } @@ -3354,7 +3836,7 @@ int sentinelStartFailoverIfNeeded(sentinelRedisInstance *master) { ctime_r(&clock,ctimebuf); ctimebuf[24] = '\0'; /* Remove newline. */ master->failover_delay_logged = master->failover_start_time; - redisLog(REDIS_WARNING, + serverLog(LL_WARNING, "Next failover delay: I will not start a failover before %s", ctimebuf); } @@ -3406,11 +3888,11 @@ int compareSlavesForPromotion(const void *a, const void *b) { return (*sa)->slave_priority - (*sb)->slave_priority; /* If priority is the same, select the slave with greater replication - * offset (processed more data frmo the master). */ + * offset (processed more data from the master). */ if ((*sa)->slave_repl_offset > (*sb)->slave_repl_offset) { return -1; /* a < b */ } else if ((*sa)->slave_repl_offset < (*sb)->slave_repl_offset) { - return 1; /* b > a */ + return 1; /* a > b */ } /* If the replication offset is the same select the slave with that has @@ -3443,8 +3925,9 @@ sentinelRedisInstance *sentinelSelectSlave(sentinelRedisInstance *master) { sentinelRedisInstance *slave = dictGetVal(de); mstime_t info_validity_time; - if (slave->flags & (SRI_S_DOWN|SRI_O_DOWN|SRI_DISCONNECTED)) continue; - if (mstime() - slave->last_avail_time > SENTINEL_PING_PERIOD*5) continue; + if (slave->flags & (SRI_S_DOWN|SRI_O_DOWN)) continue; + if (slave->link->disconnected) continue; + if (mstime() - slave->link->last_avail_time > SENTINEL_PING_PERIOD*5) continue; if (slave->slave_priority == 0) continue; /* If the master is in SDOWN state we get INFO for slaves every second. @@ -3475,7 +3958,7 @@ void sentinelFailoverWaitStart(sentinelRedisInstance *ri) { /* Check if we are the leader for the failover epoch. */ leader = sentinelGetLeader(ri, ri->failover_epoch); - isleader = leader && strcasecmp(leader,server.runid) == 0; + isleader = leader && strcasecmp(leader,sentinel.myid) == 0; sdsfree(leader); /* If I'm not the leader, and it is not a forced failover via @@ -3489,15 +3972,17 @@ void sentinelFailoverWaitStart(sentinelRedisInstance *ri) { election_timeout = ri->failover_timeout; /* Abort the failover if I'm not the leader after some time. */ if (mstime() - ri->failover_start_time > election_timeout) { - sentinelEvent(REDIS_WARNING,"-failover-abort-not-elected",ri,"%@"); + sentinelEvent(LL_WARNING,"-failover-abort-not-elected",ri,"%@"); sentinelAbortFailover(ri); } return; } - sentinelEvent(REDIS_WARNING,"+elected-leader",ri,"%@"); + sentinelEvent(LL_WARNING,"+elected-leader",ri,"%@"); + if (sentinel.simfailure_flags & SENTINEL_SIMFAILURE_CRASH_AFTER_ELECTION) + sentinelSimFailureCrash(); ri->failover_state = SENTINEL_FAILOVER_STATE_SELECT_SLAVE; ri->failover_state_change_time = mstime(); - sentinelEvent(REDIS_WARNING,"+failover-state-select-slave",ri,"%@"); + sentinelEvent(LL_WARNING,"+failover-state-select-slave",ri,"%@"); } void sentinelFailoverSelectSlave(sentinelRedisInstance *ri) { @@ -3506,15 +3991,15 @@ void sentinelFailoverSelectSlave(sentinelRedisInstance *ri) { /* We don't handle the timeout in this state as the function aborts * the failover or go forward in the next state. */ if (slave == NULL) { - sentinelEvent(REDIS_WARNING,"-failover-abort-no-good-slave",ri,"%@"); + sentinelEvent(LL_WARNING,"-failover-abort-no-good-slave",ri,"%@"); sentinelAbortFailover(ri); } else { - sentinelEvent(REDIS_WARNING,"+selected-slave",slave,"%@"); + sentinelEvent(LL_WARNING,"+selected-slave",slave,"%@"); slave->flags |= SRI_PROMOTED; ri->promoted_slave = slave; ri->failover_state = SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE; ri->failover_state_change_time = mstime(); - sentinelEvent(REDIS_NOTICE,"+failover-state-send-slaveof-noone", + sentinelEvent(LL_NOTICE,"+failover-state-send-slaveof-noone", slave, "%@"); } } @@ -3525,9 +4010,9 @@ void sentinelFailoverSendSlaveOfNoOne(sentinelRedisInstance *ri) { /* We can't send the command to the promoted slave if it is now * disconnected. Retry again and again with this state until the timeout * is reached, then abort the failover. */ - if (ri->promoted_slave->flags & SRI_DISCONNECTED) { + if (ri->promoted_slave->link->disconnected) { if (mstime() - ri->failover_state_change_time > ri->failover_timeout) { - sentinelEvent(REDIS_WARNING,"-failover-abort-slave-timeout",ri,"%@"); + sentinelEvent(LL_WARNING,"-failover-abort-slave-timeout",ri,"%@"); sentinelAbortFailover(ri); } return; @@ -3538,8 +4023,8 @@ void sentinelFailoverSendSlaveOfNoOne(sentinelRedisInstance *ri) { * really care about the reply. We check if it worked indirectly observing * if INFO returns a different role (master instead of slave). */ retval = sentinelSendSlaveOf(ri->promoted_slave,NULL,0); - if (retval != REDIS_OK) return; - sentinelEvent(REDIS_NOTICE, "+failover-state-wait-promotion", + if (retval != C_OK) return; + sentinelEvent(LL_NOTICE, "+failover-state-wait-promotion", ri->promoted_slave,"%@"); ri->failover_state = SENTINEL_FAILOVER_STATE_WAIT_PROMOTION; ri->failover_state_change_time = mstime(); @@ -3551,7 +4036,7 @@ void sentinelFailoverWaitPromotion(sentinelRedisInstance *ri) { /* Just handle the timeout. Switching to the next state is handled * by the function parsing the INFO command of the promoted slave. */ if (mstime() - ri->failover_state_change_time > ri->failover_timeout) { - sentinelEvent(REDIS_WARNING,"-failover-abort-slave-timeout",ri,"%@"); + sentinelEvent(LL_WARNING,"-failover-abort-slave-timeout",ri,"%@"); sentinelAbortFailover(ri); } } @@ -3583,11 +4068,11 @@ void sentinelFailoverDetectEnd(sentinelRedisInstance *master) { if (elapsed > master->failover_timeout) { not_reconfigured = 0; timeout = 1; - sentinelEvent(REDIS_WARNING,"+failover-end-for-timeout",master,"%@"); + sentinelEvent(LL_WARNING,"+failover-end-for-timeout",master,"%@"); } if (not_reconfigured == 0) { - sentinelEvent(REDIS_WARNING,"+failover-end",master,"%@"); + sentinelEvent(LL_WARNING,"+failover-end",master,"%@"); master->failover_state = SENTINEL_FAILOVER_STATE_UPDATE_CONFIG; master->failover_state_change_time = mstime(); } @@ -3604,14 +4089,14 @@ void sentinelFailoverDetectEnd(sentinelRedisInstance *master) { sentinelRedisInstance *slave = dictGetVal(de); int retval; - if (slave->flags & - (SRI_RECONF_DONE|SRI_RECONF_SENT|SRI_DISCONNECTED)) continue; + if (slave->flags & (SRI_RECONF_DONE|SRI_RECONF_SENT)) continue; + if (slave->link->disconnected) continue; retval = sentinelSendSlaveOf(slave, master->promoted_slave->addr->ip, master->promoted_slave->addr->port); - if (retval == REDIS_OK) { - sentinelEvent(REDIS_NOTICE,"+slave-reconf-sent-be",slave,"%@"); + if (retval == C_OK) { + sentinelEvent(LL_NOTICE,"+slave-reconf-sent-be",slave,"%@"); slave->flags |= SRI_RECONF_SENT; } } @@ -3653,24 +4138,24 @@ void sentinelFailoverReconfNextSlave(sentinelRedisInstance *master) { (mstime() - slave->slave_reconf_sent_time) > SENTINEL_SLAVE_RECONF_TIMEOUT) { - sentinelEvent(REDIS_NOTICE,"-slave-reconf-sent-timeout",slave,"%@"); + sentinelEvent(LL_NOTICE,"-slave-reconf-sent-timeout",slave,"%@"); slave->flags &= ~SRI_RECONF_SENT; slave->flags |= SRI_RECONF_DONE; } /* Nothing to do for instances that are disconnected or already * in RECONF_SENT state. */ - if (slave->flags & (SRI_DISCONNECTED|SRI_RECONF_SENT|SRI_RECONF_INPROG)) - continue; + if (slave->flags & (SRI_RECONF_SENT|SRI_RECONF_INPROG)) continue; + if (slave->link->disconnected) continue; /* Send SLAVEOF <new master>. */ retval = sentinelSendSlaveOf(slave, master->promoted_slave->addr->ip, master->promoted_slave->addr->port); - if (retval == REDIS_OK) { + if (retval == C_OK) { slave->flags |= SRI_RECONF_SENT; slave->slave_reconf_sent_time = mstime(); - sentinelEvent(REDIS_NOTICE,"+slave-reconf-sent",slave,"%@"); + sentinelEvent(LL_NOTICE,"+slave-reconf-sent",slave,"%@"); in_progress++; } } @@ -3687,7 +4172,7 @@ void sentinelFailoverSwitchToPromotedSlave(sentinelRedisInstance *master) { sentinelRedisInstance *ref = master->promoted_slave ? master->promoted_slave : master; - sentinelEvent(REDIS_WARNING,"+switch-master",master,"%s %s %d %s %d", + sentinelEvent(LL_WARNING,"+switch-master",master,"%s %s %d %s %d", master->name, master->addr->ip, master->addr->port, ref->addr->ip, ref->addr->port); @@ -3695,7 +4180,7 @@ void sentinelFailoverSwitchToPromotedSlave(sentinelRedisInstance *master) { } void sentinelFailoverStateMachine(sentinelRedisInstance *ri) { - redisAssert(ri->flags & SRI_MASTER); + serverAssert(ri->flags & SRI_MASTER); if (!(ri->flags & SRI_FAILOVER_IN_PROGRESS)) return; @@ -3724,8 +4209,8 @@ void sentinelFailoverStateMachine(sentinelRedisInstance *ri) { * the slave -> master switch. Otherwise the failover can't be aborted and * will reach its end (possibly by timeout). */ void sentinelAbortFailover(sentinelRedisInstance *ri) { - redisAssert(ri->flags & SRI_FAILOVER_IN_PROGRESS); - redisAssert(ri->failover_state <= SENTINEL_FAILOVER_STATE_WAIT_PROMOTION); + serverAssert(ri->flags & SRI_FAILOVER_IN_PROGRESS); + serverAssert(ri->failover_state <= SENTINEL_FAILOVER_STATE_WAIT_PROMOTION); ri->flags &= ~(SRI_FAILOVER_IN_PROGRESS|SRI_FORCE_FAILOVER); ri->failover_state = SENTINEL_FAILOVER_STATE_NONE; @@ -3755,7 +4240,7 @@ void sentinelHandleRedisInstance(sentinelRedisInstance *ri) { if (sentinel.tilt) { if (mstime()-sentinel.tilt_start_time < SENTINEL_TILT_PERIOD) return; sentinel.tilt = 0; - sentinelEvent(REDIS_WARNING,"-tilt",NULL,"#tilt mode exited"); + sentinelEvent(LL_WARNING,"-tilt",NULL,"#tilt mode exited"); } /* Every kind of instance */ @@ -3828,7 +4313,7 @@ void sentinelCheckTiltCondition(void) { if (delta < 0 || delta > SENTINEL_TILT_TRIGGER) { sentinel.tilt = 1; sentinel.tilt_start_time = mstime(); - sentinelEvent(REDIS_WARNING,"+tilt",NULL,"#tilt mode entered"); + sentinelEvent(LL_WARNING,"+tilt",NULL,"#tilt mode entered"); } sentinel.previous_time = mstime(); } @@ -3846,6 +4331,6 @@ void sentinelTimer(void) { * exactly continue to stay synchronized asking to be voted at the * same time again and again (resulting in nobody likely winning the * election because of split brain voting). */ - server.hz = REDIS_DEFAULT_HZ + rand() % REDIS_DEFAULT_HZ; + server.hz = CONFIG_DEFAULT_HZ + rand() % CONFIG_DEFAULT_HZ; } |