diff options
Diffstat (limited to 'src/replication.c')
-rw-r--r-- | src/replication.c | 393 |
1 files changed, 377 insertions, 16 deletions
diff --git a/src/replication.c b/src/replication.c index 9fb19eaca..f23fcb6de 100644 --- a/src/replication.c +++ b/src/replication.c @@ -200,6 +200,16 @@ void feedReplicationBacklogWithObject(robj *o) { feedReplicationBacklog(p,len); } +int canFeedReplicaReplBuffer(client *replica) { + /* Don't feed replicas that only want the RDB. */ + if (replica->flags & CLIENT_REPL_RDBONLY) return 0; + + /* Don't feed replicas that are still waiting for BGSAVE to start. */ + if (replica->replstate == SLAVE_STATE_WAIT_BGSAVE_START) return 0; + + return 1; +} + /* Propagate write commands to slaves, and populate the replication backlog * as well. This function is used if the instance is a master: we use * the commands received by our clients in order to create the replication @@ -249,7 +259,8 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) { listRewind(slaves,&li); while((ln = listNext(&li))) { client *slave = ln->value; - if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) continue; + + if (!canFeedReplicaReplBuffer(slave)) continue; addReply(slave,selectcmd); } @@ -290,8 +301,7 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) { while((ln = listNext(&li))) { client *slave = ln->value; - /* Don't feed slaves that are still waiting for BGSAVE to start. */ - if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) continue; + if (!canFeedReplicaReplBuffer(slave)) continue; /* Feed slaves that are waiting for the initial SYNC (so these commands * are queued in the output buffer until the initial SYNC completes), @@ -363,8 +373,7 @@ void replicationFeedSlavesFromMasterStream(list *slaves, char *buf, size_t bufle while((ln = listNext(&li))) { client *slave = ln->value; - /* Don't feed slaves that are still waiting for BGSAVE to start. */ - if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) continue; + if (!canFeedReplicaReplBuffer(slave)) continue; addReplyProto(slave,buf,buflen); } } @@ -712,6 +721,36 @@ void syncCommand(client *c) { /* ignore SYNC if already slave or in monitor mode */ if (c->flags & CLIENT_SLAVE) return; + /* Check if this is a failover request to a replica with the same replid and + * become a master if so. */ + if (c->argc > 3 && !strcasecmp(c->argv[0]->ptr,"psync") && + !strcasecmp(c->argv[3]->ptr,"failover")) + { + serverLog(LL_WARNING, "Failover request received for replid %s.", + (unsigned char *)c->argv[1]->ptr); + if (!server.masterhost) { + addReplyError(c, "PSYNC FAILOVER can't be sent to a master."); + return; + } + + if (!strcasecmp(c->argv[1]->ptr,server.replid)) { + replicationUnsetMaster(); + sds client = catClientInfoString(sdsempty(),c); + serverLog(LL_NOTICE, + "MASTER MODE enabled (failover request from '%s')",client); + sdsfree(client); + } else { + addReplyError(c, "PSYNC FAILOVER replid must match my replid."); + return; + } + } + + /* Don't let replicas sync with us while we're failing over */ + if (server.failover_state != NO_FAILOVER) { + addReplyError(c,"-NOMASTERLINK Can't SYNC while failing over"); + return; + } + /* Refuse SYNC requests if we are a slave but the link with our master * is not ok... */ if (server.masterhost && server.repl_state != REPL_STATE_CONNECTED) { @@ -799,14 +838,20 @@ void syncCommand(client *c) { listRewind(server.slaves,&li); while((ln = listNext(&li))) { slave = ln->value; - if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_END) break; + /* If the client needs a buffer of commands, we can't use + * a replica without replication buffer. */ + if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_END && + (!(slave->flags & CLIENT_REPL_RDBONLY) || + (c->flags & CLIENT_REPL_RDBONLY))) + break; } /* To attach this slave, we check that it has at least all the * capabilities of the slave that triggered the current BGSAVE. */ if (ln && ((c->slave_capa & slave->slave_capa) == slave->slave_capa)) { /* Perfect, the server is already registering differences for - * another slave. Set the right state, and copy the buffer. */ - copyClientOutputBuffer(c,slave); + * another slave. Set the right state, and copy the buffer. + * We don't copy buffer if clients don't want. */ + if (!(c->flags & CLIENT_REPL_RDBONLY)) copyClientOutputBuffer(c,slave); replicationSetupSlaveForFullResync(c,slave->psync_initial_offset); serverLog(LL_NOTICE,"Waiting for end of BGSAVE for SYNC"); } else { @@ -925,6 +970,15 @@ void replconfCommand(client *c) { * to the slave. */ if (server.masterhost && server.master) replicationSendAck(); return; + } else if (!strcasecmp(c->argv[j]->ptr,"rdb-only")) { + /* REPLCONF RDB-ONLY is used to identify the client only wants + * RDB snapshot without replication buffer. */ + long rdb_only = 0; + if (getRangeLongFromObjectOrReply(c,c->argv[j+1], + 0,1,&rdb_only,NULL) != C_OK) + return; + if (rdb_only == 1) c->flags |= CLIENT_REPL_RDBONLY; + else c->flags &= ~CLIENT_REPL_RDBONLY; } else { addReplyErrorFormat(c,"Unrecognized REPLCONF option: %s", (char*)c->argv[j]->ptr); @@ -939,19 +993,28 @@ void replconfCommand(client *c) { * we are finally ready to send the incremental stream of commands. * * It does a few things: - * - * 1) Put the slave in ONLINE state. Note that the function may also be called + * 1) Close the replica's connection async if it doesn't need replication + * commands buffer stream, since it actually isn't a valid replica. + * 2) Put the slave in ONLINE state. Note that the function may also be called * for a replicas that are already in ONLINE state, but having the flag * repl_put_online_on_ack set to true: we still have to install the write * handler in that case. This function will take care of that. - * 2) Make sure the writable event is re-installed, since calling the SYNC + * 3) Make sure the writable event is re-installed, since calling the SYNC * command disables it, so that we can accumulate output buffer without * sending it to the replica. - * 3) Update the count of "good replicas". */ + * 4) Update the count of "good replicas". */ void putSlaveOnline(client *slave) { slave->replstate = SLAVE_STATE_ONLINE; slave->repl_put_online_on_ack = 0; slave->repl_ack_time = server.unixtime; /* Prevent false timeout. */ + + if (slave->flags & CLIENT_REPL_RDBONLY) { + serverLog(LL_NOTICE, + "Close the connection with replica %s as RDB transfer is complete", + replicationGetSlaveName(slave)); + freeClientAsync(slave); + return; + } if (connSetWriteHandler(slave->conn, sendReplyToClient) == C_ERR) { serverLog(LL_WARNING,"Unable to register writable event for replica bulk transfer: %s", strerror(errno)); freeClient(slave); @@ -1998,8 +2061,15 @@ int slaveTryPartialResynchronization(connection *conn, int read_reply) { memcpy(psync_offset,"-1",3); } - /* Issue the PSYNC command */ - reply = sendCommand(conn,"PSYNC",psync_replid,psync_offset,NULL); + /* Issue the PSYNC command, if this is a master with a failover in + * progress then send the failover argument to the replica to cause it + * to become a master */ + if (server.failover_state == FAILOVER_IN_PROGRESS) { + reply = sendCommand(conn,"PSYNC",psync_replid,psync_offset,"FAILOVER",NULL); + } else { + reply = sendCommand(conn,"PSYNC",psync_replid,psync_offset,NULL); + } + if (reply != NULL) { serverLog(LL_WARNING,"Unable to send PSYNC to master: %s",reply); sdsfree(reply); @@ -2323,6 +2393,7 @@ void syncWithMaster(connection *conn) { if (server.repl_state == REPL_STATE_SEND_PSYNC) { if (slaveTryPartialResynchronization(conn,0) == PSYNC_WRITE_ERROR) { err = sdsnew("Write error sending the PSYNC command."); + abortFailover("Write error to failover target"); goto write_error; } server.repl_state = REPL_STATE_RECEIVE_PSYNC_REPLY; @@ -2340,6 +2411,18 @@ void syncWithMaster(connection *conn) { psync_result = slaveTryPartialResynchronization(conn,1); if (psync_result == PSYNC_WAIT_REPLY) return; /* Try again later... */ + /* Check the status of the planned failover. We expect PSYNC_CONTINUE, + * but there is nothing technically wrong with a full resync which + * could happen in edge cases. */ + if (server.failover_state == FAILOVER_IN_PROGRESS) { + if (psync_result == PSYNC_CONTINUE || psync_result == PSYNC_FULLRESYNC) { + clearFailoverState(); + } else { + abortFailover("Failover target rejected psync request"); + return; + } + } + /* If the master is in an transient error, we should try to PSYNC * from scratch later, so go to the error path. This happens when * the server is loading the dataset or is not connected with its @@ -2645,6 +2728,11 @@ void replicaofCommand(client *c) { return; } + if (server.failover_state != NO_FAILOVER) { + addReplyError(c,"REPLICAOF not allowed while failing over."); + return; + } + /* The special host/port combination "NO" "ONE" turns the instance * into a master. Otherwise the new master address is set. */ if (!strcasecmp(c->argv[1]->ptr,"no") && @@ -3178,6 +3266,10 @@ long long replicationGetSlaveOffset(void) { void replicationCron(void) { static long long replication_cron_loops = 0; + /* Check failover status first, to see if we need to start + * handling the failover. */ + updateFailoverStatus(); + /* Non blocking connection timeout? */ if (server.masterhost && (server.repl_state == REPL_STATE_CONNECTING || @@ -3235,8 +3327,9 @@ void replicationCron(void) { * alter the replication offsets of master and slave, and will no longer * match the one stored into 'mf_master_offset' state. */ int manual_failover_in_progress = - server.cluster_enabled && - server.cluster->mf_end && + ((server.cluster_enabled && + server.cluster->mf_end) || + server.failover_end_time) && checkClientPauseTimeoutAndReturnIfPaused(); if (!manual_failover_in_progress) { @@ -3390,3 +3483,271 @@ void replicationStartPendingFork(void) { } } } + +/* Find replica at IP:PORT from replica list */ +static client *findReplica(char *host, int port) { + listIter li; + listNode *ln; + client *replica; + + listRewind(server.slaves,&li); + while((ln = listNext(&li))) { + replica = ln->value; + char ip[NET_IP_STR_LEN], *replicaip = replica->slave_ip; + + if (replicaip[0] == '\0') { + if (connPeerToString(replica->conn, ip, sizeof(ip), NULL) == -1) + continue; + replicaip = ip; + } + + if (!strcasecmp(host, replicaip) && + (port == replica->slave_listening_port)) + return replica; + } + + return NULL; +} + +const char *getFailoverStateString() { + switch(server.failover_state) { + case NO_FAILOVER: return "no-failover"; + case FAILOVER_IN_PROGRESS: return "failover-in-progress"; + case FAILOVER_WAIT_FOR_SYNC: return "waiting-for-sync"; + default: return "unknown"; + } +} + +/* Resets the internal failover configuration, this needs + * to be called after a failover either succeeds or fails + * as it includes the client unpause. */ +void clearFailoverState() { + server.failover_end_time = 0; + server.force_failover = 0; + zfree(server.target_replica_host); + server.target_replica_host = NULL; + server.target_replica_port = 0; + server.failover_state = NO_FAILOVER; + unpauseClients(); +} + +/* Abort an ongoing failover if one is going on. */ +void abortFailover(const char *err) { + if (server.failover_state == NO_FAILOVER) return; + + if (server.target_replica_host) { + serverLog(LL_NOTICE,"FAILOVER to %s:%d aborted: %s", + server.target_replica_host,server.target_replica_port,err); + } else { + serverLog(LL_NOTICE,"FAILOVER to any replica aborted: %s",err); + } + if (server.failover_state == FAILOVER_IN_PROGRESS) { + replicationUnsetMaster(); + } + clearFailoverState(); +} + +/* + * FAILOVER [TO <HOST> <IP> [FORCE]] [ABORT] [TIMEOUT <timeout>] + * + * This command will coordinate a failover between the master and one + * of its replicas. The happy path contains the following steps: + * 1) The master will initiate a client pause write, to stop replication + * traffic. + * 2) The master will periodically check if any of its replicas has + * consumed the entire replication stream through acks. + * 3) Once any replica has caught up, the master will itself become a replica. + * 4) The master will send a PSYNC FAILOVER request to the target replica, which + * if accepted will cause the replica to become the new master and start a sync. + * + * FAILOVER ABORT is the only way to abort a failover command, as replicaof + * will be disabled. This may be needed if the failover is unable to progress. + * + * The optional arguments [TO <HOST> <IP>] allows designating a specific replica + * to be failed over to. + * + * FORCE flag indicates that even if the target replica is not caught up, + * failover to it anyway. This must be specified with a timeout and a target + * HOST and IP. + * + * TIMEOUT <timeout> indicates how long should the primary wait for + * a replica to sync up before aborting. If not specified, the failover + * will attempt forever and must be manually aborted. + */ +void failoverCommand(client *c) { + if (server.cluster_enabled) { + addReplyError(c,"FAILOVER not allowed in cluster mode. " + "Use CLUSTER FAILOVER command instead."); + return; + } + + /* Handle special case for abort */ + if ((c->argc == 2) && !strcasecmp(c->argv[1]->ptr,"abort")) { + if (server.failover_state == NO_FAILOVER) { + addReplyError(c, "No failover in progress."); + return; + } + + abortFailover("Failover manually aborted"); + addReply(c,shared.ok); + return; + } + + long timeout_in_ms = 0; + int force_flag = 0; + long port = 0; + char *host = NULL; + + /* Parse the command for syntax and arguments. */ + for (int j = 1; j < c->argc; j++) { + if (!strcasecmp(c->argv[j]->ptr,"timeout") && (j + 1 < c->argc) && + timeout_in_ms == 0) + { + if (getLongFromObjectOrReply(c,c->argv[j + 1], + &timeout_in_ms,NULL) != C_OK) return; + if (timeout_in_ms <= 0) { + addReplyError(c,"FAILOVER timeout must be greater than 0"); + return; + } + j++; + } else if (!strcasecmp(c->argv[j]->ptr,"to") && (j + 2 < c->argc) && + !host) + { + if (getLongFromObjectOrReply(c,c->argv[j + 2],&port,NULL) != C_OK) + return; + host = c->argv[j + 1]->ptr; + j += 2; + } else if (!strcasecmp(c->argv[j]->ptr,"force") && !force_flag) { + force_flag = 1; + } else { + addReplyErrorObject(c,shared.syntaxerr); + return; + } + } + + if (server.failover_state != NO_FAILOVER) { + addReplyError(c,"FAILOVER already in progress."); + return; + } + + if (server.masterhost) { + addReplyError(c,"FAILOVER is not valid when server is a replica."); + return; + } + + if (listLength(server.slaves) == 0) { + addReplyError(c,"FAILOVER requires connected replicas."); + return; + } + + if (force_flag && (!timeout_in_ms || !host)) { + addReplyError(c,"FAILOVER with force option requires both a timeout " + "and target HOST and IP."); + return; + } + + /* If a replica address was provided, validate that it is connected. */ + if (host) { + client *replica = findReplica(host, port); + + if (replica == NULL) { + addReplyError(c,"FAILOVER target HOST and IP is not " + "a replica."); + return; + } + + /* Check if requested replica is online */ + if (replica->replstate != SLAVE_STATE_ONLINE) { + addReplyError(c,"FAILOVER target replica is not online."); + return; + } + + server.target_replica_host = zstrdup(host); + server.target_replica_port = port; + serverLog(LL_NOTICE,"FAILOVER requested to %s:%ld.",host,port); + } else { + serverLog(LL_NOTICE,"FAILOVER requested to any replica."); + } + + mstime_t now = mstime(); + if (timeout_in_ms) { + server.failover_end_time = now + timeout_in_ms; + } + + server.force_failover = force_flag; + server.failover_state = FAILOVER_WAIT_FOR_SYNC; + /* Cluster failover will unpause eventually */ + pauseClients(LLONG_MAX,CLIENT_PAUSE_WRITE); + addReply(c,shared.ok); +} + +/* Failover cron function, checks coordinated failover state. + * + * Implementation note: The current implementation calls replicationSetMaster() + * to start the failover request, this has some unintended side effects if the + * failover doesn't work like blocked clients will be unblocked and replicas will + * be disconnected. This could be optimized further. + */ +void updateFailoverStatus(void) { + if (server.failover_state != FAILOVER_WAIT_FOR_SYNC) return; + mstime_t now = server.mstime; + + /* Check if failover operation has timed out */ + if (server.failover_end_time && server.failover_end_time <= now) { + if (server.force_failover) { + serverLog(LL_NOTICE, + "FAILOVER to %s:%d time out exceeded, failing over.", + server.target_replica_host, server.target_replica_port); + server.failover_state = FAILOVER_IN_PROGRESS; + /* If timeout has expired force a failover if requested. */ + replicationSetMaster(server.target_replica_host, + server.target_replica_port); + return; + } else { + /* Force was not requested, so timeout. */ + abortFailover("Replica never caught up before timeout"); + return; + } + } + + /* Check to see if the replica has caught up so failover can start */ + client *replica = NULL; + if (server.target_replica_host) { + replica = findReplica(server.target_replica_host, + server.target_replica_port); + } else { + listIter li; + listNode *ln; + + listRewind(server.slaves,&li); + /* Find any replica that has matched our repl_offset */ + while((ln = listNext(&li))) { + replica = ln->value; + if (replica->repl_ack_off == server.master_repl_offset) { + char ip[NET_IP_STR_LEN], *replicaip = replica->slave_ip; + + if (replicaip[0] == '\0') { + if (connPeerToString(replica->conn,ip,sizeof(ip),NULL) == -1) + continue; + replicaip = ip; + } + + /* We are now failing over to this specific node */ + server.target_replica_host = zstrdup(replicaip); + server.target_replica_port = replica->slave_listening_port; + break; + } + } + } + + /* We've found a replica that is caught up */ + if (replica && (replica->repl_ack_off == server.master_repl_offset)) { + server.failover_state = FAILOVER_IN_PROGRESS; + serverLog(LL_NOTICE, + "Failover target %s:%d is synced, failing over.", + server.target_replica_host, server.target_replica_port); + /* Designated replica is caught up, failover to it. */ + replicationSetMaster(server.target_replica_host, + server.target_replica_port); + } +} |