summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorantirez <antirez@gmail.com>2013-01-30 18:33:16 +0100
committerantirez <antirez@gmail.com>2013-02-12 12:52:21 +0100
commit078882025ea50e0ed888713fba8ce28299de626a (patch)
treec88e4bbde92c4ff9c6f9a51d002cbfb9456893d0
parente34a35a51194650ce7ef7df1047012c7eaad2957 (diff)
downloadredis-078882025ea50e0ed888713fba8ce28299de626a.tar.gz
PSYNC: work in progress, preview #2, rebased to unstable.
-rw-r--r--redis.conf22
-rw-r--r--src/config.c20
-rw-r--r--src/db.c3
-rw-r--r--src/multi.c3
-rw-r--r--src/networking.c68
-rw-r--r--src/redis.c30
-rw-r--r--src/redis.h34
-rw-r--r--src/replication.c651
8 files changed, 758 insertions, 73 deletions
diff --git a/redis.conf b/redis.conf
index 2f176f247..db1402795 100644
--- a/redis.conf
+++ b/redis.conf
@@ -227,6 +227,28 @@ slave-read-only yes
# be a good idea.
repl-disable-tcp-nodelay no
+# Set the replication backlog size. The backlog is a buffer that accumulates
+# slave data when slaves are disconnected for some time, so that when a slave
+# wants to reconnect again, often a full resync is not needed, but a partial
+# resync is enough, just passing the portion of data the slave missed while
+# disconnected.
+#
+# The biggest the replication backlog, the longer the time the slave can be
+# disconnected and later be able to perform a partial resynchronization.
+#
+# The backlog is only allocated once there is at least a slave connected.
+#
+# repl-backlog-size 1mb
+
+# After a master has no longer connected slaves for some time, the backlog
+# will be freed. The following option configures the amount of seconds that
+# need to elapse, starting from the time the last slave disconnected, for
+# the backlog buffer to be freed.
+#
+# A value of 0 means to never release the backlog.
+#
+# repl-backlog-ttl 3600
+
# The slave priority is an integer number published by Redis in the INFO output.
# It is used by Redis Sentinel in order to select a slave to promote into a
# master if the master is no longer working correctly.
diff --git a/src/config.c b/src/config.c
index fea3a0349..32b949411 100644
--- a/src/config.c
+++ b/src/config.c
@@ -238,6 +238,18 @@ void loadServerConfigFromString(char *config) {
} else if (!strcasecmp(argv[0],"repl-disable-tcp-nodelay") && argc==2) {
if ((server.repl_disable_tcp_nodelay = yesnotoi(argv[1])) == -1) {
err = "argument must be 'yes' or 'no'"; goto loaderr;
+ } else if (!strcasecmp(argv[0],"repl-backlog-size") && argc == 2) {
+ long long size = strtoll(argv[0],NULL,10);
+ if (size <= 0) {
+ err = "repl-backlog-size must be 1 or greater.";
+ goto loaderr;
+ }
+ resizeReplicationBacklog(size);
+ } else if (!strcasecmp(argv[0],"repl-backlog-ttl") && argc == 2) {
+ server.repl_backlog_time_limit = atoi(argv[1]);
+ if (server.repl_backlog_time_limit < 0) {
+ err = "repl-backlog-ttl can't be negative ";
+ goto loaderr;
}
} else if (!strcasecmp(argv[0],"masterauth") && argc == 2) {
server.masterauth = zstrdup(argv[1]);
@@ -719,6 +731,12 @@ void configSetCommand(redisClient *c) {
} else if (!strcasecmp(c->argv[2]->ptr,"repl-timeout")) {
if (getLongLongFromObject(o,&ll) == REDIS_ERR || ll <= 0) goto badfmt;
server.repl_timeout = ll;
+ } else if (!strcasecmp(c->argv[2]->ptr,"repl-backlog-size")) {
+ if (getLongLongFromObject(o,&ll) == REDIS_ERR || ll <= 0) goto badfmt;
+ resizeReplicationBacklog(ll);
+ } else if (!strcasecmp(c->argv[2]->ptr,"repl-backlog-ttl")) {
+ if (getLongLongFromObject(o,&ll) == REDIS_ERR || ll < 0) goto badfmt;
+ server.repl_backlog_time_limit = ll;
} else if (!strcasecmp(c->argv[2]->ptr,"watchdog-period")) {
if (getLongLongFromObject(o,&ll) == REDIS_ERR || ll < 0) goto badfmt;
if (ll)
@@ -832,6 +850,8 @@ void configGetCommand(redisClient *c) {
config_get_numerical_field("databases",server.dbnum);
config_get_numerical_field("repl-ping-slave-period",server.repl_ping_slave_period);
config_get_numerical_field("repl-timeout",server.repl_timeout);
+ config_get_numerical_field("repl-backlog-size",server.repl_backlog_size);
+ config_get_numerical_field("repl-backlog-ttl",server.repl_backlog_time_limit);
config_get_numerical_field("maxclients",server.maxclients);
config_get_numerical_field("watchdog-period",server.watchdog_period);
config_get_numerical_field("slave-priority",server.slave_priority);
diff --git a/src/db.c b/src/db.c
index 8429aa4b5..2de7e2c4c 100644
--- a/src/db.c
+++ b/src/db.c
@@ -511,8 +511,7 @@ void propagateExpire(redisDb *db, robj *key) {
if (server.aof_state != REDIS_AOF_OFF)
feedAppendOnlyFile(server.delCommand,db->id,argv,2);
- if (listLength(server.slaves))
- replicationFeedSlaves(server.slaves,db->id,argv,2);
+ replicationFeedSlaves(server.slaves,db->id,argv,2);
decrRefCount(argv[0]);
decrRefCount(argv[1]);
diff --git a/src/multi.c b/src/multi.c
index dfac15c34..ba8baf0a2 100644
--- a/src/multi.c
+++ b/src/multi.c
@@ -108,8 +108,7 @@ void execCommandReplicateMulti(redisClient *c) {
if (server.aof_state != REDIS_AOF_OFF)
feedAppendOnlyFile(server.multiCommand,c->db->id,&multistring,1);
- if (listLength(server.slaves))
- replicationFeedSlaves(server.slaves,c->db->id,&multistring,1);
+ replicationFeedSlaves(server.slaves,c->db->id,&multistring,1);
decrRefCount(multistring);
}
diff --git a/src/networking.c b/src/networking.c
index 66a2702c5..b90936011 100644
--- a/src/networking.c
+++ b/src/networking.c
@@ -87,6 +87,7 @@ redisClient *createClient(int fd) {
c->ctime = c->lastinteraction = server.unixtime;
c->authenticated = 0;
c->replstate = REDIS_REPL_NONE;
+ c->reploff = 0;
c->slave_listening_port = 0;
c->reply = listCreate();
c->reply_bytes = 0;
@@ -595,12 +596,42 @@ void disconnectSlaves(void) {
}
}
+/* This function is called when the slave lose the connection with the
+ * master into an unexpected way. */
+void replicationHandleMasterDisconnection(void) {
+ server.master = NULL;
+ server.repl_state = REDIS_REPL_CONNECT;
+ server.repl_down_since = server.unixtime;
+ /* We lost connection with our master, force our slaves to resync
+ * with us as well to load the new data set.
+ *
+ * If server.masterhost is NULL the user called SLAVEOF NO ONE so
+ * slave resync is not needed. */
+ if (server.masterhost != NULL) disconnectSlaves();
+}
+
void freeClient(redisClient *c) {
listNode *ln;
/* If this is marked as current client unset it */
if (server.current_client == c) server.current_client = NULL;
+ /* If it is our master that's beging disconnected we should make sure
+ * to cache the state to try a partial resynchronization later.
+ *
+ * Note that before doing this we make sure that the client is not in
+ * some unexpected state, by checking its flags. */
+ if (server.master &&
+ (c->flags & REDIS_MASTER) &&
+ !(c->flags & (REDIS_CLOSE_AFTER_REPLY|
+ REDIS_CLOSE_ASAP|
+ REDIS_BLOCKED|
+ REDIS_UNBLOCKED)))
+ {
+ replicationCacheMaster(c);
+ return;
+ }
+
/* Note that if the client we are freeing is blocked into a blocking
* call, we have to set querybuf to NULL *before* to call
* unblockClientWaitingData() to avoid processInputBuffer() will get
@@ -620,16 +651,21 @@ void freeClient(redisClient *c) {
pubsubUnsubscribeAllPatterns(c,0);
dictRelease(c->pubsub_channels);
listRelease(c->pubsub_patterns);
- /* Obvious cleanup */
- aeDeleteFileEvent(server.el,c->fd,AE_READABLE);
- aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
+ /* Close socket, unregister events, and remove list of replies and
+ * accumulated arguments. */
+ if (c->fd != -1) {
+ aeDeleteFileEvent(server.el,c->fd,AE_READABLE);
+ aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
+ close(c->fd);
+ }
listRelease(c->reply);
freeClientArgv(c);
- close(c->fd);
/* Remove from the list of clients */
- ln = listSearchKey(server.clients,c);
- redisAssert(ln != NULL);
- listDelNode(server.clients,ln);
+ if (c->fd != -1) {
+ ln = listSearchKey(server.clients,c);
+ redisAssert(ln != NULL);
+ listDelNode(server.clients,ln);
+ }
/* When client was just unblocked because of a blocking operation,
* remove it from the list with unblocked clients. */
if (c->flags & REDIS_UNBLOCKED) {
@@ -647,20 +683,15 @@ void freeClient(redisClient *c) {
ln = listSearchKey(l,c);
redisAssert(ln != NULL);
listDelNode(l,ln);
+ /* We need to remember the time when we started to have zero
+ * attached slaves, as after some time we'll free the replication
+ * backlog. */
+ if (c->flags & REDIS_SLAVE && listLength(server.slaves) == 0)
+ server.repl_no_slaves_since = server.unixtime;
}
/* Case 2: we lost the connection with the master. */
- if (c->flags & REDIS_MASTER) {
- server.master = NULL;
- server.repl_state = REDIS_REPL_CONNECT;
- server.repl_down_since = server.unixtime;
- /* We lost connection with our master, force our slaves to resync
- * with us as well to load the new data set.
- *
- * If server.masterhost is NULL the user called SLAVEOF NO ONE so
- * slave resync is not needed. */
- if (server.masterhost != NULL) disconnectSlaves();
- }
+ if (c->flags & REDIS_MASTER) replicationHandleMasterDisconnection();
/* If this client was scheduled for async freeing we need to remove it
* from the queue. */
@@ -1059,6 +1090,7 @@ void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
if (nread) {
sdsIncrLen(c->querybuf,nread);
c->lastinteraction = server.unixtime;
+ if (c->flags & REDIS_MASTER) c->reploff += nread;
} else {
server.current_client = NULL;
return;
diff --git a/src/redis.c b/src/redis.c
index 863c54e25..7300cd89f 100644
--- a/src/redis.c
+++ b/src/redis.c
@@ -220,6 +220,7 @@ struct redisCommand redisCommandTable[] = {
{"exec",execCommand,1,"sM",0,NULL,0,0,0,0,0},
{"discard",discardCommand,1,"rs",0,NULL,0,0,0,0,0},
{"sync",syncCommand,1,"ars",0,NULL,0,0,0,0,0},
+ {"psync",syncCommand,3,"ars",0,NULL,0,0,0,0,0},
{"replconf",replconfCommand,-1,"ars",0,NULL,0,0,0,0,0},
{"flushdb",flushdbCommand,1,"w",0,NULL,0,0,0,0,0},
{"flushall",flushallCommand,1,"w",0,NULL,0,0,0,0,0},
@@ -1202,6 +1203,8 @@ void initServerConfig() {
server.masterhost = NULL;
server.masterport = 6379;
server.master = NULL;
+ server.cached_master = NULL;
+ server.repl_master_initial_offset = -1;
server.repl_state = REDIS_REPL_NONE;
server.repl_syncio_timeout = REDIS_REPL_SYNCIO_TIMEOUT;
server.repl_serve_stale_data = 1;
@@ -1209,6 +1212,16 @@ void initServerConfig() {
server.repl_down_since = time(NULL);
server.repl_disable_tcp_nodelay = 0;
server.slave_priority = REDIS_DEFAULT_SLAVE_PRIORITY;
+ server.master_repl_offset = 0;
+
+ /* Replication partial resync backlog */
+ server.repl_backlog = NULL;
+ server.repl_backlog_size = REDIS_DEFAULT_REPL_BACKLOG_SIZE;
+ server.repl_backlog_histlen = 0;
+ server.repl_backlog_idx = 0;
+ server.repl_backlog_off = 0;
+ server.repl_backlog_time_limit = REDIS_DEFAULT_REPL_BACKLOG_TIME_LIMIT;
+ server.repl_no_slaves_since = time(NULL);
/* Client output buffer limits */
server.client_obuf_limits[REDIS_CLIENT_LIMIT_CLASS_NORMAL].hard_limit_bytes = 0;
@@ -1522,7 +1535,7 @@ void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc,
{
if (server.aof_state != REDIS_AOF_OFF && flags & REDIS_PROPAGATE_AOF)
feedAppendOnlyFile(cmd,dbid,argv,argc);
- if (flags & REDIS_PROPAGATE_REPL && listLength(server.slaves))
+ if (flags & REDIS_PROPAGATE_REPL)
replicationFeedSlaves(server.slaves,dbid,argv,argc);
}
@@ -2151,13 +2164,15 @@ sds genRedisInfoString(char *section) {
"master_link_status:%s\r\n"
"master_last_io_seconds_ago:%d\r\n"
"master_sync_in_progress:%d\r\n"
+ "slave_repl_offset:%lld\r\n"
,server.masterhost,
server.masterport,
(server.repl_state == REDIS_REPL_CONNECTED) ?
"up" : "down",
server.master ?
((int)(server.unixtime-server.master->lastinteraction)) : -1,
- server.repl_state == REDIS_REPL_TRANSFER
+ server.repl_state == REDIS_REPL_TRANSFER,
+ server.master ? server.master->reploff : -1
);
if (server.repl_state == REDIS_REPL_TRANSFER) {
@@ -2215,6 +2230,17 @@ sds genRedisInfoString(char *section) {
slaveid++;
}
}
+ info = sdscatprintf(info,
+ "master_repl_offset:%lld\r\n"
+ "repl_backlog_active:%d\r\n"
+ "repl_backlog_size:%lld\r\n"
+ "repl_backlog_first_byte_offset:%lld\r\n"
+ "repl_backlog_histlen:%lld\r\n",
+ server.master_repl_offset,
+ server.repl_backlog != NULL,
+ server.repl_backlog_size,
+ server.repl_backlog_off,
+ server.repl_backlog_histlen);
}
/* CPU */
diff --git a/src/redis.h b/src/redis.h
index 8cb682fb0..90b95c5cf 100644
--- a/src/redis.h
+++ b/src/redis.h
@@ -93,6 +93,9 @@
#define REDIS_REPL_PING_SLAVE_PERIOD 10
#define REDIS_RUN_ID_SIZE 40
#define REDIS_OPS_SEC_SAMPLES 16
+#define REDIS_DEFAULT_REPL_BACKLOG_SIZE (1024*1024) /* 1mb */
+#define REDIS_DEFAULT_REPL_BACKLOG_TIME_LIMIT (60*60) /* 1 hour */
+#define REDIS_REPL_BACKLOG_MIN_SIZE (1024*16) /* 16k */
/* Protocol and I/O related defines */
#define REDIS_MAX_QUERYBUF_LEN (1024*1024*1024) /* 1GB max query buffer. */
@@ -100,6 +103,7 @@
#define REDIS_REPLY_CHUNK_BYTES (16*1024) /* 16k output buffer */
#define REDIS_INLINE_MAX_SIZE (1024*64) /* Max size of inline reads */
#define REDIS_MBULK_BIG_ARG (1024*32)
+#define REDIS_LONGSTR_SIZE 21 /* Bytes needed for long -> str */
/* Hash table parameters */
#define REDIS_HT_MINFILL 10 /* Minimal hash table fill 10% */
@@ -407,7 +411,8 @@ typedef struct redisClient {
long bulklen; /* length of bulk argument in multi bulk request */
list *reply;
unsigned long reply_bytes; /* Tot bytes of objects in reply list */
- int sentlen;
+ int sentlen; /* Amount of bytes already sent in the current
+ buffer or object being sent. */
time_t ctime; /* Client creation time */
time_t lastinteraction; /* time of the last interaction, used for timeout */
time_t obuf_soft_limit_reached_time;
@@ -417,6 +422,8 @@ typedef struct redisClient {
int repldbfd; /* replication DB file descriptor */
long repldboff; /* replication DB file offset */
off_t repldbsize; /* replication DB file size */
+ long long reploff; /* replication offset if this is our master */
+ char replrunid[REDIS_RUN_ID_SIZE+1]; /* master run id if this is a master */
int slave_listening_port; /* As configured with: SLAVECONF listening-port */
multiState mstate; /* MULTI/EXEC state */
blockingState bpop; /* blocking state */
@@ -662,7 +669,6 @@ struct redisServer {
list *clients; /* List of active clients */
list *clients_to_close; /* Clients to close asynchronously */
list *slaves, *monitors; /* List of slaves and MONITORs */
- int slaveseldb; /* Last SELECTed DB in replication output */
redisClient *current_client; /* Current client, only used on crash report */
char neterr[ANET_ERR_LEN]; /* Error buffer for anet.c */
dict *migrate_cached_sockets;/* MIGRATE cached sockets */
@@ -745,13 +751,27 @@ struct redisServer {
int syslog_enabled; /* Is syslog enabled? */
char *syslog_ident; /* Syslog ident */
int syslog_facility; /* Syslog facility */
- /* Slave specific fields */
+ /* Replication (master) */
+ int slaveseldb; /* Last SELECTed DB in replication output */
+ long long master_repl_offset; /* Global replication offset */
+ int repl_ping_slave_period; /* Master pings the slave every N seconds */
+ char *repl_backlog; /* Replication backlog for partial syncs */
+ long long repl_backlog_size; /* Backlog circular buffer size */
+ long long repl_backlog_histlen; /* Backlog actual data length */
+ long long repl_backlog_idx; /* Backlog circular buffer current offset */
+ long long repl_backlog_off; /* Replication offset of first byte in the
+ backlog buffer. */
+ time_t repl_backlog_time_limit; /* Time without slaves after the backlog
+ gets released. */
+ time_t repl_no_slaves_since; /* We have no slaves since that time.
+ Only valid if server.slaves len is 0. */
+ /* Replication (slave) */
char *masterauth; /* AUTH with this password with master */
char *masterhost; /* Hostname of master */
int masterport; /* Port of master */
- int repl_ping_slave_period; /* Master pings the slave every N seconds */
int repl_timeout; /* Timeout after N seconds of master idle */
redisClient *master; /* Client that is master for this slave */
+ redisClient *cached_master; /* Cached master to be reused for PSYNC. */
int repl_syncio_timeout; /* Timeout for synchronous I/O calls */
int repl_state; /* Replication status if the instance is a slave */
off_t repl_transfer_size; /* Size of RDB to read from master during sync. */
@@ -766,6 +786,8 @@ struct redisServer {
time_t repl_down_since; /* Unix time at which link with master went down */
int repl_disable_tcp_nodelay; /* Disable TCP_NODELAY after SYNC? */
int slave_priority; /* Reported in INFO and used by Sentinel. */
+ char repl_master_runid[REDIS_RUN_ID_SIZE+1]; /* Master run id for PSYNC. */
+ long long repl_master_initial_offset; /* Master PSYNC offset. */
/* Limits */
unsigned int maxclients; /* Max number of simultaneous clients */
unsigned long long maxmemory; /* Max number of memory bytes to use */
@@ -930,6 +952,7 @@ void exitFromChild(int retcode);
redisClient *createClient(int fd);
void closeTimedoutClients(void);
void freeClient(redisClient *c);
+void freeClientAsync(redisClient *c);
void resetClient(redisClient *c);
void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask);
void addReply(redisClient *c, robj *obj);
@@ -1053,6 +1076,9 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc);
void replicationFeedMonitors(redisClient *c, list *monitors, int dictid, robj **argv, int argc);
void updateSlavesWaitingBgsave(int bgsaveerr);
void replicationCron(void);
+void replicationHandleMasterDisconnection(void);
+void replicationCacheMaster(redisClient *c);
+void resizeReplicationBacklog(long long newsize);
/* Generic persistence functions */
void startLoading(FILE *fp);
diff --git a/src/replication.c b/src/replication.c
index 872cf4735..0458dc0ee 100644
--- a/src/replication.c
+++ b/src/replication.c
@@ -37,13 +37,234 @@
#include <sys/socket.h>
#include <sys/stat.h>
+void replicationDiscardCachedMaster(void);
+void replicationResurrectCachedMaster(int newfd);
+
/* ---------------------------------- MASTER -------------------------------- */
+void createReplicationBacklog(void) {
+ redisAssert(server.repl_backlog == NULL);
+ server.repl_backlog = zmalloc(server.repl_backlog_size);
+ server.repl_backlog_histlen = 0;
+ server.repl_backlog_idx = 0;
+ /* When a new backlog buffer is created, we increment the replication
+ * offset by one to make sure we'll not be able to PSYNC with any
+ * previous slave. This is needed because we avoid incrementing the
+ * master_repl_offset if no backlog exists nor slaves are attached. */
+ server.master_repl_offset++;
+
+ /* We don't have any data inside our buffer, but virtually the first
+ * byte we have is the next byte that will be generated for the
+ * replication stream. */
+ server.repl_backlog_off = server.master_repl_offset+1;
+}
+
+/* This function is called when the user modifies the replication backlog
+ * size at runtime. It is up to the function to both update the
+ * server.repl_backlog_size and to resize the buffer and setup it so that
+ * it contains the same data as the previous one (possibly less data, but
+ * the most recent bytes, or the same data and more free space in case the
+ * buffer is enlarged). */
+void resizeReplicationBacklog(long long newsize) {
+ if (newsize < REDIS_REPL_BACKLOG_MIN_SIZE)
+ newsize = REDIS_REPL_BACKLOG_MIN_SIZE;
+ if (server.repl_backlog_size == newsize) return;
+
+ server.repl_backlog_size = newsize;
+ if (server.repl_backlog != NULL) {
+ /* What we actually do is to flush the old buffer and realloc a new
+ * empty one. It will refill with new data incrementally.
+ * The reason is that copying a few gigabytes adds latency and even
+ * worse often we need to alloc additional space before freeing the
+ * old buffer. */
+ zfree(server.repl_backlog);
+ server.repl_backlog = zmalloc(server.repl_backlog_size);
+ server.repl_backlog_histlen = 0;
+ server.repl_backlog_idx = 0;
+ /* Next byte we have is... the next since the buffer is emtpy. */
+ server.repl_backlog_off = server.master_repl_offset+1;
+ }
+}
+
+void freeReplicationBacklog(void) {
+ redisAssert(server.repl_backlog != NULL);
+ zfree(server.repl_backlog);
+ server.repl_backlog = NULL;
+}
+
+/* Add data to the replication backlog.
+ * This function also increments the global replication offset stored at
+ * server.master_repl_offset, because there is no case where we want to feed
+ * the backlog without incrementing the buffer. */
+void feedReplicationBacklog(void *ptr, size_t len) {
+ unsigned char *p = ptr;
+
+ server.master_repl_offset += len;
+
+ /* This is a circular buffer, so write as much data we can at every
+ * iteration and rewind the "idx" index if we reach the limit. */
+ while(len) {
+ size_t thislen = server.repl_backlog_size - server.repl_backlog_idx;
+ if (thislen > len) thislen = len;
+ memcpy(server.repl_backlog+server.repl_backlog_idx,p,thislen);
+ server.repl_backlog_idx += thislen;
+ if (server.repl_backlog_idx == server.repl_backlog_size)
+ server.repl_backlog_idx = 0;
+ len -= thislen;
+ p += thislen;
+ server.repl_backlog_histlen += thislen;
+ }
+ if (server.repl_backlog_histlen > server.repl_backlog_size)
+ server.repl_backlog_histlen = server.repl_backlog_size;
+ /* Set the offset of the first byte we have in the backlog. */
+ server.repl_backlog_off = server.master_repl_offset -
+ server.repl_backlog_histlen + 1;
+}
+
+/* Wrapper for feedReplicationBacklog() that takes Redis string objects
+ * as input. */
+void feedReplicationBacklogWithObject(robj *o) {
+ char llstr[REDIS_LONGSTR_SIZE];
+ void *p;
+ size_t len;
+
+ if (o->encoding == REDIS_ENCODING_INT) {
+ len = ll2string(llstr,sizeof(llstr),(long)o->ptr);
+ p = llstr;
+ } else {
+ len = sdslen(o->ptr);
+ p = o->ptr;
+ }
+ feedReplicationBacklog(p,len);
+}
+
+#define FEEDSLAVE_BUF_SIZE (1024*64)
void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
listNode *ln;
listIter li;
- int j;
+ int j, i, len;
+ char buf[FEEDSLAVE_BUF_SIZE], *b = buf;
+ char llstr[REDIS_LONGSTR_SIZE];
+ int buf_left = FEEDSLAVE_BUF_SIZE;
+ robj *o;
+
+ /* If there aren't slaves, and there is no backlog buffer to populate,
+ * we can return ASAP. */
+ if (server.repl_backlog == NULL && listLength(slaves) == 0) return;
+
+ /* We can't have slaves attached and no backlog. */
+ redisAssert(!(listLength(slaves) != 0 && server.repl_backlog == NULL));
+
+ /* What we do here is to try to write as much data as possible in a static
+ * buffer "buf" that is used to create an object that is later sent to all
+ * the slaves. This way we do the decoding only one time for most commands
+ * not containing big payloads. */
+
+ /* Create the SELECT command into the static buffer if needed. */
+ if (server.slaveseldb != dictid) {
+ char *selectcmd;
+ size_t sclen;
+
+ if (dictid >= 0 && dictid < REDIS_SHARED_SELECT_CMDS) {
+ selectcmd = shared.select[dictid]->ptr;
+ sclen = sdslen(selectcmd);
+ memcpy(b,selectcmd,sclen);
+ b += sclen;
+ buf_left -= sclen;
+ } else {
+ int dictid_len;
+
+ dictid_len = ll2string(llstr,sizeof(llstr),dictid);
+ sclen = snprintf(b,buf_left,"*2\r\n$6\r\nSELECT\r\n$%d\r\n%s\r\n",
+ dictid_len, llstr);
+ b += sclen;
+ buf_left -= sclen;
+ }
+ }
+ server.slaveseldb = dictid;
+
+ /* Add the multi bulk reply size to the static buffer, that is, the number
+ * of arguments of the command to send to every slave. */
+ b[0] = '*';
+ len = ll2string(b+1,REDIS_LONGSTR_SIZE,argc);
+ b += len+1;
+ buf_left -= len;
+ b[0] = '\r';
+ b[1] = '\n';
+ b += 2;
+ buf_left -= 2;
+
+ /* Try to use the static buffer for as much arguments is possible. */
+ for (j = 0; j < argc; j++) {
+ int objlen;
+ char *objptr;
+
+ if (argv[j]->encoding != REDIS_ENCODING_RAW &&
+ argv[j]->encoding != REDIS_ENCODING_INT) {
+ redisPanic("Unexpected encoding");
+ }
+ if (argv[j]->encoding == REDIS_ENCODING_RAW) {
+ objlen = sdslen(argv[j]->ptr);
+ objptr = argv[j]->ptr;
+ } else {
+ objlen = ll2string(llstr,REDIS_LONGSTR_SIZE,(long)argv[j]->ptr);
+ objptr = llstr;
+ }
+ /* We need enough space for bulk reply encoding, newlines, and
+ * the data itself. */
+ if (buf_left < objlen+REDIS_LONGSTR_SIZE+32) break;
+
+ /* Write $...CRLF */
+ b[0] = '$';
+ len = ll2string(b+1,REDIS_LONGSTR_SIZE,objlen);
+ b += len+1;
+ buf_left -= len;
+ b[0] = '\r';
+ b[1] = '\n';
+ b += 2;
+ buf_left -= 2;
+
+ /* And data plus CRLF */
+ memcpy(b,objptr,objlen);
+ b += objlen;
+ buf_left -= objlen;
+ b[0] = '\r';
+ b[1] = '\n';
+ b += 2;
+ buf_left -= 2;
+ }
+
+ /* Create an object with the static buffer content. */
+ redisAssert(buf_left < FEEDSLAVE_BUF_SIZE);
+ o = createStringObject(buf,b-buf);
+
+ /* If we have a backlog, populate it with data and increment
+ * the global replication offset. */
+ if (server.repl_backlog) {
+ feedReplicationBacklogWithObject(o);
+ for (i = j; i < argc; i++) {
+ char aux[REDIS_LONGSTR_SIZE+3];
+ long objlen = stringObjectLen(argv[i]);
+
+ /* We need to feed the buffer with the object as a bulk reply
+ * not just as a plain string, so create the $..CRLF payload len
+ * ad add the final CRLF */
+ aux[0] = '$';
+ len = ll2string(aux+1,objlen,sizeof(aux)-1);
+ aux[len+1] = '\r';
+ aux[len+2] = '\n';
+ feedReplicationBacklog(aux,len+3);
+ feedReplicationBacklogWithObject(argv[j]);
+ feedReplicationBacklogWithObject(shared.crlf);
+ }
+ }
+ /* Write data to slaves. Here we do two things:
+ * 1) We write the "o" object that was created using the accumulated
+ * static buffer.
+ * 2) We write any additional argument of the command to replicate that
+ * was not written inside the static buffer for lack of space.
+ */
listRewind(slaves,&li);
while((ln = listNext(&li))) {
redisClient *slave = ln->value;
@@ -54,29 +275,16 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
/* Feed slaves that are waiting for the initial SYNC (so these commands
* are queued in the output buffer until the initial SYNC completes),
* or are already in sync with the master. */
- if (server.slaveseldb != dictid) {
- robj *selectcmd;
-
- if (dictid >= 0 && dictid < REDIS_SHARED_SELECT_CMDS) {
- selectcmd = shared.select[dictid];
- incrRefCount(selectcmd);
- } else {
- char dictid_str[64];
- int dictid_len;
-
- dictid_len = ll2string(dictid_str,sizeof(dictid_str),dictid);
- selectcmd = createObject(REDIS_STRING,
- sdscatprintf(sdsempty(),
- "*2\r\n$6\r\nSELECT\r\n$%d\r\n%s\r\n",
- dictid_len, dictid_str));
- }
- addReply(slave,selectcmd);
- decrRefCount(selectcmd);
- }
- addReplyMultiBulkLen(slave,argc);
- for (j = 0; j < argc; j++) addReplyBulk(slave,argv[j]);
+
+ /* First, trasmit the object created from the static buffer. */
+ addReply(slave,o);
+
+ /* Finally any additional argument that was not stored inside the
+ * static buffer if any (from j to argc). */
+ for (i = j; i < argc; i++)
+ addReplyBulk(slave,argv[i]);
}
- server.slaveseldb = dictid;
+ decrRefCount(o);
}
void replicationFeedMonitors(redisClient *c, list *monitors, int dictid, robj **argv, int argc) {
@@ -120,6 +328,120 @@ void replicationFeedMonitors(redisClient *c, list *monitors, int dictid, robj **
decrRefCount(cmdobj);
}
+/* Feed the slave 'c' with the replication backlog starting from the
+ * specified 'offset' up to the end of the backlog. */
+long long addReplyReplicationBacklog(redisClient *c, long long offset) {
+ long long j, skip, len;
+
+// printf("SLAVE REQUEST %lld\n", offset);
+
+ if (server.repl_backlog_histlen == 0) {
+// printf("NO HISTORY\n");
+ return 0;
+ }
+
+// printf("FIRST BYTE WE HAVE %lld\n", server.repl_backlog_off);
+// printf("HISTLEN %lld\n", server.repl_backlog_histlen);
+// printf("IDX %lld\n", server.repl_backlog_idx);
+
+ /* Compute the amount of bytes we need to discard. */
+ skip = offset - server.repl_backlog_off;
+// printf("SKIP %lld\n", skip);
+
+ /* Point j to the oldest byte, that is actaully our
+ * server.repl_backlog_off byte. */
+ j = (server.repl_backlog_idx +
+ (server.repl_backlog_size-server.repl_backlog_histlen)) %
+ server.repl_backlog_size;
+// printf("J %lld\n", j);
+
+ /* Discard the amount of data to seek to the specified 'offset'. */
+ j = (j + skip) % server.repl_backlog_size;
+
+ /* Feed slave with data. Since it is a circular buffer we have to
+ * split the reply in two parts if we are cross-boundary. */
+ len = server.repl_backlog_histlen - skip;
+// printf("LEN %lld\n", len);
+ while(len) {
+ long long thislen =
+ ((server.repl_backlog_size - j) < len) ?
+ (server.repl_backlog_size - j) : len;
+
+// printf("WRITE %lld\n", thislen);
+ addReplySds(c,sdsnewlen(server.repl_backlog + j, thislen));
+ len -= thislen;
+ j = 0;
+ }
+ return server.repl_backlog_histlen - skip;
+}
+
+/* This function handles the PSYNC command from the point of view of a
+ * master receiving a request for partial resynchronization.
+ *
+ * On success return REDIS_OK, otherwise REDIS_ERR is returned and we proceed
+ * with the usual full resync. */
+int masterTryPartialResynchronization(redisClient *c) {
+ long long psync_offset, psync_len;
+ char *master_runid = c->argv[1]->ptr;
+
+ /* Is the runid of this master the same advertised by the wannabe slave
+ * via PSYNC? If runid changed this master is a different instance and
+ * there is no way to continue. */
+ if (strcasecmp(master_runid, server.runid)) {
+ /* Run id "?" is used by slaves that want to force a full resync. */
+ if (master_runid[0] != '?') {
+ redisLog(REDIS_NOTICE,"Partial resynchronization not accepted: "
+ "Runid mismatch (Client asked for '%s', I'm '%s')",
+ master_runid, server.runid);
+ } else {
+ redisLog(REDIS_NOTICE,"Full resync requested by slave.");
+ }
+ goto need_full_resync;
+ }
+
+ /* We still have the data our slave is asking for? */
+ if (getLongLongFromObjectOrReply(c,c->argv[2],&psync_offset,NULL) !=
+ REDIS_OK) goto need_full_resync;
+ if (!server.repl_backlog ||
+ psync_offset < server.repl_backlog_off ||
+ psync_offset >= (server.repl_backlog_off + server.repl_backlog_size))
+ {
+ redisLog(REDIS_NOTICE,
+ "Unable to partial resync with the slave for lack of backlog (Slave request was: %lld).", psync_offset);
+ goto need_full_resync;
+ }
+
+ /* If we reached this point, we are able to perform a partial resync:
+ * 1) Set client state to make it a slave.
+ * 2) Inform the client we can continue with +CONTINUE
+ * 3) Send the backlog data (from the offset to the end) to the slave. */
+ c->flags |= REDIS_SLAVE;
+ c->replstate = REDIS_REPL_ONLINE;
+ listAddNodeTail(server.slaves,c);
+ addReplySds(c,sdsnew("+CONTINUE\r\n"));
+ psync_len = addReplyReplicationBacklog(c,psync_offset);
+ redisLog(REDIS_NOTICE,
+ "Partial resynchronization request accepted. Sending %lld bytes of backlog starting from offset %lld.", psync_len, psync_offset);
+ /* Note that we don't need to set the selected DB at server.slaveseldb
+ * to -1 to force the master to emit SELECT, since the slave already
+ * has this state from the previous connection with the master. */
+
+ return REDIS_OK; /* The caller can return, no full resync needed. */
+
+need_full_resync:
+ /* We need a full resync for some reason... notify the client. */
+ psync_offset = server.master_repl_offset;
+ /* Add 1 to psync_offset if it the replication backlog does not exists
+ * as when it will be created later we'll increment the offset by one. */
+ if (server.repl_backlog == NULL) psync_offset++;
+ addReplySds(c,
+ sdscatprintf(sdsempty(),"+FULLRESYNC %s %lld\r\n",
+ server.runid,
+ psync_offset));
+ return REDIS_ERR;
+}
+
+/* SYNC ad PSYNC command implemenation. */
void syncCommand(redisClient *c) {
/* ignore SYNC if already slave or in monitor mode */
if (c->flags & REDIS_SLAVE) return;
@@ -136,11 +458,24 @@ void syncCommand(redisClient *c) {
* buffer registering the differences between the BGSAVE and the current
* dataset, so that we can copy to other slaves if needed. */
if (listLength(c->reply) != 0) {
- addReplyError(c,"SYNC is invalid with pending input");
+ addReplyError(c,"SYNC and PSYNC are invalid with pending input");
return;
}
- redisLog(REDIS_NOTICE,"Slave ask for synchronization");
+ redisLog(REDIS_NOTICE,"Slave asks for synchronization");
+
+ /* Try a partial resynchronization if this is a PSYNC command.
+ * If it fails, we continue with usual full resynchronization, however
+ * when this happens masterTryPartialResynchronization() already
+ * replied with:
+ *
+ * +FULLRESYNC <runid> <offset>
+ *
+ * So the slave knows the new runid and offset to try a PSYNC later
+ * if the connection with the master is lost. */
+ if (!strcasecmp(c->argv[0]->ptr,"psync") &&
+ masterTryPartialResynchronization(c) == REDIS_OK) return;
+
/* Here we need to check if there is a background saving operation
* in progress, or if it is required to start one */
if (server.rdb_child_pid != -1) {
@@ -185,6 +520,8 @@ void syncCommand(redisClient *c) {
c->flags |= REDIS_SLAVE;
server.slaveseldb = -1; /* Force to re-emit the SELECT command. */
listAddNodeTail(server.slaves,c);
+ if (listLength(server.slaves) == 1 && server.repl_backlog == NULL)
+ createReplicationBacklog();
return;
}
@@ -452,6 +789,9 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) {
server.master->flags |= REDIS_MASTER;
server.master->authenticated = 1;
server.repl_state = REDIS_REPL_CONNECTED;
+ server.master->reploff = server.repl_master_initial_offset;
+ memcpy(server.master->replrunid, server.repl_master_runid,
+ sizeof(server.repl_master_runid));
redisLog(REDIS_NOTICE, "MASTER <-> SLAVE sync: Finished with success");
/* Restart the AOF subsystem now that we finished the sync. This
* will trigger an AOF rewrite, and when done will start appending
@@ -481,8 +821,8 @@ error:
/* Send a synchronous command to the master. Used to send AUTH and
* REPLCONF commands before starting the replication with SYNC.
*
- * On success NULL is returned.
- * On error an sds string describing the error is returned.
+ * The command returns an sds string representing the result of the
+ * operation. On error the first byte is a "-".
*/
char *sendSynchronousCommand(int fd, ...) {
va_list ap;
@@ -504,7 +844,7 @@ char *sendSynchronousCommand(int fd, ...) {
/* Transfer command to the server. */
if (syncWrite(fd,cmd,sdslen(cmd),server.repl_syncio_timeout*1000) == -1) {
sdsfree(cmd);
- return sdscatprintf(sdsempty(),"Writing to master: %s",
+ return sdscatprintf(sdsempty(),"-Writing to master: %s",
strerror(errno));
}
sdsfree(cmd);
@@ -512,22 +852,123 @@ char *sendSynchronousCommand(int fd, ...) {
/* Read the reply from the server. */
if (syncReadLine(fd,buf,sizeof(buf),server.repl_syncio_timeout*1000) == -1)
{
- return sdscatprintf(sdsempty(),"Reading from master: %s",
+ return sdscatprintf(sdsempty(),"-Reading from master: %s",
strerror(errno));
}
+ return sdsnew(buf);
+}
+
+/* Try a partial resynchronization with the master if we are about to reconnect.
+ * If there is no cached master structure, at least try to issue a
+ * "PSYNC ? -1" command in order to trigger a full resync using the PSYNC
+ * command in order to obtain the master run id and the master replication
+ * global offset.
+ *
+ * This function is designed to be called from syncWithMaster(), so the
+ * following assumptions are made:
+ *
+ * 1) We pass the function an already connected socket "fd".
+ * 2) This function does not close the file descriptor "fd". However in case
+ * of successful partial resynchronization, the function will reuse
+ * 'fd' as file descriptor of the server.master client structure.
+ *
+ * The function returns:
+ *
+ * PSYNC_CONTINUE: If the PSYNC command succeded and we can continue.
+ * PSYNC_FULLRESYNC: If PSYNC is supported but a full resync is needed.
+ * In this case the master run_id and global replication
+ * offset is saved.
+ * PSYNC_NOT_SUPPORTED: If the server does not understand PSYNC at all and
+ * the caller should fall back to SYNC.
+ */
+
+#define PSYNC_CONTINUE 0
+#define PSYNC_FULLRESYNC 1
+#define PSYNC_NOT_SUPPORTED 2
+int slaveTryPartialResynchronization(int fd) {
+ char *psync_runid;
+ char psync_offset[32];
+ sds reply;
+
+ /* Initially set repl_master_initial_offset to -1 to mark the current
+ * master run_id and offset as not valid. Later if we'll be able to do
+ * a FULL resync using the PSYNC command we'll set the offset at the
+ * right value, so that this information will be propagated to the
+ * client structure representing the master into server.master. */
+ server.repl_master_initial_offset = -1;
+
+ if (server.cached_master) {
+ psync_runid = server.cached_master->replrunid;
+ snprintf(psync_offset,sizeof(psync_offset),"%lld", server.cached_master->reploff+1);
+ redisLog(REDIS_NOTICE,"Trying a partial resynchronization (request %s:%s).", psync_runid, psync_offset);
+ } else {
+ redisLog(REDIS_NOTICE,"Partial resynchronization not possible (no cached master)");
+ psync_runid = "?";
+ memcpy(psync_offset,"-1",3);
+ }
+
+ /* Issue the PSYNC command */
+ reply = sendSynchronousCommand(fd,"PSYNC",psync_runid,psync_offset,NULL);
+
+ if (!strncmp(reply,"+FULLRESYNC",11)) {
+ char *runid, *offset;
+
+ /* FULL RESYNC, parse the reply in order to extract the run id
+ * and the replication offset. */
+ runid = strchr(reply,' ');
+ if (runid) {
+ runid++;
+ offset = strchr(runid,' ');
+ if (offset) offset++;
+ }
+ if (!runid || !offset || (offset-runid-1) != REDIS_RUN_ID_SIZE) {
+ redisLog(REDIS_WARNING,
+ "Master replied with wrong +FULLRESYNC syntax.");
+ } else {
+ memcpy(server.repl_master_runid, runid, offset-runid-1);
+ server.repl_master_runid[REDIS_RUN_ID_SIZE] = '\0';
+ server.repl_master_initial_offset = strtoll(offset,NULL,10);
+ redisLog(REDIS_NOTICE,"Full resync from master: %s:%lld",
+ server.repl_master_runid,
+ server.repl_master_initial_offset);
+ }
+ /* We are going to full resync, discard the cached master structure. */
+ replicationDiscardCachedMaster();
+ sdsfree(reply);
+ return PSYNC_FULLRESYNC;
+ }
- /* Check for errors from the server. */
- if (buf[0] != '+') {
- return sdscatprintf(sdsempty(),"Error from master: %s", buf);
+ if (!strncmp(reply,"+CONTINUE",9)) {
+ /* Partial resync was accepted, set the replication state accordingly */
+ redisLog(REDIS_NOTICE,
+ "Successful partial resynchronization with master.");
+ sdsfree(reply);
+ replicationResurrectCachedMaster(fd);
+ return PSYNC_CONTINUE;
}
- return NULL; /* No errors. */
+ /* If we reach this point we receied either an error since the master does
+ * not understand PSYNC, or an unexpected reply from the master.
+ * Reply with PSYNC_NOT_SUPPORTED in both cases. */
+
+ if (strncmp(reply,"-ERR",4)) {
+ /* If it's not an error, log the unexpected event. */
+ redisLog(REDIS_WARNING,
+ "Unexpected reply to PSYNC from master: %s", reply);
+ } else {
+ redisLog(REDIS_NOTICE,
+ "Master does not support PSYNC or is in "
+ "error state (reply: %s)", reply);
+ }
+ sdsfree(reply);
+ replicationDiscardCachedMaster();
+ return PSYNC_NOT_SUPPORTED;
}
void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {
char tmpfile[256], *err;
int dfd, maxtries = 5;
- int sockerr = 0;
+ int sockerr = 0, psync_result;
socklen_t errlen = sizeof(sockerr);
REDIS_NOTUSED(el);
REDIS_NOTUSED(privdata);
@@ -600,11 +1041,12 @@ void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {
/* AUTH with the master if required. */
if(server.masterauth) {
err = sendSynchronousCommand(fd,"AUTH",server.masterauth,NULL);
- if (err) {
+ if (err[0] == '-') {
redisLog(REDIS_WARNING,"Unable to AUTH to MASTER: %s",err);
sdsfree(err);
goto error;
}
+ sdsfree(err);
}
/* Set the slave port, so that Master's INFO command can list the
@@ -616,17 +1058,33 @@ void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {
sdsfree(port);
/* Ignore the error if any, not all the Redis versions support
* REPLCONF listening-port. */
- if (err) {
- redisLog(REDIS_NOTICE,"(non critical): Master does not understand REPLCONF listening-port: %s", err);
- sdsfree(err);
+ if (err[0] == '-') {
+ redisLog(REDIS_NOTICE,"(Non critical) Master does not understand REPLCONF listening-port: %s", err);
}
+ sdsfree(err);
}
- /* Issue the SYNC command */
- if (syncWrite(fd,"SYNC\r\n",6,server.repl_syncio_timeout*1000) == -1) {
- redisLog(REDIS_WARNING,"I/O error writing to MASTER: %s",
- strerror(errno));
- goto error;
+ /* Try a partial resynchonization. If we don't have a cached master
+ * slaveTryPartialResynchronization() will at least try to use PSYNC
+ * to start a full resynchronization so that we get the master run id
+ * and the global offset, to try a partial resync at the next
+ * reconnection attempt. */
+ psync_result = slaveTryPartialResynchronization(fd);
+ if (psync_result == PSYNC_CONTINUE) {
+ redisLog(REDIS_NOTICE, "MASTER <-> SLAVE sync: Master accepted a Partial Resynchronization.");
+ return;
+ }
+
+ /* Fall back to SYNC if needed. Otherwise psync_result == PSYNC_FULLRESYNC
+ * and the server.repl_master_runid and repl_master_initial_offset are
+ * already populated. */
+ if (psync_result == PSYNC_NOT_SUPPORTED) {
+ redisLog(REDIS_NOTICE,"Retrying with SYNC...");
+ if (syncWrite(fd,"SYNC\r\n",6,server.repl_syncio_timeout*1000) == -1) {
+ redisLog(REDIS_WARNING,"I/O error writing to MASTER: %s",
+ strerror(errno));
+ goto error;
+ }
}
/* Prepare a suitable temp file for bulk transfer */
@@ -733,6 +1191,7 @@ void slaveofCommand(redisClient *c) {
sdsfree(server.masterhost);
server.masterhost = NULL;
if (server.master) freeClient(server.master);
+ replicationDiscardCachedMaster();
cancelReplicationHandshake();
server.repl_state = REDIS_REPL_NONE;
redisLog(REDIS_NOTICE,"MASTER MODE enabled (user request)");
@@ -757,6 +1216,7 @@ void slaveofCommand(redisClient *c) {
server.masterport = port;
if (server.master) freeClient(server.master);
disconnectSlaves(); /* Force our slaves to resync with us as well. */
+ replicationDiscardCachedMaster(); /* Don't try a PSYNC. */
cancelReplicationHandshake();
server.repl_state = REDIS_REPL_CONNECT;
redisLog(REDIS_NOTICE,"SLAVE OF %s:%d enabled (user request)",
@@ -765,6 +1225,92 @@ void slaveofCommand(redisClient *c) {
addReply(c,shared.ok);
}
+/* ---------------------- MASTER CACHING FOR PSYNC -------------------------- */
+
+/* In order to implement partial synchronization we need to be able to cache
+ * our master's client structure after a transient disconnection.
+ * It is cached into server.cached_master and flushed away using the following
+ * functions. */
+
+/* This function is called by freeClient() in order to cache the master
+ * client structure instead of destryoing it. freeClient() will return
+ * ASAP after this function returns, so every action needed to avoid problems
+ * with a client that is really "suspended" has to be done by this function.
+ *
+ * The other functions that will deal with the cached master are:
+ *
+ * replicationDiscardCachedMaster() that will make sure to kill the client
+ * as for some reason we don't want to use it in the future.
+ *
+ * replicationResurrectCachedMaster() that is used after a successful PSYNC
+ * handshake in order to reactivate the cached master.
+ */
+void replicationCacheMaster(redisClient *c) {
+ listNode *ln;
+
+ redisAssert(server.master != NULL && server.cached_master == NULL);
+ redisLog(REDIS_NOTICE,"Caching the disconnected master state.");
+
+ /* Remove from the list of clients, we don't want this client to be
+ * listed by CLIENT LIST or processed in any way by batch operations. */
+ ln = listSearchKey(server.clients,c);
+ redisAssert(ln != NULL);
+ listDelNode(server.clients,ln);
+
+ /* Save the master. Server.master will be set to null later by
+ * replicationHandleMasterDisconnection(). */
+ server.cached_master = server.master;
+
+ /* Remove the event handlers and close the socket. We'll later reuse
+ * the socket of the new connection with the master during PSYNC. */
+ aeDeleteFileEvent(server.el,c->fd,AE_READABLE);
+ aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
+ close(c->fd);
+
+ /* Set fd to -1 so that we can safely call freeClient(c) later. */
+ c->fd = -1;
+
+ /* Caching the master happens instead of the actual freeClient() call,
+ * so make sure to adjust the replication state. This function will
+ * also set server.master to NULL. */
+ replicationHandleMasterDisconnection();
+}
+
+/* Free a cached master, called when there are no longer the conditions for
+ * a partial resync on reconnection. */
+void replicationDiscardCachedMaster(void) {
+ if (server.cached_master == NULL) return;
+
+ redisLog(REDIS_NOTICE,"Discarding previously cached master state.");
+ server.cached_master->flags &= ~REDIS_MASTER;
+ freeClient(server.cached_master);
+ server.cached_master = NULL;
+}
+
+/* Turn the cached master into the current master, using the file descriptor
+ * passed as argument as the socket for the new master.
+ *
+ * This funciton is called when successfully setup a partial resynchronization
+ * so the stream of data that we'll receive will start from were this
+ * master left. */
+void replicationResurrectCachedMaster(int newfd) {
+ server.master = server.cached_master;
+ server.cached_master = NULL;
+ server.master->fd = newfd;
+ server.master->flags &= ~(REDIS_CLOSE_AFTER_REPLY|REDIS_CLOSE_ASAP);
+ server.master->authenticated = 1;
+ server.master->lastinteraction = server.unixtime;
+ server.repl_state = REDIS_REPL_CONNECTED;
+
+ /* Re-add to the list of clients. */
+ listAddNodeTail(server.clients,server.master);
+ if (aeCreateFileEvent(server.el, newfd, AE_READABLE,
+ readQueryFromClient, server.master)) {
+ redisLog(REDIS_WARNING,"Error resurrecting the cached master, impossible to add the readable handler: %s", strerror(errno));
+ freeClientAsync(server.master); /* Close ASAP. */
+ }
+}
+
/* --------------------------- REPLICATION CRON ---------------------------- */
void replicationCron(void) {
@@ -816,8 +1362,8 @@ void replicationCron(void) {
replicationFeedSlaves(server.slaves, server.slaveseldb, ping_argv, 1);
decrRefCount(ping_argv[0]);
- /* Second, send a newline to all the slaves in pre-synchronization stage,
- * that is, slaves waiting for the master to create the RDB file.
+ /* Second, send a newline to all the slaves in pre-synchronization
+ * stage, that is, slaves waiting for the master to create the RDB file.
* The newline will be ignored by the slave but will refresh the
* last-io timer preventing a timeout. */
listRewind(server.slaves,&li);
@@ -832,4 +1378,19 @@ void replicationCron(void) {
}
}
}
+
+ /* If we have no attached slaves and there is a replication backlog
+ * using memory, free it after some (configured) time. */
+ if (listLength(server.slaves) == 0 && server.repl_backlog_time_limit &&
+ server.repl_backlog)
+ {
+ time_t idle = server.unixtime - server.repl_no_slaves_since;
+
+ if (idle > server.repl_backlog_time_limit) {
+ freeReplicationBacklog();
+ redisLog(REDIS_NOTICE,
+ "Replication backlog freed after %d seconds "
+ "without connected slaves.", server.repl_backlog_time_limit);
+ }
+ }
}