summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorantirez <antirez@gmail.com>2015-08-06 09:23:23 +0200
committerantirez <antirez@gmail.com>2015-08-06 09:23:23 +0200
commit3e6d4d599a07ec347ef2f77c6b292223e7be6a16 (patch)
tree52e3576f35401f361708726b060f6ef255d3f764
parent7ab3af0edc238c869827c185f997c0a99f4506b5 (diff)
downloadredis-slaves_capa.tar.gz
Replication: add REPLCONF CAPA EOF support.slaves_capa
Add the concept of slaves capabilities to Redis, the slave now presents to the Redis master with a set of capabilities in the form: REPLCONF capa SOMECAPA capa OTHERCAPA ... This has the effect of setting slave->slave_capa with the corresponding SLAVE_CAPA macros that the master can test later to understand if it the slave will understand certain formats and protocols of the replication process. This makes it much simpler to introduce new replication capabilities in the future in a way that don't break old slaves or masters. This patch was designed and implemented together with Oran Agra (@oranagra).
-rw-r--r--src/networking.c1
-rw-r--r--src/replication.c56
-rw-r--r--src/server.h5
3 files changed, 51 insertions, 11 deletions
diff --git a/src/networking.c b/src/networking.c
index 7a86bf80d..782dc6c4a 100644
--- a/src/networking.c
+++ b/src/networking.c
@@ -105,6 +105,7 @@ client *createClient(int fd) {
c->repl_ack_off = 0;
c->repl_ack_time = 0;
c->slave_listening_port = 0;
+ c->slave_capa = SLAVE_CAPA_NONE;
c->reply = listCreate();
c->reply_bytes = 0;
c->obuf_soft_limit_reached_time = 0;
diff --git a/src/replication.c b/src/replication.c
index 30d1e2994..48811f3dd 100644
--- a/src/replication.c
+++ b/src/replication.c
@@ -483,14 +483,18 @@ need_full_resync:
* socket target depending on the configuration, and making sure that
* the script cache is flushed before to start.
*
+ * The mincapa argument is the bitwise AND among all the slaves capabilities
+ * of the slaves waiting for this BGSAVE, so represents the slave capabilities
+ * all the slaves support. Can be tested via SLAVE_CAPA_* macros.
+ *
* Returns C_OK on success or C_ERR otherwise. */
-int startBgsaveForReplication(void) {
+int startBgsaveForReplication(int mincapa) {
int retval;
serverLog(LL_NOTICE,"Starting BGSAVE for SYNC with target: %s",
server.repl_diskless_sync ? "slaves sockets" : "disk");
- if (server.repl_diskless_sync)
+ if (server.repl_diskless_sync && (mincapa & SLAVE_CAPA_EOF))
retval = rdbSaveToSlavesSockets();
else
retval = rdbSaveBackground(server.rdb_filename);
@@ -560,7 +564,7 @@ void syncCommand(client *c) {
/* Here we need to check if there is a background saving operation
* in progress, or if it is required to start one */
- /* CASE 1: BGSAVE is in progress and replication target is disk. */
+ /* CASE 1: BGSAVE is in progress, with disk target. */
if (server.rdb_child_pid != -1 &&
server.rdb_child_type == RDB_CHILD_TYPE_DISK)
{
@@ -576,7 +580,9 @@ void syncCommand(client *c) {
slave = ln->value;
if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_END) break;
}
- if (ln) {
+ /* To attach this slave, we check that it has at least all the
+ * capabilities of the slave that triggered the current BGSAVE. */
+ if (ln && ((c->slave_capa & slave->slave_capa) == slave->slave_capa)) {
/* Perfect, the server is already registering differences for
* another slave. Set the right state, and copy the buffer. */
copyClientOutputBuffer(c,slave);
@@ -589,7 +595,7 @@ void syncCommand(client *c) {
serverLog(LL_NOTICE,"Waiting for next BGSAVE for SYNC");
}
- /* CASE 2: BGSAVE is in progress and replication target is socket. */
+ /* CASE 2: BGSAVE is in progress, with socket target. */
} else if (server.rdb_child_pid != -1 &&
server.rdb_child_type == RDB_CHILD_TYPE_SOCKET)
{
@@ -601,7 +607,7 @@ void syncCommand(client *c) {
/* CASE 3: There is no BGSAVE is progress. */
} else {
- if (server.repl_diskless_sync) {
+ if (server.repl_diskless_sync && (c->slave_capa & SLAVE_CAPA_EOF)) {
/* Diskless replication RDB child is created inside
* replicationCron() since we want to delay its start a
* few seconds to wait for more slaves to arrive. */
@@ -609,9 +615,10 @@ void syncCommand(client *c) {
if (server.repl_diskless_sync_delay)
serverLog(LL_NOTICE,"Delay next BGSAVE for SYNC");
} else {
- /* Target is disk and we don't have a BGSAVE in progress,
+ /* Target is disk (or the slave is not capable of supporting
+ * diskless replication) and we don't have a BGSAVE in progress,
* let's start one. */
- if (startBgsaveForReplication() != C_OK) {
+ if (startBgsaveForReplication(c->slave_capa) != C_OK) {
serverLog(LL_NOTICE,"Replication failed, can't BGSAVE");
addReplyError(c,"Unable to perform background save");
return;
@@ -661,6 +668,10 @@ void replconfCommand(client *c) {
&port,NULL) != C_OK))
return;
c->slave_listening_port = port;
+ } else if (!strcasecmp(c->argv[j]->ptr,"capa")) {
+ /* Ignore capabilities not understood by this master. */
+ if (!strcasecmp(c->argv[j+1]->ptr,"eof"))
+ c->slave_capa |= SLAVE_CAPA_EOF;
} else if (!strcasecmp(c->argv[j]->ptr,"ack")) {
/* REPLCONF ACK is used by slave to inform the master the amount
* of replication stream that it processed so far. It is an
@@ -794,6 +805,7 @@ void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) {
void updateSlavesWaitingBgsave(int bgsaveerr, int type) {
listNode *ln;
int startbgsave = 0;
+ int mincapa = -1;
listIter li;
listRewind(server.slaves,&li);
@@ -802,6 +814,8 @@ void updateSlavesWaitingBgsave(int bgsaveerr, int type) {
if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) {
startbgsave = 1;
+ mincapa = (mincapa == -1) ? slave->slave_capa :
+ (mincapa & slave->slave_capa);
replicationSetupSlaveForFullResync(slave,getPsyncInitialOffset());
} else if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_END) {
struct redis_stat buf;
@@ -850,7 +864,7 @@ void updateSlavesWaitingBgsave(int bgsaveerr, int type) {
}
}
if (startbgsave) {
- if (startBgsaveForReplication() != C_OK) {
+ if (startBgsaveForReplication(mincapa) != C_OK) {
listIter li;
listRewind(server.slaves,&li);
@@ -1362,7 +1376,24 @@ void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {
/* Ignore the error if any, not all the Redis versions support
* REPLCONF listening-port. */
if (err[0] == '-') {
- serverLog(LL_NOTICE,"(Non critical) Master does not understand REPLCONF listening-port: %s", err);
+ serverLog(LL_NOTICE,"(Non critical) Master does not understand "
+ "REPLCONF listening-port: %s", err);
+ }
+ sdsfree(err);
+ }
+
+ /* Inform the master of our capabilities. While we currently send
+ * just one capability, it is possible to chain new capabilities here
+ * in the form of REPLCONF capa X capa Y capa Z ...
+ * The master will ignore capabilities it does not understand. */
+ {
+ err = sendSynchronousCommand(fd,"REPLCONF","capa","eof",NULL);
+
+ /* Ignore the error if any, not all the Redis versions support
+ * REPLCONF capa. */
+ if (err[0] == '-') {
+ serverLog(LL_NOTICE,"(Non critical) Master does not understand "
+ "REPLCONF capa: %s", err);
}
sdsfree(err);
}
@@ -2145,6 +2176,7 @@ void replicationCron(void) {
if (server.rdb_child_pid == -1 && server.aof_child_pid == -1) {
time_t idle, max_idle = 0;
int slaves_waiting = 0;
+ int mincapa = -1;
listNode *ln;
listIter li;
@@ -2155,13 +2187,15 @@ void replicationCron(void) {
idle = server.unixtime - slave->lastinteraction;
if (idle > max_idle) max_idle = idle;
slaves_waiting++;
+ mincapa = (mincapa == -1) ? slave->slave_capa :
+ (mincapa & slave->slave_capa);
}
}
if (slaves_waiting && max_idle > server.repl_diskless_sync_delay) {
/* Start a BGSAVE. Usually with socket target, or with disk target
* if there was a recent socket -> disk config change. */
- if (startBgsaveForReplication() == C_OK) {
+ if (startBgsaveForReplication(mincapa) == C_OK) {
/* It started! We need to change the state of slaves
* from WAIT_BGSAVE_START to WAIT_BGSAVE_END in case
* the current target is disk. Otherwise it was already done
diff --git a/src/server.h b/src/server.h
index 91465cd2e..024b3e254 100644
--- a/src/server.h
+++ b/src/server.h
@@ -299,6 +299,10 @@ typedef long long mstime_t; /* millisecond time type. */
#define SLAVE_STATE_SEND_BULK 8 /* Sending RDB file to slave. */
#define SLAVE_STATE_ONLINE 9 /* RDB file transmitted, sending just updates. */
+/* Slave capabilities. */
+#define SLAVE_CAPA_NONE 0
+#define SLAVE_CAPA_EOF (1<<0) /* Can parse the RDB EOF streaming format. */
+
/* Synchronous read timeout - slave side */
#define CONFIG_REPL_SYNCIO_TIMEOUT 5
@@ -569,6 +573,7 @@ typedef struct client {
should use. */
char replrunid[CONFIG_RUN_ID_SIZE+1]; /* master run id if this is a master */
int slave_listening_port; /* As configured with: SLAVECONF listening-port */
+ int slave_capa; /* Slave capabilities: SLAVE_CAPA_* bitwise OR. */
multiState mstate; /* MULTI/EXEC state */
int btype; /* Type of blocking op if CLIENT_BLOCKED. */
blockingState bpop; /* blocking state */