diff options
Diffstat (limited to 'src/replication.c')
-rw-r--r-- | src/replication.c | 84 |
1 files changed, 61 insertions, 23 deletions
diff --git a/src/replication.c b/src/replication.c index eb5fa54c0..8177eb073 100644 --- a/src/replication.c +++ b/src/replication.c @@ -892,17 +892,34 @@ void syncCommand(client *c) { } /* REPLCONF <option> <value> <option> <value> ... - * This command is used by a slave in order to configure the replication + * This command is used by a replica in order to configure the replication * process before starting it with the SYNC command. + * This command is also used by a master in order to get the replication + * offset from a replica. * - * Currently the only use of this command is to communicate to the master - * what is the listening port of the Slave redis instance, so that the - * master can accurately list slaves and their listening ports in - * the INFO output. + * Currently we support these options: * - * In the future the same command can be used in order to configure - * the replication to initiate an incremental replication instead of a - * full resync. */ + * - listening-port <port> + * - ip-address <ip> + * What is the listening ip and port of the Replica redis instance, so that + * the master can accurately list replicas and their listening ports in the + * INFO output. + * + * - capa <eof|psync2> + * What are the capabilities of this instance. + * eof: supports EOF-style RDB transfer for diskless replication. + * psync2: supports PSYNC v2, so understands +CONTINUE <new repl ID>. + * + * - ack <offset> + * Replica informs the master of the amount of replication stream that it + * processed so far. + * + * - getack + * Unlike other subcommands, this is used by master to get the replication + * offset from a replica. + * + * - rdb-only + * Only wants RDB snapshot without replication buffer. 
*/ void replconfCommand(client *c) { int j; @@ -1136,6 +1153,8 @@ void rdbPipeWriteHandlerConnRemoved(struct connection *conn) { if (!connHasWriteHandler(conn)) return; connSetWriteHandler(conn, NULL); + client *slave = connGetPrivateData(conn); + slave->repl_last_partial_write = 0; server.rdb_pipe_numconns_writing--; /* if there are no more writes for now for this conn, or write error: */ if (server.rdb_pipe_numconns_writing == 0) { @@ -1163,8 +1182,10 @@ void rdbPipeWriteHandler(struct connection *conn) { } else { slave->repldboff += nwritten; atomicIncr(server.stat_net_output_bytes, nwritten); - if (slave->repldboff < server.rdb_pipe_bufflen) + if (slave->repldboff < server.rdb_pipe_bufflen) { + slave->repl_last_partial_write = server.unixtime; return; /* more data to write.. */ + } } rdbPipeWriteHandlerConnRemoved(conn); } @@ -1245,6 +1266,7 @@ void rdbPipeReadHandler(struct aeEventLoop *eventLoop, int fd, void *clientData, /* If we were unable to write all the data to one of the replicas, * setup write handler (and disable pipe read handler, below) */ if (nwritten != server.rdb_pipe_bufflen) { + slave->repl_last_partial_write = server.unixtime; server.rdb_pipe_numconns_writing++; connSetWriteHandler(conn, rdbPipeWriteHandler); } @@ -1873,8 +1895,7 @@ void readSyncBulkPayload(connection *conn) { serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Finished with success"); if (server.supervised_mode == SUPERVISED_SYSTEMD) { - redisCommunicateSystemd("STATUS=MASTER <-> REPLICA sync: Finished with success. Ready to accept connections.\n"); - redisCommunicateSystemd("READY=1\n"); + redisCommunicateSystemd("STATUS=MASTER <-> REPLICA sync: Finished with success. Ready to accept connections in read-write mode.\n"); } /* Send the initial ACK immediately to put this replica in online state. 
*/ @@ -2434,8 +2455,7 @@ void syncWithMaster(connection *conn) { if (psync_result == PSYNC_CONTINUE) { serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Master accepted a Partial Resynchronization."); if (server.supervised_mode == SUPERVISED_SYSTEMD) { - redisCommunicateSystemd("STATUS=MASTER <-> REPLICA sync: Partial Resynchronization accepted. Ready to accept connections.\n"); - redisCommunicateSystemd("READY=1\n"); + redisCommunicateSystemd("STATUS=MASTER <-> REPLICA sync: Partial Resynchronization accepted. Ready to accept connections in read-write mode.\n"); } return; } @@ -2683,6 +2703,9 @@ void replicationUnsetMaster(void) { * starting from now. Otherwise the backlog will be freed after a * failover if slaves do not connect immediately. */ server.repl_no_slaves_since = server.unixtime; + + /* Reset down time so it'll be ready for when we turn into replica again. */ + server.repl_down_since = 0; /* Fire the role change modules event. */ moduleFireServerEvent(REDISMODULE_EVENT_REPLICATION_ROLE_CHANGED, @@ -2758,7 +2781,7 @@ void replicaofCommand(client *c) { if ((getLongFromObjectOrReply(c, c->argv[2], &port, NULL) != C_OK)) return; - /* Check if we are already attached to the specified slave */ + /* Check if we are already attached to the specified master */ if (server.masterhost && !strcasecmp(server.masterhost,c->argv[1]->ptr) && server.masterport == port) { serverLog(LL_NOTICE,"REPLICAOF would result into synchronization " @@ -3375,13 +3398,28 @@ void replicationCron(void) { while((ln = listNext(&li))) { client *slave = ln->value; - if (slave->replstate != SLAVE_STATE_ONLINE) continue; - if (slave->flags & CLIENT_PRE_PSYNC) continue; - if ((server.unixtime - slave->repl_ack_time) > server.repl_timeout) - { - serverLog(LL_WARNING, "Disconnecting timedout replica: %s", - replicationGetSlaveName(slave)); - freeClient(slave); + if (slave->replstate == SLAVE_STATE_ONLINE) { + if (slave->flags & CLIENT_PRE_PSYNC) + continue; + if ((server.unixtime - 
slave->repl_ack_time) > server.repl_timeout) { + serverLog(LL_WARNING, "Disconnecting timedout replica (streaming sync): %s", + replicationGetSlaveName(slave)); + freeClient(slave); + continue; + } + } + /* We consider disconnecting only diskless replicas because disk-based replicas aren't fed + * by the fork child so if a disk-based replica is stuck it doesn't prevent the fork child + * from terminating. */ + if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_END && server.rdb_child_type == RDB_CHILD_TYPE_SOCKET) { + if (slave->repl_last_partial_write != 0 && + (server.unixtime - slave->repl_last_partial_write) > server.repl_timeout) + { + serverLog(LL_WARNING, "Disconnecting timedout replica (full sync): %s", + replicationGetSlaveName(slave)); + freeClient(slave); + continue; + } } } } @@ -3546,7 +3584,7 @@ void abortFailover(const char *err) { } /* - * FAILOVER [TO <HOST> <IP> [FORCE]] [ABORT] [TIMEOUT <timeout>] + * FAILOVER [TO <HOST> <PORT> [FORCE]] [ABORT] [TIMEOUT <timeout>] * * This command will coordinate a failover between the master and one * of its replicas. The happy path contains the following steps: @@ -3649,7 +3687,7 @@ void failoverCommand(client *c) { client *replica = findReplica(host, port); if (replica == NULL) { - addReplyError(c,"FAILOVER target HOST and IP is not " + addReplyError(c,"FAILOVER target HOST and PORT is not " "a replica."); return; } |