summaryrefslogtreecommitdiff
path: root/src/replication.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/replication.c')
-rw-r--r--src/replication.c84
1 files changed, 61 insertions, 23 deletions
diff --git a/src/replication.c b/src/replication.c
index eb5fa54c0..8177eb073 100644
--- a/src/replication.c
+++ b/src/replication.c
@@ -892,17 +892,34 @@ void syncCommand(client *c) {
}
/* REPLCONF <option> <value> <option> <value> ...
- * This command is used by a slave in order to configure the replication
+ * This command is used by a replica in order to configure the replication
* process before starting it with the SYNC command.
+ * This command is also used by a master in order to get the replication
+ * offset from a replica.
*
- * Currently the only use of this command is to communicate to the master
- * what is the listening port of the Slave redis instance, so that the
- * master can accurately list slaves and their listening ports in
- * the INFO output.
+ * Currently we support these options:
*
- * In the future the same command can be used in order to configure
- * the replication to initiate an incremental replication instead of a
- * full resync. */
+ * - listening-port <port>
+ * - ip-address <ip>
+ * What is the listening ip and port of the Replica redis instance, so that
+ * the master can accurately list replicas and their listening ports in the
+ * INFO output.
+ *
+ * - capa <eof|psync2>
+ * What are the capabilities of this instance.
+ * eof: supports EOF-style RDB transfer for diskless replication.
+ * psync2: supports PSYNC v2, so understands +CONTINUE <new repl ID>.
+ *
+ * - ack <offset>
+ * Replica informs the master of the amount of replication stream that it
+ * processed so far.
+ *
+ * - getack
+ * Unlike other subcommands, this is used by master to get the replication
+ * offset from a replica.
+ *
+ * - rdb-only
+ * Only wants RDB snapshot without replication buffer. */
void replconfCommand(client *c) {
int j;
@@ -1136,6 +1153,8 @@ void rdbPipeWriteHandlerConnRemoved(struct connection *conn) {
if (!connHasWriteHandler(conn))
return;
connSetWriteHandler(conn, NULL);
+ client *slave = connGetPrivateData(conn);
+ slave->repl_last_partial_write = 0;
server.rdb_pipe_numconns_writing--;
/* if there are no more writes for now for this conn, or write error: */
if (server.rdb_pipe_numconns_writing == 0) {
@@ -1163,8 +1182,10 @@ void rdbPipeWriteHandler(struct connection *conn) {
} else {
slave->repldboff += nwritten;
atomicIncr(server.stat_net_output_bytes, nwritten);
- if (slave->repldboff < server.rdb_pipe_bufflen)
+ if (slave->repldboff < server.rdb_pipe_bufflen) {
+ slave->repl_last_partial_write = server.unixtime;
return; /* more data to write.. */
+ }
}
rdbPipeWriteHandlerConnRemoved(conn);
}
@@ -1245,6 +1266,7 @@ void rdbPipeReadHandler(struct aeEventLoop *eventLoop, int fd, void *clientData,
/* If we were unable to write all the data to one of the replicas,
* setup write handler (and disable pipe read handler, below) */
if (nwritten != server.rdb_pipe_bufflen) {
+ slave->repl_last_partial_write = server.unixtime;
server.rdb_pipe_numconns_writing++;
connSetWriteHandler(conn, rdbPipeWriteHandler);
}
@@ -1873,8 +1895,7 @@ void readSyncBulkPayload(connection *conn) {
serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Finished with success");
if (server.supervised_mode == SUPERVISED_SYSTEMD) {
- redisCommunicateSystemd("STATUS=MASTER <-> REPLICA sync: Finished with success. Ready to accept connections.\n");
- redisCommunicateSystemd("READY=1\n");
+ redisCommunicateSystemd("STATUS=MASTER <-> REPLICA sync: Finished with success. Ready to accept connections in read-write mode.\n");
}
/* Send the initial ACK immediately to put this replica in online state. */
@@ -2434,8 +2455,7 @@ void syncWithMaster(connection *conn) {
if (psync_result == PSYNC_CONTINUE) {
serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Master accepted a Partial Resynchronization.");
if (server.supervised_mode == SUPERVISED_SYSTEMD) {
- redisCommunicateSystemd("STATUS=MASTER <-> REPLICA sync: Partial Resynchronization accepted. Ready to accept connections.\n");
- redisCommunicateSystemd("READY=1\n");
+ redisCommunicateSystemd("STATUS=MASTER <-> REPLICA sync: Partial Resynchronization accepted. Ready to accept connections in read-write mode.\n");
}
return;
}
@@ -2683,6 +2703,9 @@ void replicationUnsetMaster(void) {
* starting from now. Otherwise the backlog will be freed after a
* failover if slaves do not connect immediately. */
server.repl_no_slaves_since = server.unixtime;
+
+ /* Reset down time so it'll be ready for when we turn into replica again. */
+ server.repl_down_since = 0;
/* Fire the role change modules event. */
moduleFireServerEvent(REDISMODULE_EVENT_REPLICATION_ROLE_CHANGED,
@@ -2758,7 +2781,7 @@ void replicaofCommand(client *c) {
if ((getLongFromObjectOrReply(c, c->argv[2], &port, NULL) != C_OK))
return;
- /* Check if we are already attached to the specified slave */
+ /* Check if we are already attached to the specified master */
if (server.masterhost && !strcasecmp(server.masterhost,c->argv[1]->ptr)
&& server.masterport == port) {
serverLog(LL_NOTICE,"REPLICAOF would result into synchronization "
@@ -3375,13 +3398,28 @@ void replicationCron(void) {
while((ln = listNext(&li))) {
client *slave = ln->value;
- if (slave->replstate != SLAVE_STATE_ONLINE) continue;
- if (slave->flags & CLIENT_PRE_PSYNC) continue;
- if ((server.unixtime - slave->repl_ack_time) > server.repl_timeout)
- {
- serverLog(LL_WARNING, "Disconnecting timedout replica: %s",
- replicationGetSlaveName(slave));
- freeClient(slave);
+ if (slave->replstate == SLAVE_STATE_ONLINE) {
+ if (slave->flags & CLIENT_PRE_PSYNC)
+ continue;
+ if ((server.unixtime - slave->repl_ack_time) > server.repl_timeout) {
+ serverLog(LL_WARNING, "Disconnecting timedout replica (streaming sync): %s",
+ replicationGetSlaveName(slave));
+ freeClient(slave);
+ continue;
+ }
+ }
+ /* We consider disconnecting only diskless replicas because disk-based replicas aren't fed
+ * by the fork child so if a disk-based replica is stuck it doesn't prevent the fork child
+ * from terminating. */
+ if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_END && server.rdb_child_type == RDB_CHILD_TYPE_SOCKET) {
+ if (slave->repl_last_partial_write != 0 &&
+ (server.unixtime - slave->repl_last_partial_write) > server.repl_timeout)
+ {
+ serverLog(LL_WARNING, "Disconnecting timedout replica (full sync): %s",
+ replicationGetSlaveName(slave));
+ freeClient(slave);
+ continue;
+ }
}
}
}
@@ -3546,7 +3584,7 @@ void abortFailover(const char *err) {
}
/*
- * FAILOVER [TO <HOST> <IP> [FORCE]] [ABORT] [TIMEOUT <timeout>]
+ * FAILOVER [TO <HOST> <PORT> [FORCE]] [ABORT] [TIMEOUT <timeout>]
*
* This command will coordinate a failover between the master and one
* of its replicas. The happy path contains the following steps:
@@ -3649,7 +3687,7 @@ void failoverCommand(client *c) {
client *replica = findReplica(host, port);
if (replica == NULL) {
- addReplyError(c,"FAILOVER target HOST and IP is not "
+ addReplyError(c,"FAILOVER target HOST and PORT is not "
"a replica.");
return;
}