summaryrefslogtreecommitdiff
path: root/src/replication.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/replication.c')
-rw-r--r--src/replication.c393
1 files changed, 377 insertions, 16 deletions
diff --git a/src/replication.c b/src/replication.c
index 9fb19eaca..f23fcb6de 100644
--- a/src/replication.c
+++ b/src/replication.c
@@ -200,6 +200,16 @@ void feedReplicationBacklogWithObject(robj *o) {
feedReplicationBacklog(p,len);
}
+/* Tell whether propagated commands should be appended to the output buffer
+ * of 'replica'. Returns 1 when the replica can be fed, 0 when it must be
+ * skipped. */
+int canFeedReplicaReplBuffer(client *replica) {
+    /* Replicas that asked only for the RDB are never fed. */
+    int rdb_only = (replica->flags & CLIENT_REPL_RDBONLY) != 0;
+    /* Neither are replicas still waiting for the BGSAVE to start. */
+    int waiting_bgsave = (replica->replstate == SLAVE_STATE_WAIT_BGSAVE_START);
+
+    return !(rdb_only || waiting_bgsave);
+}
+
/* Propagate write commands to slaves, and populate the replication backlog
* as well. This function is used if the instance is a master: we use
* the commands received by our clients in order to create the replication
@@ -249,7 +259,8 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
listRewind(slaves,&li);
while((ln = listNext(&li))) {
client *slave = ln->value;
- if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) continue;
+
+ if (!canFeedReplicaReplBuffer(slave)) continue;
addReply(slave,selectcmd);
}
@@ -290,8 +301,7 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
while((ln = listNext(&li))) {
client *slave = ln->value;
- /* Don't feed slaves that are still waiting for BGSAVE to start. */
- if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) continue;
+ if (!canFeedReplicaReplBuffer(slave)) continue;
/* Feed slaves that are waiting for the initial SYNC (so these commands
* are queued in the output buffer until the initial SYNC completes),
@@ -363,8 +373,7 @@ void replicationFeedSlavesFromMasterStream(list *slaves, char *buf, size_t bufle
while((ln = listNext(&li))) {
client *slave = ln->value;
- /* Don't feed slaves that are still waiting for BGSAVE to start. */
- if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) continue;
+ if (!canFeedReplicaReplBuffer(slave)) continue;
addReplyProto(slave,buf,buflen);
}
}
@@ -712,6 +721,36 @@ void syncCommand(client *c) {
/* ignore SYNC if already slave or in monitor mode */
if (c->flags & CLIENT_SLAVE) return;
+ /* Check if this is a failover request to a replica with the same replid and
+ * become a master if so. */
+ if (c->argc > 3 && !strcasecmp(c->argv[0]->ptr,"psync") &&
+ !strcasecmp(c->argv[3]->ptr,"failover"))
+ {
+ serverLog(LL_WARNING, "Failover request received for replid %s.",
+ (unsigned char *)c->argv[1]->ptr);
+ if (!server.masterhost) {
+ addReplyError(c, "PSYNC FAILOVER can't be sent to a master.");
+ return;
+ }
+
+ if (!strcasecmp(c->argv[1]->ptr,server.replid)) {
+ replicationUnsetMaster();
+ sds client = catClientInfoString(sdsempty(),c);
+ serverLog(LL_NOTICE,
+ "MASTER MODE enabled (failover request from '%s')",client);
+ sdsfree(client);
+ } else {
+ addReplyError(c, "PSYNC FAILOVER replid must match my replid.");
+ return;
+ }
+ }
+
+ /* Don't let replicas sync with us while we're failing over */
+ if (server.failover_state != NO_FAILOVER) {
+ addReplyError(c,"-NOMASTERLINK Can't SYNC while failing over");
+ return;
+ }
+
/* Refuse SYNC requests if we are a slave but the link with our master
* is not ok... */
if (server.masterhost && server.repl_state != REPL_STATE_CONNECTED) {
@@ -799,14 +838,20 @@ void syncCommand(client *c) {
listRewind(server.slaves,&li);
while((ln = listNext(&li))) {
slave = ln->value;
- if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_END) break;
+ /* If the client needs a buffer of commands, we can't use
+ * a replica without replication buffer. */
+ if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_END &&
+ (!(slave->flags & CLIENT_REPL_RDBONLY) ||
+ (c->flags & CLIENT_REPL_RDBONLY)))
+ break;
}
/* To attach this slave, we check that it has at least all the
* capabilities of the slave that triggered the current BGSAVE. */
if (ln && ((c->slave_capa & slave->slave_capa) == slave->slave_capa)) {
/* Perfect, the server is already registering differences for
- * another slave. Set the right state, and copy the buffer. */
- copyClientOutputBuffer(c,slave);
+ * another slave. Set the right state, and copy the buffer.
+ * We don't copy buffer if clients don't want. */
+ if (!(c->flags & CLIENT_REPL_RDBONLY)) copyClientOutputBuffer(c,slave);
replicationSetupSlaveForFullResync(c,slave->psync_initial_offset);
serverLog(LL_NOTICE,"Waiting for end of BGSAVE for SYNC");
} else {
@@ -925,6 +970,15 @@ void replconfCommand(client *c) {
* to the slave. */
if (server.masterhost && server.master) replicationSendAck();
return;
+ } else if (!strcasecmp(c->argv[j]->ptr,"rdb-only")) {
+ /* REPLCONF RDB-ONLY is used to identify the client only wants
+ * RDB snapshot without replication buffer. */
+ long rdb_only = 0;
+ if (getRangeLongFromObjectOrReply(c,c->argv[j+1],
+ 0,1,&rdb_only,NULL) != C_OK)
+ return;
+ if (rdb_only == 1) c->flags |= CLIENT_REPL_RDBONLY;
+ else c->flags &= ~CLIENT_REPL_RDBONLY;
} else {
addReplyErrorFormat(c,"Unrecognized REPLCONF option: %s",
(char*)c->argv[j]->ptr);
@@ -939,19 +993,28 @@ void replconfCommand(client *c) {
* we are finally ready to send the incremental stream of commands.
*
* It does a few things:
- *
- * 1) Put the slave in ONLINE state. Note that the function may also be called
+ * 1) Asynchronously close the replica's connection if it does not need the
+ * replication command stream, since such a client is not a real replica.
+ * 2) Put the slave in ONLINE state. Note that the function may also be called
* for a replicas that are already in ONLINE state, but having the flag
* repl_put_online_on_ack set to true: we still have to install the write
* handler in that case. This function will take care of that.
- * 2) Make sure the writable event is re-installed, since calling the SYNC
+ * 3) Make sure the writable event is re-installed, since calling the SYNC
* command disables it, so that we can accumulate output buffer without
* sending it to the replica.
- * 3) Update the count of "good replicas". */
+ * 4) Update the count of "good replicas". */
void putSlaveOnline(client *slave) {
slave->replstate = SLAVE_STATE_ONLINE;
slave->repl_put_online_on_ack = 0;
slave->repl_ack_time = server.unixtime; /* Prevent false timeout. */
+
+ if (slave->flags & CLIENT_REPL_RDBONLY) {
+ serverLog(LL_NOTICE,
+ "Close the connection with replica %s as RDB transfer is complete",
+ replicationGetSlaveName(slave));
+ freeClientAsync(slave);
+ return;
+ }
if (connSetWriteHandler(slave->conn, sendReplyToClient) == C_ERR) {
serverLog(LL_WARNING,"Unable to register writable event for replica bulk transfer: %s", strerror(errno));
freeClient(slave);
@@ -1998,8 +2061,15 @@ int slaveTryPartialResynchronization(connection *conn, int read_reply) {
memcpy(psync_offset,"-1",3);
}
- /* Issue the PSYNC command */
- reply = sendCommand(conn,"PSYNC",psync_replid,psync_offset,NULL);
+ /* Issue the PSYNC command, if this is a master with a failover in
+ * progress then send the failover argument to the replica to cause it
+ * to become a master */
+ if (server.failover_state == FAILOVER_IN_PROGRESS) {
+ reply = sendCommand(conn,"PSYNC",psync_replid,psync_offset,"FAILOVER",NULL);
+ } else {
+ reply = sendCommand(conn,"PSYNC",psync_replid,psync_offset,NULL);
+ }
+
if (reply != NULL) {
serverLog(LL_WARNING,"Unable to send PSYNC to master: %s",reply);
sdsfree(reply);
@@ -2323,6 +2393,7 @@ void syncWithMaster(connection *conn) {
if (server.repl_state == REPL_STATE_SEND_PSYNC) {
if (slaveTryPartialResynchronization(conn,0) == PSYNC_WRITE_ERROR) {
err = sdsnew("Write error sending the PSYNC command.");
+ abortFailover("Write error to failover target");
goto write_error;
}
server.repl_state = REPL_STATE_RECEIVE_PSYNC_REPLY;
@@ -2340,6 +2411,18 @@ void syncWithMaster(connection *conn) {
psync_result = slaveTryPartialResynchronization(conn,1);
if (psync_result == PSYNC_WAIT_REPLY) return; /* Try again later... */
+ /* Check the status of the planned failover. We expect PSYNC_CONTINUE,
+ * but there is nothing technically wrong with a full resync which
+ * could happen in edge cases. */
+ if (server.failover_state == FAILOVER_IN_PROGRESS) {
+ if (psync_result == PSYNC_CONTINUE || psync_result == PSYNC_FULLRESYNC) {
+ clearFailoverState();
+ } else {
+ abortFailover("Failover target rejected psync request");
+ return;
+ }
+ }
+
/* If the master is in an transient error, we should try to PSYNC
* from scratch later, so go to the error path. This happens when
* the server is loading the dataset or is not connected with its
@@ -2645,6 +2728,11 @@ void replicaofCommand(client *c) {
return;
}
+ if (server.failover_state != NO_FAILOVER) {
+ addReplyError(c,"REPLICAOF not allowed while failing over.");
+ return;
+ }
+
/* The special host/port combination "NO" "ONE" turns the instance
* into a master. Otherwise the new master address is set. */
if (!strcasecmp(c->argv[1]->ptr,"no") &&
@@ -3178,6 +3266,10 @@ long long replicationGetSlaveOffset(void) {
void replicationCron(void) {
static long long replication_cron_loops = 0;
+ /* Check failover status first, to see if we need to start
+ * handling the failover. */
+ updateFailoverStatus();
+
/* Non blocking connection timeout? */
if (server.masterhost &&
(server.repl_state == REPL_STATE_CONNECTING ||
@@ -3235,8 +3327,9 @@ void replicationCron(void) {
* alter the replication offsets of master and slave, and will no longer
* match the one stored into 'mf_master_offset' state. */
int manual_failover_in_progress =
- server.cluster_enabled &&
- server.cluster->mf_end &&
+ ((server.cluster_enabled &&
+ server.cluster->mf_end) ||
+ server.failover_end_time) &&
checkClientPauseTimeoutAndReturnIfPaused();
if (!manual_failover_in_progress) {
@@ -3390,3 +3483,271 @@ void replicationStartPendingFork(void) {
}
}
}
+
+/* Look up a replica in server.slaves by address.
+ *
+ * 'host' is compared case-insensitively against the replica's slave_ip
+ * field or, when that field is empty, against the peer address of its
+ * connection. 'port' must equal the replica's slave_listening_port.
+ *
+ * Returns the first matching client, or NULL when none matches. */
+static client *findReplica(char *host, int port) {
+    listIter iter;
+    listNode *node;
+
+    listRewind(server.slaves,&iter);
+    while ((node = listNext(&iter)) != NULL) {
+        client *candidate = node->value;
+        char peer[NET_IP_STR_LEN];
+        char *addr = candidate->slave_ip;
+
+        if (addr[0] == '\0') {
+            /* No stored IP: fall back to the connection's peer address and
+             * skip this replica if it cannot be obtained. */
+            if (connPeerToString(candidate->conn, peer, sizeof(peer), NULL) == -1)
+                continue;
+            addr = peer;
+        }
+
+        if (candidate->slave_listening_port != port) continue;
+        if (strcasecmp(host, addr) != 0) continue;
+        return candidate;
+    }
+
+    return NULL;
+}
+
+/* Map server.failover_state to a constant, human readable label.
+ * Values without a dedicated label are reported as "unknown". */
+const char *getFailoverStateString() {
+    if (server.failover_state == NO_FAILOVER)
+        return "no-failover";
+    if (server.failover_state == FAILOVER_IN_PROGRESS)
+        return "failover-in-progress";
+    if (server.failover_state == FAILOVER_WAIT_FOR_SYNC)
+        return "waiting-for-sync";
+    return "unknown";
+}
+
+/* Bring the failover bookkeeping back to its idle values and unpause
+ * clients. Because of the unpause, this has to be called whenever a
+ * failover ends, whether it succeeded or failed. */
+void clearFailoverState() {
+    /* Forget the designated target, releasing the host string. */
+    zfree(server.target_replica_host);
+    server.target_replica_host = NULL;
+    server.target_replica_port = 0;
+
+    /* Drop the deadline and the force flag, then mark the state as idle. */
+    server.force_failover = 0;
+    server.failover_end_time = 0;
+    server.failover_state = NO_FAILOVER;
+
+    unpauseClients();
+}
+
+/* Abort the current failover, if any. 'err' is the reason written to the
+ * log. Does nothing when no failover is going on. */
+void abortFailover(const char *err) {
+    if (server.failover_state == NO_FAILOVER) return;
+
+    if (server.target_replica_host == NULL) {
+        serverLog(LL_NOTICE,"FAILOVER to any replica aborted: %s",err);
+    } else {
+        serverLog(LL_NOTICE,"FAILOVER to %s:%d aborted: %s",
+            server.target_replica_host,server.target_replica_port,err);
+    }
+
+    /* Only the in-progress state requires undoing the master setting
+     * before the failover state is reset. */
+    if (server.failover_state == FAILOVER_IN_PROGRESS)
+        replicationUnsetMaster();
+
+    clearFailoverState();
+}
+
+/*
+ * FAILOVER [TO <HOST> <PORT> [FORCE]] [ABORT] [TIMEOUT <timeout>]
+ *
+ * This command will coordinate a failover between the master and one
+ * of its replicas. The happy path contains the following steps:
+ * 1) The master will initiate a client pause write, to stop replication
+ *    traffic.
+ * 2) The master will periodically check if any of its replicas has
+ *    consumed the entire replication stream through acks.
+ * 3) Once any replica has caught up, the master will itself become a replica.
+ * 4) The master will send a PSYNC FAILOVER request to the target replica,
+ *    which if accepted will cause the replica to become the new master and
+ *    start a sync.
+ *
+ * FAILOVER ABORT is the only way to abort a failover command, as replicaof
+ * will be disabled. This may be needed if the failover is unable to progress.
+ *
+ * The optional arguments [TO <HOST> <PORT>] allow designating a specific
+ * replica to be failed over to. The port must be in the range 0-65535.
+ *
+ * FORCE flag indicates that even if the target replica is not caught up,
+ * failover to it anyway. This must be specified with a timeout and a target
+ * HOST and PORT.
+ *
+ * TIMEOUT <timeout> indicates how long (in milliseconds) the primary should
+ * wait for a replica to sync up before aborting. If not specified, the
+ * failover will attempt forever and must be manually aborted.
+ *
+ * On success the command replies OK after recording the request; every
+ * validation failure replies with an error and leaves the failover state
+ * untouched.
+ */
+void failoverCommand(client *c) {
+    if (server.cluster_enabled) {
+        addReplyError(c,"FAILOVER not allowed in cluster mode. "
+                        "Use CLUSTER FAILOVER command instead.");
+        return;
+    }
+
+    /* Handle special case for abort */
+    if ((c->argc == 2) && !strcasecmp(c->argv[1]->ptr,"abort")) {
+        if (server.failover_state == NO_FAILOVER) {
+            addReplyError(c, "No failover in progress.");
+            return;
+        }
+
+        abortFailover("Failover manually aborted");
+        addReply(c,shared.ok);
+        return;
+    }
+
+    long timeout_in_ms = 0;
+    int force_flag = 0;
+    long port = 0;
+    char *host = NULL;
+
+    /* Parse the command for syntax and arguments. Each option may appear at
+     * most once: a repeated option falls through to the syntax error. */
+    for (int j = 1; j < c->argc; j++) {
+        if (!strcasecmp(c->argv[j]->ptr,"timeout") && (j + 1 < c->argc) &&
+            timeout_in_ms == 0)
+        {
+            if (getLongFromObjectOrReply(c,c->argv[j + 1],
+                        &timeout_in_ms,NULL) != C_OK) return;
+            if (timeout_in_ms <= 0) {
+                addReplyError(c,"FAILOVER timeout must be greater than 0");
+                return;
+            }
+            j++;
+        } else if (!strcasecmp(c->argv[j]->ptr,"to") && (j + 2 < c->argc) &&
+            !host)
+        {
+            /* Restrict the port to the valid TCP range. Without this, a
+             * larger value would be narrowed to int further down and could
+             * wrap onto the port of an unrelated replica. */
+            if (getRangeLongFromObjectOrReply(c,c->argv[j + 2],
+                        0,65535,&port,NULL) != C_OK)
+                return;
+            host = c->argv[j + 1]->ptr;
+            j += 2;
+        } else if (!strcasecmp(c->argv[j]->ptr,"force") && !force_flag) {
+            force_flag = 1;
+        } else {
+            addReplyErrorObject(c,shared.syntaxerr);
+            return;
+        }
+    }
+
+    if (server.failover_state != NO_FAILOVER) {
+        addReplyError(c,"FAILOVER already in progress.");
+        return;
+    }
+
+    if (server.masterhost) {
+        addReplyError(c,"FAILOVER is not valid when server is a replica.");
+        return;
+    }
+
+    if (listLength(server.slaves) == 0) {
+        addReplyError(c,"FAILOVER requires connected replicas.");
+        return;
+    }
+
+    /* NOTE(review): the two replies below say "IP" where the second TO
+     * argument is actually the port. The text is left unchanged in case
+     * clients or tests match on it. */
+    if (force_flag && (!timeout_in_ms || !host)) {
+        addReplyError(c,"FAILOVER with force option requires both a timeout "
+                        "and target HOST and IP.");
+        return;
+    }
+
+    /* If a replica address was provided, validate that it is connected. */
+    if (host) {
+        client *replica = findReplica(host, (int)port);
+
+        if (replica == NULL) {
+            addReplyError(c,"FAILOVER target HOST and IP is not "
+                            "a replica.");
+            return;
+        }
+
+        /* Check if requested replica is online */
+        if (replica->replstate != SLAVE_STATE_ONLINE) {
+            addReplyError(c,"FAILOVER target replica is not online.");
+            return;
+        }
+
+        /* The server keeps its own copy: 'host' points into the command
+         * arguments. */
+        server.target_replica_host = zstrdup(host);
+        server.target_replica_port = (int)port;
+        serverLog(LL_NOTICE,"FAILOVER requested to %s:%ld.",host,port);
+    } else {
+        serverLog(LL_NOTICE,"FAILOVER requested to any replica.");
+    }
+
+    /* Without a timeout failover_end_time is left as is, meaning the
+     * failover has no deadline. */
+    mstime_t now = mstime();
+    if (timeout_in_ms) {
+        server.failover_end_time = now + timeout_in_ms;
+    }
+
+    server.force_failover = force_flag;
+    server.failover_state = FAILOVER_WAIT_FOR_SYNC;
+    /* Pause writes with no deadline: clients are unpaused by
+     * clearFailoverState() once the failover completes or is aborted. */
+    pauseClients(LLONG_MAX,CLIENT_PAUSE_WRITE);
+    addReply(c,shared.ok);
+}
+
+/* Failover cron function, checks coordinated failover state.
+ *
+ * Only acts while the state is FAILOVER_WAIT_FOR_SYNC:
+ * - when the deadline has passed, either forces the failover to the
+ *   designated target (force requested) or aborts it;
+ * - otherwise, starts the failover as soon as the designated replica, or
+ *   any replica when none was designated, has acknowledged the whole
+ *   replication stream.
+ *
+ * Implementation note: the current implementation calls
+ * replicationSetMaster() to start the failover request. This has some
+ * unintended side effects if the failover doesn't work, like blocked clients
+ * being unblocked and replicas being disconnected. This could be optimized
+ * further.
+ */
+void updateFailoverStatus(void) {
+    if (server.failover_state != FAILOVER_WAIT_FOR_SYNC) return;
+    mstime_t now = server.mstime;
+
+    /* Check if failover operation has timed out */
+    if (server.failover_end_time && server.failover_end_time <= now) {
+        if (server.force_failover) {
+            serverLog(LL_NOTICE,
+                "FAILOVER to %s:%d time out exceeded, failing over.",
+                server.target_replica_host, server.target_replica_port);
+            server.failover_state = FAILOVER_IN_PROGRESS;
+            /* If timeout has expired force a failover if requested. */
+            replicationSetMaster(server.target_replica_host,
+                server.target_replica_port);
+            return;
+        } else {
+            /* Force was not requested, so timeout. */
+            abortFailover("Replica never caught up before timeout");
+            return;
+        }
+    }
+
+    /* Check to see if the replica has caught up so failover can start */
+    client *replica = NULL;
+    if (server.target_replica_host) {
+        replica = findReplica(server.target_replica_host,
+            server.target_replica_port);
+    } else {
+        listIter li;
+        listNode *ln;
+
+        listRewind(server.slaves,&li);
+        /* Find any replica that has matched our repl_offset */
+        while((ln = listNext(&li))) {
+            /* Use a loop-local candidate: 'replica' must only be set once
+             * the target address has been recorded, otherwise a caught-up
+             * replica whose address cannot be resolved would be used below
+             * with a NULL target host. */
+            client *candidate = ln->value;
+            if (candidate->repl_ack_off != server.master_repl_offset) continue;
+
+            char ip[NET_IP_STR_LEN], *replicaip = candidate->slave_ip;
+
+            if (replicaip[0] == '\0') {
+                if (connPeerToString(candidate->conn,ip,sizeof(ip),NULL) == -1)
+                    continue;
+                replicaip = ip;
+            }
+
+            /* We are now failing over to this specific node */
+            server.target_replica_host = zstrdup(replicaip);
+            server.target_replica_port = candidate->slave_listening_port;
+            replica = candidate;
+            break;
+        }
+    }
+
+    /* We've found a replica that is caught up */
+    if (replica && (replica->repl_ack_off == server.master_repl_offset)) {
+        server.failover_state = FAILOVER_IN_PROGRESS;
+        serverLog(LL_NOTICE,
+            "Failover target %s:%d is synced, failing over.",
+            server.target_replica_host, server.target_replica_port);
+        /* Designated replica is caught up, failover to it. */
+        replicationSetMaster(server.target_replica_host,
+            server.target_replica_port);
+    }
+}