author     Oran Agra <oran@redislabs.com>       2015-03-02 11:20:19 +0200
committer  antirez <antirez@gmail.com>          2017-12-11 11:09:33 +0100
commit     9e5224bfd5ec13c14f89fe5eb15441a1d74d8dc5 (patch)
tree       3a600448a3a433b5fe1e1d360efa2dcd3ca97a72
parent     0f26125841b13c29d89a2b78957262f164b31770 (diff)
diskless replication on slave side (don't store rdb to file), plus some other related fixes
Until now, diskless replication was diskless only on the master side: the slave still stored the received rdb file to disk before loading it back in and parsing it. With this change the slave can parse the rdb directly from the replication socket (a usage sketch follows below).

other changes:
--------------
don't save an rdb / aof file when we're a slave that is not synced (sync failed and the dataset is empty), so that we don't overwrite an existing file and end up losing data on failover.

loadAppendOnlyFile (loadDataFromDisk) would have exit()ed if the file doesn't exist, but that could never happen since the file was always created in initServer before that check. instead: don't create an empty aof file on startup before reading it, and only create it when we start writing to it. this allows us to distinguish between successfully loading an empty file and failing to load a non-existing file. currently we don't use that distinction, since startup should succeed even if the file doesn't exist (first server startup); maybe we need to add another config called "preload-file", "load-on-startup" or "abort-when-load-fails".

distinguish between aof configuration and state, so that we re-enable aof only when sync eventually succeeds (and not when exiting from readSyncBulkPayload after a failed attempt); also, CONFIG GET and INFO during rdb loading would previously have lied about it.

SLAVEOF NO ONE now takes an optional argument and succeeds only if the slave is in sync (a specific expected offset can be provided; see the sketch after the file summary below).

tests:
add a test for not saving on exit for an unsynced slave
add replication tests for diskless slave and diskless master
other replication test improvements (not related to diskless slave)
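A minimal usage sketch in the style of the Tcl tests added by this patch; it assumes $master and $slave are srv client handles and that $master_host/$master_port come from enclosing start_server blocks, as in the existing test helpers.

    # Stream the RDB on the master side and parse it straight off the
    # socket on the slave side (no temp rdb file is written on the slave).
    $master config set repl-diskless-sync yes
    $master config set repl-diskless-sync-delay 1
    $slave config set repl-diskless-load yes
    $slave slaveof $master_host $master_port

    # Wait until the slave reports the link as up, as the tests do.
    wait_for_condition 50 100 {
        [s 0 master_link_status] eq {up}
    } else {
        fail "Replication not started."
    }

With repl-diskless-load enabled, a failed load empties the half-loaded dataset instead of leaving a temp rdb file behind.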
-rw-r--r--  src/anet.c                                 14
-rw-r--r--  src/anet.h                                  1
-rw-r--r--  src/aof.c                                  34
-rw-r--r--  src/config.c                               34
-rw-r--r--  src/rdb.c                                  34
-rw-r--r--  src/redis-check-rdb.c                       4
-rw-r--r--  src/replication.c                         325
-rw-r--r--  src/rio.c                                 109
-rw-r--r--  src/rio.h                                  10
-rw-r--r--  src/server.c                               28
-rw-r--r--  src/server.h                               11
-rw-r--r--  tests/integration/psync2.tcl               18
-rw-r--r--  tests/integration/replication-4.tcl        93
-rw-r--r--  tests/integration/replication-psync.tcl    66
-rw-r--r--  tests/integration/replication.tcl         147
-rw-r--r--  tests/support/util.tcl                     12
16 files changed, 673 insertions(+), 267 deletions(-)
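A sketch of the new SLAVEOF NO ONE form, in the same test dialect; $master, $slave and the 's -1' server index are assumptions matching the surrounding tests, and expected_offset is just an illustrative name.

    # Capture the master's current replication offset, then promote the
    # slave only once it has processed the stream at least up to it.
    set expected_offset [s -1 master_repl_offset]
    if {[catch {$slave slaveof no one $expected_offset} err]} {
        # When the slave is still behind, the command replies with
        # "Replication offset is smaller than expected." and the slave
        # keeps replicating; handle or retry as needed.
        puts "promotion deferred: $err"
    }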
diff --git a/src/anet.c b/src/anet.c
index e9530398d..c4c4a0b32 100644
--- a/src/anet.c
+++ b/src/anet.c
@@ -193,6 +193,20 @@ int anetSendTimeout(char *err, int fd, long long ms) {
return ANET_OK;
}
+/* Set the socket receive timeout (SO_RCVTIMEO socket option) to the specified
+ * number of milliseconds, or disable it if the 'ms' argument is zero. */
+int anetRecvTimeout(char *err, int fd, long long ms) {
+ struct timeval tv;
+
+ tv.tv_sec = ms/1000;
+ tv.tv_usec = (ms%1000)*1000;
+ if (setsockopt(fd, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv)) == -1) {
+ anetSetError(err, "setsockopt SO_RCVTIMEO: %s", strerror(errno));
+ return ANET_ERR;
+ }
+ return ANET_OK;
+}
+
/* anetGenericResolve() is called by anetResolve() and anetResolveIP() to
* do the actual work. It resolves the hostname "host" and set the string
* representation of the IP address into the buffer pointed by "ipbuf".
diff --git a/src/anet.h b/src/anet.h
index 7142f78d2..dd735240d 100644
--- a/src/anet.h
+++ b/src/anet.h
@@ -70,6 +70,7 @@ int anetEnableTcpNoDelay(char *err, int fd);
int anetDisableTcpNoDelay(char *err, int fd);
int anetTcpKeepAlive(char *err, int fd);
int anetSendTimeout(char *err, int fd, long long ms);
+int anetRecvTimeout(char *err, int fd, long long ms);
int anetPeerToString(int fd, char *ip, size_t ip_len, int *port);
int anetKeepAlive(char *err, int fd, int interval);
int anetSockName(int fd, char *ip, size_t ip_len, int *port);
diff --git a/src/aof.c b/src/aof.c
index 79962fd0a..1d8a82d5c 100644
--- a/src/aof.c
+++ b/src/aof.c
@@ -40,7 +40,7 @@
#include <sys/wait.h>
#include <sys/param.h>
-void aofUpdateCurrentSize(void);
+void aofUpdateCurrentSize(int fd);
void aofClosePipes(void);
/* ----------------------------------------------------------------------------
@@ -292,6 +292,18 @@ void flushAppendOnlyFile(int force) {
if (sdslen(server.aof_buf) == 0) return;
+ /* We are not supposed to reach here if a slave has an empty data set */
+ serverAssert(!isUnsyncedSlave());
+
+ /* Open the AOF file if needed. */
+ if (server.aof_state == AOF_ON && server.aof_fd == -1) {
+ server.aof_fd = open(server.aof_filename,O_WRONLY|O_APPEND|O_CREAT,0644);
+ if (server.aof_fd == -1) {
+ serverLog(LL_WARNING, "Can't open the append-only file: %s", strerror(errno));
+ exit(1);
+ }
+ }
+
if (server.aof_fsync == AOF_FSYNC_EVERYSEC)
sync_in_progress = bioPendingJobsOfType(BIO_AOF_FSYNC) != 0;
@@ -636,8 +648,8 @@ int loadAppendOnlyFile(char *filename) {
off_t valid_up_to = 0; /* Offset of latest well-formed command loaded. */
if (fp == NULL) {
- serverLog(LL_WARNING,"Fatal error: can't open the append log file for reading: %s",strerror(errno));
- exit(1);
+ serverLog(LL_NOTICE,"Warning: Can't open the append log file for reading: %s (First run?)",strerror(errno));
+ return C_ERR;
}
/* Handle a zero-length AOF file as a special case. An emtpy AOF file
@@ -647,7 +659,7 @@ int loadAppendOnlyFile(char *filename) {
if (fp && redis_fstat(fileno(fp),&sb) != -1 && sb.st_size == 0) {
server.aof_current_size = 0;
fclose(fp);
- return C_ERR;
+ return C_OK;
}
/* Temporarily disable AOF, to prevent EXEC from feeding a MULTI
@@ -655,7 +667,7 @@ int loadAppendOnlyFile(char *filename) {
server.aof_state = AOF_OFF;
fakeClient = createFakeClient();
- startLoading(fp);
+ startLoadingFile(fp);
/* Check if this AOF file has an RDB preamble. In that case we need to
* load the RDB file and later continue loading the AOF tail. */
@@ -759,11 +771,11 @@ int loadAppendOnlyFile(char *filename) {
if (fakeClient->flags & CLIENT_MULTI) goto uxeof;
loaded_ok: /* DB loaded, cleanup and return C_OK to the caller. */
+ aofUpdateCurrentSize(fileno(fp));
fclose(fp);
freeFakeClient(fakeClient);
server.aof_state = old_aof_state;
stopLoading();
- aofUpdateCurrentSize();
server.aof_rewrite_base_size = server.aof_current_size;
return C_OK;
@@ -1372,6 +1384,10 @@ int rewriteAppendOnlyFileBackground(void) {
long long start;
if (server.aof_child_pid != -1 || server.rdb_child_pid != -1) return C_ERR;
+ if (isUnsyncedSlave()) {
+ serverLog(LL_NOTICE,"AOFRW skipped, no data received from master");
+ return C_ERR;
+ }
if (aofCreatePipes() != C_OK) return C_ERR;
openChildInfoPipe();
start = ustime();
@@ -1451,12 +1467,12 @@ void aofRemoveTempFile(pid_t childpid) {
* to check the size of the file. This is useful after a rewrite or after
* a restart, normally the size is updated just adding the write length
* to the current length, that is much faster. */
-void aofUpdateCurrentSize(void) {
+void aofUpdateCurrentSize(int fd) {
struct redis_stat sb;
mstime_t latency;
latencyStartMonitor(latency);
- if (redis_fstat(server.aof_fd,&sb) == -1) {
+ if (redis_fstat(fd,&sb) == -1) {
serverLog(LL_WARNING,"Unable to obtain the AOF file length. stat: %s",
strerror(errno));
} else {
@@ -1570,7 +1586,7 @@ void backgroundRewriteDoneHandler(int exitcode, int bysignal) {
else if (server.aof_fsync == AOF_FSYNC_EVERYSEC)
aof_background_fsync(newfd);
server.aof_selected_db = -1; /* Make sure SELECT is re-issued */
- aofUpdateCurrentSize();
+ aofUpdateCurrentSize(newfd);
server.aof_rewrite_base_size = server.aof_current_size;
/* Clear regular AOF buffer since its contents was just written to
diff --git a/src/config.c b/src/config.c
index 24a590caf..a7a14f498 100644
--- a/src/config.c
+++ b/src/config.c
@@ -365,6 +365,10 @@ void loadServerConfigFromString(char *config) {
if ((server.repl_diskless_sync = yesnotoi(argv[1])) == -1) {
err = "argument must be 'yes' or 'no'"; goto loaderr;
}
+ } else if (!strcasecmp(argv[0],"repl-diskless-load") && argc==2) {
+ if ((server.repl_diskless_load = yesnotoi(argv[1])) == -1) {
+ err = "argument must be 'yes' or 'no'"; goto loaderr;
+ }
} else if (!strcasecmp(argv[0],"repl-diskless-sync-delay") && argc==2) {
server.repl_diskless_sync_delay = atoi(argv[1]);
if (server.repl_diskless_sync_delay < 0) {
@@ -436,12 +440,10 @@ void loadServerConfigFromString(char *config) {
if (server.hz < CONFIG_MIN_HZ) server.hz = CONFIG_MIN_HZ;
if (server.hz > CONFIG_MAX_HZ) server.hz = CONFIG_MAX_HZ;
} else if (!strcasecmp(argv[0],"appendonly") && argc == 2) {
- int yes;
-
- if ((yes = yesnotoi(argv[1])) == -1) {
+ if ((server.aof_enabled = yesnotoi(argv[1])) == -1) {
err = "argument must be 'yes' or 'no'"; goto loaderr;
}
- server.aof_state = yes ? AOF_ON : AOF_OFF;
+ server.aof_state = server.aof_enabled ? AOF_ON : AOF_OFF;
} else if (!strcasecmp(argv[0],"appendfilename") && argc == 2) {
if (!pathIsBaseName(argv[1])) {
err = "appendfilename can't be a path, just a filename";
@@ -483,6 +485,12 @@ void loadServerConfigFromString(char *config) {
if ((server.aof_load_truncated = yesnotoi(argv[1])) == -1) {
err = "argument must be 'yes' or 'no'"; goto loaderr;
}
+ } else if (!strcasecmp(argv[0],"rdb-key-save-delay") && argc==2) {
+ server.rdb_key_save_delay = atoi(argv[1]);
+ if (server.rdb_key_save_delay < 0) {
+ err = "rdb-key-save-delay can't be negative";
+ goto loaderr;
+ }
} else if (!strcasecmp(argv[0],"aof-use-rdb-preamble") && argc == 2) {
if ((server.aof_use_rdb_preamble = yesnotoi(argv[1])) == -1) {
err = "argument must be 'yes' or 'no'"; goto loaderr;
@@ -880,10 +888,13 @@ void configSetCommand(client *c) {
int enable = yesnotoi(o->ptr);
if (enable == -1) goto badfmt;
+ server.aof_enabled = enable;
if (enable == 0 && server.aof_state != AOF_OFF) {
stopAppendOnly();
} else if (enable && server.aof_state == AOF_OFF) {
- if (startAppendOnly() == C_ERR) {
+ if (isUnsyncedSlave()) {
+ serverLog(LL_NOTICE, "Not yet synced with master, skipping initial AOFRW.");
+ } else if (startAppendOnly() == C_ERR) {
addReplyError(c,
"Unable to turn on AOF. Check server logs.");
return;
@@ -1018,6 +1029,8 @@ void configSetCommand(client *c) {
}
#endif
} config_set_bool_field(
+ "repl-diskless-load",server.repl_diskless_load) {
+ } config_set_bool_field(
"protected-mode",server.protected_mode) {
} config_set_bool_field(
"stop-writes-on-bgsave-error",server.stop_writes_on_bgsave_err) {
@@ -1091,6 +1104,8 @@ void configSetCommand(client *c) {
} config_set_numerical_field(
"repl-diskless-sync-delay",server.repl_diskless_sync_delay,0,LLONG_MAX) {
} config_set_numerical_field(
+ "rdb-key-save-delay",server.rdb_key_save_delay,0,LLONG_MAX) {
+ } config_set_numerical_field(
"slave-priority",server.slave_priority,0,LLONG_MAX) {
} config_set_numerical_field(
"slave-announce-port",server.slave_announce_port,0,65535) {
@@ -1276,6 +1291,7 @@ void configGetCommand(client *c) {
config_get_numerical_field("cluster-migration-barrier",server.cluster_migration_barrier);
config_get_numerical_field("cluster-slave-validity-factor",server.cluster_slave_validity_factor);
config_get_numerical_field("repl-diskless-sync-delay",server.repl_diskless_sync_delay);
+ config_get_numerical_field("rdb-key-save-delay",server.rdb_key_save_delay);
config_get_numerical_field("tcp-keepalive",server.tcpkeepalive);
/* Bool (yes/no) values */
@@ -1299,6 +1315,8 @@ void configGetCommand(client *c) {
server.repl_disable_tcp_nodelay);
config_get_bool_field("repl-diskless-sync",
server.repl_diskless_sync);
+ config_get_bool_field("repl-diskless-load",
+ server.repl_diskless_load);
config_get_bool_field("aof-rewrite-incremental-fsync",
server.aof_rewrite_incremental_fsync);
config_get_bool_field("aof-load-truncated",
@@ -1330,7 +1348,7 @@ void configGetCommand(client *c) {
if (stringmatch(pattern,"appendonly",1)) {
addReplyBulkCString(c,"appendonly");
- addReplyBulkCString(c,server.aof_state == AOF_OFF ? "no" : "yes");
+ addReplyBulkCString(c,server.aof_enabled ? "yes" : "no");
matches++;
}
if (stringmatch(pattern,"dir",1)) {
@@ -1985,6 +2003,7 @@ int rewriteConfig(char *path) {
rewriteConfigBytesOption(state,"repl-backlog-ttl",server.repl_backlog_time_limit,CONFIG_DEFAULT_REPL_BACKLOG_TIME_LIMIT);
rewriteConfigYesNoOption(state,"repl-disable-tcp-nodelay",server.repl_disable_tcp_nodelay,CONFIG_DEFAULT_REPL_DISABLE_TCP_NODELAY);
rewriteConfigYesNoOption(state,"repl-diskless-sync",server.repl_diskless_sync,CONFIG_DEFAULT_REPL_DISKLESS_SYNC);
+ rewriteConfigYesNoOption(state,"repl-diskless-load",server.repl_diskless_load,CONFIG_DEFAULT_REPL_DISKLESS_LOAD);
rewriteConfigNumericalOption(state,"repl-diskless-sync-delay",server.repl_diskless_sync_delay,CONFIG_DEFAULT_REPL_DISKLESS_SYNC_DELAY);
rewriteConfigNumericalOption(state,"slave-priority",server.slave_priority,CONFIG_DEFAULT_SLAVE_PRIORITY);
rewriteConfigNumericalOption(state,"min-slaves-to-write",server.repl_min_slaves_to_write,CONFIG_DEFAULT_MIN_SLAVES_TO_WRITE);
@@ -2001,7 +2020,7 @@ int rewriteConfig(char *path) {
rewriteConfigBytesOption(state,"active-defrag-ignore-bytes",server.active_defrag_ignore_bytes,CONFIG_DEFAULT_DEFRAG_IGNORE_BYTES);
rewriteConfigNumericalOption(state,"active-defrag-cycle-min",server.active_defrag_cycle_min,CONFIG_DEFAULT_DEFRAG_CYCLE_MIN);
rewriteConfigNumericalOption(state,"active-defrag-cycle-max",server.active_defrag_cycle_max,CONFIG_DEFAULT_DEFRAG_CYCLE_MAX);
- rewriteConfigYesNoOption(state,"appendonly",server.aof_state != AOF_OFF,0);
+ rewriteConfigYesNoOption(state,"appendonly",server.aof_enabled,0);
rewriteConfigStringOption(state,"appendfilename",server.aof_filename,CONFIG_DEFAULT_AOF_FILENAME);
rewriteConfigEnumOption(state,"appendfsync",server.aof_fsync,aof_fsync_enum,CONFIG_DEFAULT_AOF_FSYNC);
rewriteConfigYesNoOption(state,"no-appendfsync-on-rewrite",server.aof_no_fsync_on_rewrite,CONFIG_DEFAULT_AOF_NO_FSYNC_ON_REWRITE);
@@ -2039,6 +2058,7 @@ int rewriteConfig(char *path) {
rewriteConfigYesNoOption(state,"lazyfree-lazy-expire",server.lazyfree_lazy_expire,CONFIG_DEFAULT_LAZYFREE_LAZY_EXPIRE);
rewriteConfigYesNoOption(state,"lazyfree-lazy-server-del",server.lazyfree_lazy_server_del,CONFIG_DEFAULT_LAZYFREE_LAZY_SERVER_DEL);
rewriteConfigYesNoOption(state,"slave-lazy-flush",server.repl_slave_lazy_flush,CONFIG_DEFAULT_SLAVE_LAZY_FLUSH);
+ rewriteConfigNumericalOption(state,"rdb-key-save-delay",server.rdb_key_save_delay,CONFIG_DEFAULT_RDB_KEY_SAVE_DELAY);
/* Rewrite Sentinel config if in Sentinel mode. */
if (server.sentinel_mode) rewriteConfigSentinelOption(state);
diff --git a/src/rdb.c b/src/rdb.c
index 4f46d3580..86fc96124 100644
--- a/src/rdb.c
+++ b/src/rdb.c
@@ -857,6 +857,11 @@ int rdbSaveKeyValuePair(rio *rdb, robj *key, robj *val,
if (rdbSaveObjectType(rdb,val) == -1) return -1;
if (rdbSaveStringObject(rdb,key) == -1) return -1;
if (rdbSaveObject(rdb,val) == -1) return -1;
+
+ /* Delay return if required (for testing) */
+ if (server.rdb_key_save_delay)
+ usleep(server.rdb_key_save_delay);
+
return 1;
}
@@ -1042,6 +1047,11 @@ int rdbSave(char *filename, rdbSaveInfo *rsi) {
rio rdb;
int error = 0;
+ if (isUnsyncedSlave()) {
+ serverLog(LL_NOTICE, "RDB save skipped, no data received from master");
+ return C_ERR;
+ }
+
snprintf(tmpfile,256,"temp-%d.rdb", (int) getpid());
fp = fopen(tmpfile,"w");
if (!fp) {
@@ -1100,6 +1110,11 @@ int rdbSaveBackground(char *filename, rdbSaveInfo *rsi) {
if (server.aof_child_pid != -1 || server.rdb_child_pid != -1) return C_ERR;
+ if (isUnsyncedSlave()) {
+ serverLog(LL_NOTICE,"BGSAVE skipped, no data received from master");
+ return C_ERR;
+ }
+
server.dirty_before_bgsave = server.dirty;
server.lastbgsave_try = time(NULL);
openChildInfoPipe();
@@ -1530,18 +1545,19 @@ robj *rdbLoadObject(int rdbtype, rio *rdb) {
/* Mark that we are loading in the global state and setup the fields
* needed to provide loading stats. */
-void startLoading(FILE *fp) {
- struct stat sb;
-
+void startLoading(size_t size) {
/* Load the DB */
server.loading = 1;
server.loading_start_time = time(NULL);
server.loading_loaded_bytes = 0;
- if (fstat(fileno(fp), &sb) == -1) {
- server.loading_total_bytes = 0;
- } else {
- server.loading_total_bytes = sb.st_size;
- }
+ server.loading_total_bytes = size;
+}
+
+void startLoadingFile(FILE *fp) {
+ struct stat sb;
+ if (fstat(fileno(fp), &sb) == -1)
+ sb.st_size = 0;
+ startLoading(sb.st_size);
}
/* Refresh the loading progress info */
@@ -1752,7 +1768,7 @@ int rdbLoad(char *filename, rdbSaveInfo *rsi) {
int retval;
if ((fp = fopen(filename,"r")) == NULL) return C_ERR;
- startLoading(fp);
+ startLoadingFile(fp);
rioInitWithFile(&rdb,fp);
retval = rdbLoadRio(&rdb,rsi);
fclose(fp);
diff --git a/src/redis-check-rdb.c b/src/redis-check-rdb.c
index 71ac50d03..243e0035b 100644
--- a/src/redis-check-rdb.c
+++ b/src/redis-check-rdb.c
@@ -201,7 +201,7 @@ int redis_check_rdb(char *rdbfilename, FILE *fp) {
goto err;
}
- startLoading(fp);
+ startLoadingFile(fp);
while(1) {
robj *key, *val;
expiretime = -1;
@@ -314,6 +314,7 @@ int redis_check_rdb(char *rdbfilename, FILE *fp) {
}
if (closefile) fclose(fp);
+ stopLoading();
return 0;
eoferr: /* unexpected end of file is handled here with a fatal exit */
@@ -324,6 +325,7 @@ eoferr: /* unexpected end of file is handled here with a fatal exit */
}
err:
if (closefile) fclose(fp);
+ stopLoading();
return 1;
}
diff --git a/src/replication.c b/src/replication.c
index 328382570..a53513651 100644
--- a/src/replication.c
+++ b/src/replication.c
@@ -1165,141 +1165,182 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) {
* at the next call. */
server.repl_transfer_size = 0;
serverLog(LL_NOTICE,
- "MASTER <-> SLAVE sync: receiving streamed RDB from master");
+ "MASTER <-> SLAVE sync: receiving streamed RDB from master with EOF %s",
+ server.repl_diskless_load? "to parser":"to disk");
} else {
usemark = 0;
server.repl_transfer_size = strtol(buf+1,NULL,10);
serverLog(LL_NOTICE,
- "MASTER <-> SLAVE sync: receiving %lld bytes from master",
- (long long) server.repl_transfer_size);
+ "MASTER <-> SLAVE sync: receiving %lld bytes from master %s",
+ (long long) server.repl_transfer_size,
+ server.repl_diskless_load? "to parser":"to disk");
}
return;
}
- /* Read bulk data */
- if (usemark) {
- readlen = sizeof(buf);
- } else {
- left = server.repl_transfer_size - server.repl_transfer_read;
- readlen = (left < (signed)sizeof(buf)) ? left : (signed)sizeof(buf);
- }
-
- nread = read(fd,buf,readlen);
- if (nread <= 0) {
- serverLog(LL_WARNING,"I/O error trying to sync with MASTER: %s",
- (nread == -1) ? strerror(errno) : "connection lost");
- cancelReplicationHandshake();
- return;
- }
- server.stat_net_input_bytes += nread;
-
- /* When a mark is used, we want to detect EOF asap in order to avoid
- * writing the EOF mark into the file... */
- int eof_reached = 0;
+ if (!server.repl_diskless_load) {
- if (usemark) {
- /* Update the last bytes array, and check if it matches our delimiter.*/
- if (nread >= CONFIG_RUN_ID_SIZE) {
- memcpy(lastbytes,buf+nread-CONFIG_RUN_ID_SIZE,CONFIG_RUN_ID_SIZE);
+ /* read the data from the socket, store it to a file and search for the EOF */
+ if (usemark) {
+ readlen = sizeof(buf);
} else {
- int rem = CONFIG_RUN_ID_SIZE-nread;
- memmove(lastbytes,lastbytes+nread,rem);
- memcpy(lastbytes+rem,buf,nread);
+ left = server.repl_transfer_size - server.repl_transfer_read;
+ readlen = (left < (signed)sizeof(buf)) ? left : (signed)sizeof(buf);
}
- if (memcmp(lastbytes,eofmark,CONFIG_RUN_ID_SIZE) == 0) eof_reached = 1;
- }
- server.repl_transfer_lastio = server.unixtime;
- if (write(server.repl_transfer_fd,buf,nread) != nread) {
- serverLog(LL_WARNING,"Write error or short write writing to the DB dump file needed for MASTER <-> SLAVE synchronization: %s", strerror(errno));
- goto error;
- }
- server.repl_transfer_read += nread;
+ nread = read(fd,buf,readlen);
+ if (nread <= 0) {
+ serverLog(LL_WARNING,"I/O error trying to sync with MASTER: %s",
+ (nread == -1) ? strerror(errno) : "connection lost");
+ cancelReplicationHandshake();
+ return;
+ }
+ server.stat_net_input_bytes += nread;
- /* Delete the last 40 bytes from the file if we reached EOF. */
- if (usemark && eof_reached) {
- if (ftruncate(server.repl_transfer_fd,
- server.repl_transfer_read - CONFIG_RUN_ID_SIZE) == -1)
- {
- serverLog(LL_WARNING,"Error truncating the RDB file received from the master for SYNC: %s", strerror(errno));
+ /* When a mark is used, we want to detect EOF asap in order to avoid
+ * writing the EOF mark into the file... */
+ int eof_reached = 0;
+
+ if (usemark) {
+ /* Update the last bytes array, and check if it matches our delimiter.*/
+ if (nread >= CONFIG_RUN_ID_SIZE) {
+ memcpy(lastbytes,buf+nread-CONFIG_RUN_ID_SIZE,CONFIG_RUN_ID_SIZE);
+ } else {
+ int rem = CONFIG_RUN_ID_SIZE-nread;
+ memmove(lastbytes,lastbytes+nread,rem);
+ memcpy(lastbytes+rem,buf,nread);
+ }
+ if (memcmp(lastbytes,eofmark,CONFIG_RUN_ID_SIZE) == 0) eof_reached = 1;
+ }
+
+ server.repl_transfer_lastio = server.unixtime;
+ if (write(server.repl_transfer_fd,buf,nread) != nread) {
+ serverLog(LL_WARNING,"Write error or short write writing to the DB dump file needed for MASTER <-> SLAVE synchronization: %s", strerror(errno));
goto error;
}
- }
+ server.repl_transfer_read += nread;
- /* Sync data on disk from time to time, otherwise at the end of the transfer
- * we may suffer a big delay as the memory buffers are copied into the
- * actual disk. */
- if (server.repl_transfer_read >=
- server.repl_transfer_last_fsync_off + REPL_MAX_WRITTEN_BEFORE_FSYNC)
- {
- off_t sync_size = server.repl_transfer_read -
- server.repl_transfer_last_fsync_off;
- rdb_fsync_range(server.repl_transfer_fd,
- server.repl_transfer_last_fsync_off, sync_size);
- server.repl_transfer_last_fsync_off += sync_size;
- }
+ /* Delete the last 40 bytes from the file if we reached EOF. */
+ if (usemark && eof_reached) {
+ if (ftruncate(server.repl_transfer_fd,
+ server.repl_transfer_read - CONFIG_RUN_ID_SIZE) == -1)
+ {
+ serverLog(LL_WARNING,"Error truncating the RDB file received from the master for SYNC: %s", strerror(errno));
+ goto error;
+ }
+ }
- /* Check if the transfer is now complete */
- if (!usemark) {
- if (server.repl_transfer_read == server.repl_transfer_size)
- eof_reached = 1;
- }
+ /* Sync data on disk from time to time, otherwise at the end of the transfer
+ * we may suffer a big delay as the memory buffers are copied into the
+ * actual disk. */
+ if (server.repl_transfer_read >=
+ server.repl_transfer_last_fsync_off + REPL_MAX_WRITTEN_BEFORE_FSYNC)
+ {
+ off_t sync_size = server.repl_transfer_read -
+ server.repl_transfer_last_fsync_off;
+ rdb_fsync_range(server.repl_transfer_fd,
+ server.repl_transfer_last_fsync_off, sync_size);
+ server.repl_transfer_last_fsync_off += sync_size;
+ }
- if (eof_reached) {
- int aof_is_enabled = server.aof_state != AOF_OFF;
+ /* Check if the transfer is now complete */
+ if (!usemark) {
+ if (server.repl_transfer_read == server.repl_transfer_size)
+ eof_reached = 1;
+ }
+ if (!eof_reached)
+ return;
+ }
+ /* We reach here when the slave is using diskless replication,
+ * or when we are done reading from the socket to the rdb file. */
+ serverLog(LL_NOTICE, "MASTER <-> SLAVE sync: Flushing old data");
+ /* We need to stop any AOFRW fork before flushing and parsing
+ * RDB, otherwise we'll create a copy-on-write disaster. */
+ if (server.aof_state != AOF_OFF) stopAppendOnly();
+ signalFlushedDb(-1);
+ emptyDb(
+ -1,
+ server.repl_slave_lazy_flush ? EMPTYDB_ASYNC : EMPTYDB_NO_FLAGS,
+ replicationEmptyDbCallback);
+ /* Before loading the DB into memory we need to delete the readable
+ * handler, otherwise it will get called recursively since
+ * rdbLoad() will call the event loop to process events from time to
+ * time for non blocking loading. */
+ aeDeleteFileEvent(server.el,server.repl_transfer_s,AE_READABLE);
+ serverLog(LL_NOTICE, "MASTER <-> SLAVE sync: Loading DB in memory");
+ rdbSaveInfo rsi = RDB_SAVE_INFO_INIT;
+ if (server.repl_diskless_load) {
+ rio rdb;
+ rioInitWithFd(&rdb,fd,server.repl_transfer_size);
+ /* Put the socket in blocking mode to simplify RDB transfer.
+ * We'll restore it when the RDB is received. */
+ anetBlock(NULL,fd);
+ anetRecvTimeout(NULL,fd,server.repl_timeout*1000);
+
+ startLoading(server.repl_transfer_size);
+ if (rdbLoadRio(&rdb,&rsi) != C_OK) {
+ stopLoading();
+ serverLog(LL_WARNING,"Failed trying to load the MASTER synchronization DB from disk");
+ cancelReplicationHandshake();
+ rioFreeFd(&rdb, NULL);
+ /* Remove the half-loaded data */
+ emptyDb(
+ -1,
+ server.repl_slave_lazy_flush ? EMPTYDB_ASYNC : EMPTYDB_NO_FLAGS,
+ replicationEmptyDbCallback);
+ return;
+ }
+ stopLoading();
+ if (usemark) {
+ if (!rioRead(&rdb,buf,CONFIG_RUN_ID_SIZE) || memcmp(buf,eofmark,CONFIG_RUN_ID_SIZE) != 0) {
+ serverLog(LL_WARNING,"Replication stream EOF marker is broken");
+ cancelReplicationHandshake();
+ rioFreeFd(&rdb, NULL);
+ return;
+ }
+ }
+ /* get the unread command stream from the rio buffer */
+ rioFreeFd(&rdb, NULL);
+ /* Restore the socket as non-blocking. */
+ anetNonBlock(NULL,fd);
+ anetRecvTimeout(NULL,fd,0);
+ } else {
if (rename(server.repl_transfer_tmpfile,server.rdb_filename) == -1) {
serverLog(LL_WARNING,"Failed trying to rename the temp DB into dump.rdb in MASTER <-> SLAVE synchronization: %s", strerror(errno));
cancelReplicationHandshake();
return;
}
- serverLog(LL_NOTICE, "MASTER <-> SLAVE sync: Flushing old data");
- /* We need to stop any AOFRW fork before flusing and parsing
- * RDB, otherwise we'll create a copy-on-write disaster. */
- if(aof_is_enabled) stopAppendOnly();
- signalFlushedDb(-1);
- emptyDb(
- -1,
- server.repl_slave_lazy_flush ? EMPTYDB_ASYNC : EMPTYDB_NO_FLAGS,
- replicationEmptyDbCallback);
- /* Before loading the DB into memory we need to delete the readable
- * handler, otherwise it will get called recursively since
- * rdbLoad() will call the event loop to process events from time to
- * time for non blocking loading. */
- aeDeleteFileEvent(server.el,server.repl_transfer_s,AE_READABLE);
- serverLog(LL_NOTICE, "MASTER <-> SLAVE sync: Loading DB in memory");
- rdbSaveInfo rsi = RDB_SAVE_INFO_INIT;
if (rdbLoad(server.rdb_filename,&rsi) != C_OK) {
serverLog(LL_WARNING,"Failed trying to load the MASTER synchronization DB from disk");
cancelReplicationHandshake();
- /* Re-enable the AOF if we disabled it earlier, in order to restore
- * the original configuration. */
- if (aof_is_enabled) restartAOF();
return;
}
- /* Final setup of the connected slave <- master link */
zfree(server.repl_transfer_tmpfile);
close(server.repl_transfer_fd);
- replicationCreateMasterClient(server.repl_transfer_s,rsi.repl_stream_db);
- server.repl_state = REPL_STATE_CONNECTED;
- /* After a full resynchroniziation we use the replication ID and
- * offset of the master. The secondary ID / offset are cleared since
- * we are starting a new history. */
- memcpy(server.replid,server.master->replid,sizeof(server.replid));
- server.master_repl_offset = server.master->reploff;
- clearReplicationId2();
- /* Let's create the replication backlog if needed. Slaves need to
- * accumulate the backlog regardless of the fact they have sub-slaves
- * or not, in order to behave correctly if they are promoted to
- * masters after a failover. */
- if (server.repl_backlog == NULL) createReplicationBacklog();
-
- serverLog(LL_NOTICE, "MASTER <-> SLAVE sync: Finished with success");
- /* Restart the AOF subsystem now that we finished the sync. This
- * will trigger an AOF rewrite, and when done will start appending
- * to the new file. */
- if (aof_is_enabled) restartAOF();
+ server.repl_transfer_fd = -1;
+ server.repl_transfer_tmpfile = NULL;
}
+ /* Final setup of the connected slave <- master link */
+ replicationCreateMasterClient(server.repl_transfer_s,rsi.repl_stream_db);
+ server.repl_state = REPL_STATE_CONNECTED;
+ /* After a full resynchronization we use the replication ID and
+ * offset of the master. The secondary ID / offset are cleared since
+ * we are starting a new history. */
+ memcpy(server.replid,server.master->replid,sizeof(server.replid));
+ server.master_repl_offset = server.master->reploff;
+ clearReplicationId2();
+ /* Let's create the replication backlog if needed. Slaves need to
+ * accumulate the backlog regardless of the fact they have sub-slaves
+ * or not, in order to behave correctly if they are promoted to
+ * masters after a failover. */
+ if (server.repl_backlog == NULL) createReplicationBacklog();
+
+ serverLog(LL_NOTICE, "MASTER <-> SLAVE sync: Finished with success");
+ /* Restart the AOF subsystem now that we finished the sync. This
+ * will trigger an AOF rewrite, and when done will start appending
+ * to the new file. */
+ if (server.aof_enabled) restartAOF();
return;
error:
@@ -1810,16 +1851,20 @@ void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {
}
/* Prepare a suitable temp file for bulk transfer */
- while(maxtries--) {
- snprintf(tmpfile,256,
- "temp-%d.%ld.rdb",(int)server.unixtime,(long int)getpid());
- dfd = open(tmpfile,O_CREAT|O_WRONLY|O_EXCL,0644);
- if (dfd != -1) break;
- sleep(1);
- }
- if (dfd == -1) {
- serverLog(LL_WARNING,"Opening the temp file needed for MASTER <-> SLAVE synchronization: %s",strerror(errno));
- goto error;
+ if (!server.repl_diskless_load) {
+ while(maxtries--) {
+ snprintf(tmpfile,256,
+ "temp-%d.%ld.rdb",(int)server.unixtime,(long int)getpid());
+ dfd = open(tmpfile,O_CREAT|O_WRONLY|O_EXCL,0644);
+ if (dfd != -1) break;
+ sleep(1);
+ }
+ if (dfd == -1) {
+ serverLog(LL_WARNING,"Opening the temp file needed for MASTER <-> SLAVE synchronization: %s",strerror(errno));
+ goto error;
+ }
+ server.repl_transfer_tmpfile = zstrdup(tmpfile);
+ server.repl_transfer_fd = dfd;
}
/* Setup the non blocking download of the bulk file. */
@@ -1836,15 +1881,19 @@ void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {
server.repl_transfer_size = -1;
server.repl_transfer_read = 0;
server.repl_transfer_last_fsync_off = 0;
- server.repl_transfer_fd = dfd;
server.repl_transfer_lastio = server.unixtime;
- server.repl_transfer_tmpfile = zstrdup(tmpfile);
return;
error:
aeDeleteFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE);
if (dfd != -1) close(dfd);
close(fd);
+ if (server.repl_transfer_fd != -1)
+ close(server.repl_transfer_fd);
+ if (server.repl_transfer_tmpfile)
+ zfree(server.repl_transfer_tmpfile);
+ server.repl_transfer_tmpfile = NULL;
+ server.repl_transfer_fd = -1;
server.repl_transfer_s = -1;
server.repl_state = REPL_STATE_CONNECT;
return;
@@ -1898,9 +1947,13 @@ void undoConnectWithMaster(void) {
void replicationAbortSyncTransfer(void) {
serverAssert(server.repl_state == REPL_STATE_TRANSFER);
undoConnectWithMaster();
- close(server.repl_transfer_fd);
- unlink(server.repl_transfer_tmpfile);
- zfree(server.repl_transfer_tmpfile);
+ if (server.repl_transfer_fd!=-1) {
+ close(server.repl_transfer_fd);
+ unlink(server.repl_transfer_tmpfile);
+ zfree(server.repl_transfer_tmpfile);
+ server.repl_transfer_tmpfile = NULL;
+ server.repl_transfer_fd = -1;
+ }
}
/* This function aborts a non blocking replication attempt if there is one
@@ -2000,11 +2053,28 @@ void slaveofCommand(client *c) {
if (!strcasecmp(c->argv[1]->ptr,"no") &&
!strcasecmp(c->argv[2]->ptr,"one")) {
if (server.masterhost) {
+ if (c->argc == 4) {
+ long long expected, slave_repl_offset = replicationGetSlaveOffset();
+
+ /* Get expected replication offset */
+ if (getLongLongFromObjectOrReply(c, c->argv[3], &expected, NULL) != C_OK)
+ return;
+ /* Return error if we are below the expected offset */
+ if (slave_repl_offset < expected) {
+ serverLog(LL_NOTICE,"SLAVEOF NO ONE failed due to unmet replication offset expectation (%lld < %lld)",
+ server.master_repl_offset, expected);
+ addReplyError(c,"Replication offset is smaller than expected.");
+ return;
+ }
+ }
replicationUnsetMaster();
sds client = catClientInfoString(sdsempty(),c);
serverLog(LL_NOTICE,"MASTER MODE enabled (user request from '%s')",
client);
sdsfree(client);
+ /* Restart the AOF subsystem in case we shut it down during a sync when
+ * we were still a slave. */
+ if (server.aof_enabled && server.aof_state == AOF_OFF) restartAOF();
}
} else {
long port;
@@ -2469,10 +2539,8 @@ void processClientsWaitingReplicas(void) {
}
}
-/* Return the slave replication offset for this instance, that is
- * the offset for which we already processed the master replication stream. */
-long long replicationGetSlaveOffset(void) {
- long long offset = 0;
+long long replicationGetSlaveOffsetRaw(void) {
+ long long offset = -1;
if (server.masterhost != NULL) {
if (server.master) {
@@ -2481,6 +2549,15 @@ long long replicationGetSlaveOffset(void) {
offset = server.cached_master->reploff;
}
}
+
+ return offset;
+}
+
+/* Return the slave replication offset for this instance, that is
+ * the offset for which we already processed the master replication stream. */
+long long replicationGetSlaveOffset(void) {
+ long long offset = replicationGetSlaveOffsetRaw();
+
/* offset may be -1 when the master does not support it at all, however
* this function is designed to return an offset that can express the
* amount of data processed by the master, so we return a positive
@@ -2489,6 +2566,10 @@ long long replicationGetSlaveOffset(void) {
return offset;
}
+int isUnsyncedSlave() {
+ return server.masterhost && replicationGetSlaveOffsetRaw() == -1;
+}
+
/* --------------------------- REPLICATION CRON ---------------------------- */
/* Replication cron function, called 1 time per second. */
diff --git a/src/rio.c b/src/rio.c
index 9c7220fcc..f00b8ba37 100644
--- a/src/rio.c
+++ b/src/rio.c
@@ -157,6 +157,113 @@ void rioInitWithFile(rio *r, FILE *fp) {
r->io.file.autosync = 0;
}
+/* ------------------- File descriptor implementation ------------------- */
+
+static size_t rioFdWrite(rio *r, const void *buf, size_t len) {
+ UNUSED(r);
+ UNUSED(buf);
+ UNUSED(len);
+ return 0; /* Error, this target does not yet support writing. */
+}
+
+/* Returns 1 or 0 for success/failure. */
+static size_t rioFdRead(rio *r, void *buf, size_t len) {
+ size_t avail = sdslen(r->io.fd.buf)-r->io.fd.pos;
+
+ /* if the buffer is too small for the entire request: realloc */
+ if (sdslen(r->io.fd.buf) + sdsavail(r->io.fd.buf) < len)
+ r->io.fd.buf = sdsMakeRoomFor(r->io.fd.buf, len - sdslen(r->io.fd.buf));
+
+ /* if the remaining unused buffer is not large enough: memmove so that we can read the rest */
+ if (len > avail && sdsavail(r->io.fd.buf) < len - avail) {
+ sdsrange(r->io.fd.buf, r->io.fd.pos, -1);
+ r->io.fd.pos = 0;
+ }
+
+ /* if we don't already have all the data in the sds, read more */
+ while (len > sdslen(r->io.fd.buf) - r->io.fd.pos) {
+ size_t buffered = sdslen(r->io.fd.buf) - r->io.fd.pos;
+ size_t toread = len - buffered;
+ /* read either what's missing, or PROTO_IOBUF_LEN, the bigger of the two */
+ if (toread < PROTO_IOBUF_LEN)
+ toread = PROTO_IOBUF_LEN;
+ if (toread > sdsavail(r->io.fd.buf))
+ toread = sdsavail(r->io.fd.buf);
+ if (r->io.fd.read_limit != 0 &&
+ r->io.fd.read_so_far + buffered + toread > r->io.fd.read_limit) {
+ if (r->io.fd.read_limit >= r->io.fd.read_so_far - buffered)
+ toread = r->io.fd.read_limit - r->io.fd.read_so_far - buffered;
+ else {
+ errno = EOVERFLOW;
+ return 0;
+ }
+ }
+ int retval = read(r->io.fd.fd, (char*)r->io.fd.buf + sdslen(r->io.fd.buf), toread);
+ if (retval <= 0) {
+ if (errno == EWOULDBLOCK) errno = ETIMEDOUT;
+ return 0;
+ }
+ sdsIncrLen(r->io.fd.buf, retval);
+ }
+
+ memcpy(buf, (char*)r->io.fd.buf + r->io.fd.pos, len);
+ r->io.fd.read_so_far += len;
+ r->io.fd.pos += len;
+ return len;
+}
+
+/* Returns read/write position in file. */
+static off_t rioFdTell(rio *r) {
+ return r->io.fd.read_so_far;
+}
+
+/* Flushes any buffer to target device if applicable. Returns 1 on success
+ * and 0 on failures. */
+static int rioFdFlush(rio *r) {
+ /* Our flush is implemented by the write method, that recognizes a
+ * buffer set to NULL with a count of zero as a flush request. */
+ return rioFdWrite(r,NULL,0);
+}
+
+static const rio rioFdIO = {
+ rioFdRead,
+ rioFdWrite,
+ rioFdTell,
+ rioFdFlush,
+ NULL, /* update_checksum */
+ 0, /* current checksum */
+ 0, /* bytes read or written */
+ 0, /* read/write chunk size */
+ { { NULL, 0 } } /* union for io-specific vars */
+};
+
+/* create an rio that implements a buffered read from an fd
+ * read_limit argument stops buffering when reaching the limit */
+void rioInitWithFd(rio *r, int fd, size_t read_limit) {
+ *r = rioFdIO;
+ r->io.fd.fd = fd;
+ r->io.fd.pos = 0;
+ r->io.fd.read_limit = read_limit;
+ r->io.fd.read_so_far = 0;
+ r->io.fd.buf = sdsnewlen(NULL, PROTO_IOBUF_LEN);
+ sdsclear(r->io.fd.buf);
+}
+
+/* release the rio stream.
+ * optionally returns the unread buffered data. */
+void rioFreeFd(rio *r, sds* out_remainingBufferedData) {
+ if(out_remainingBufferedData && (size_t)r->io.fd.pos < sdslen(r->io.fd.buf)) {
+ if (r->io.fd.pos > 0)
+ sdsrange(r->io.fd.buf, r->io.fd.pos, -1);
+ *out_remainingBufferedData = r->io.fd.buf;
+ } else {
+ sdsfree(r->io.fd.buf);
+ if (out_remainingBufferedData)
+ *out_remainingBufferedData = NULL;
+ }
+ r->io.fd.buf = NULL;
+}
+
/* ------------------- File descriptors set implementation ------------------- */
/* Returns 1 or 0 for success/failure.
@@ -300,7 +407,7 @@ void rioGenericUpdateChecksum(rio *r, const void *buf, size_t len) {
* disk I/O concentrated in very little time. When we fsync in an explicit
* way instead the I/O pressure is more distributed across time. */
void rioSetAutoSync(rio *r, off_t bytes) {
- serverAssert(r->read == rioFileIO.read);
+ if(r->write != rioFileIO.write) return;
r->io.file.autosync = bytes;
}
diff --git a/src/rio.h b/src/rio.h
index 6749723d2..0c25d7541 100644
--- a/src/rio.h
+++ b/src/rio.h
@@ -73,6 +73,14 @@ struct _rio {
off_t buffered; /* Bytes written since last fsync. */
off_t autosync; /* fsync after 'autosync' bytes written. */
} file;
+ /* file descriptor */
+ struct {
+ int fd; /* File descriptor. */
+ off_t pos; /* pos in buf that was returned */
+ sds buf; /* buffered data */
+ size_t read_limit; /* don't allow to buffer/read more than that */
+ size_t read_so_far; /* amount of data read from the rio (not buffered) */
+ } fd;
/* Multiple FDs target (used to write to N sockets). */
struct {
int *fds; /* File descriptors. */
@@ -126,9 +134,11 @@ static inline int rioFlush(rio *r) {
void rioInitWithFile(rio *r, FILE *fp);
void rioInitWithBuffer(rio *r, sds s);
+void rioInitWithFd(rio *r, int fd, size_t read_limit);
void rioInitWithFdset(rio *r, int *fds, int numfds);
void rioFreeFdset(rio *r);
+void rioFreeFd(rio *r, sds* out_remainingBufferedData);
size_t rioWriteBulkCount(rio *r, char prefix, int count);
size_t rioWriteBulkString(rio *r, const char *buf, size_t len);
diff --git a/src/server.c b/src/server.c
index b8f43b3a0..6b362bac6 100644
--- a/src/server.c
+++ b/src/server.c
@@ -256,7 +256,7 @@ struct redisCommand redisCommandTable[] = {
{"touch",touchCommand,-2,"rF",0,NULL,1,1,1,0,0},
{"pttl",pttlCommand,2,"rF",0,NULL,1,1,1,0,0},
{"persist",persistCommand,2,"wF",0,NULL,1,1,1,0,0},
- {"slaveof",slaveofCommand,3,"ast",0,NULL,0,0,0,0,0},
+ {"slaveof",slaveofCommand,-3,"ast",0,NULL,0,0,0,0,0},
{"role",roleCommand,1,"lst",0,NULL,0,0,0,0,0},
{"debug",debugCommand,-2,"as",0,NULL,0,0,0,0,0},
{"config",configCommand,-2,"lat",0,NULL,0,0,0,0,0},
@@ -1414,6 +1414,7 @@ void initServerConfig(void) {
server.aof_selected_db = -1; /* Make sure the first time will not match */
server.aof_flush_postponed_start = 0;
server.aof_rewrite_incremental_fsync = CONFIG_DEFAULT_AOF_REWRITE_INCREMENTAL_FSYNC;
+ server.rdb_key_save_delay = CONFIG_DEFAULT_RDB_KEY_SAVE_DELAY;
server.aof_load_truncated = CONFIG_DEFAULT_AOF_LOAD_TRUNCATED;
server.aof_use_rdb_preamble = CONFIG_DEFAULT_AOF_USE_RDB_PREAMBLE;
server.pidfile = NULL;
@@ -1478,6 +1479,9 @@ void initServerConfig(void) {
server.cached_master = NULL;
server.master_initial_offset = -1;
server.repl_state = REPL_STATE_NONE;
+ server.repl_transfer_tmpfile = NULL;
+ server.repl_transfer_fd = -1;
+ server.repl_transfer_s = -1;
server.repl_syncio_timeout = CONFIG_REPL_SYNCIO_TIMEOUT;
server.repl_serve_stale_data = CONFIG_DEFAULT_SLAVE_SERVE_STALE_DATA;
server.repl_slave_ro = CONFIG_DEFAULT_SLAVE_READ_ONLY;
@@ -1485,6 +1489,7 @@ void initServerConfig(void) {
server.repl_down_since = 0; /* Never connected, repl is down since EVER. */
server.repl_disable_tcp_nodelay = CONFIG_DEFAULT_REPL_DISABLE_TCP_NODELAY;
server.repl_diskless_sync = CONFIG_DEFAULT_REPL_DISKLESS_SYNC;
+ server.repl_diskless_load = CONFIG_DEFAULT_REPL_DISKLESS_LOAD;
server.repl_diskless_sync_delay = CONFIG_DEFAULT_REPL_DISKLESS_SYNC_DELAY;
server.repl_ping_slave_period = CONFIG_DEFAULT_REPL_PING_SLAVE_PERIOD;
server.repl_timeout = CONFIG_DEFAULT_REPL_TIMEOUT;
@@ -1957,17 +1962,6 @@ void initServer(void) {
"blocked clients subsystem.");
}
- /* Open the AOF file if needed. */
- if (server.aof_state == AOF_ON) {
- server.aof_fd = open(server.aof_filename,
- O_WRONLY|O_APPEND|O_CREAT,0644);
- if (server.aof_fd == -1) {
- serverLog(LL_WARNING, "Can't open the append-only file: %s",
- strerror(errno));
- exit(1);
- }
- }
-
/* 32 bit instances are limited to 4GB of address space, so if there is
* no explicit limit in the user provided configuration we set a limit
* at 3 GB using maxmemory with 'noeviction' policy'. This avoids
@@ -2549,6 +2543,7 @@ void closeListeningSockets(int unlink_unix_socket) {
int prepareForShutdown(int flags) {
int save = flags & SHUTDOWN_SAVE;
int nosave = flags & SHUTDOWN_NOSAVE;
+ int saveit = (server.saveparamslen > 0 && !nosave) || save;
serverLog(LL_WARNING,"User requested shutdown...");
@@ -2583,9 +2578,14 @@ int prepareForShutdown(int flags) {
flushAppendOnlyFile(1);
aof_fsync(server.aof_fd);
}
+
+ if (saveit && isUnsyncedSlave()) {
+ serverLog(LL_NOTICE,"Server is a slave with no loaded dataset, skipping RDB save on exit.");
+ saveit = 0;
+ }
/* Create a new RDB file before exiting. */
- if ((server.saveparamslen > 0 && !nosave) || save) {
+ if (saveit) {
serverLog(LL_NOTICE,"Saving the final RDB snapshot before exiting.");
/* Snapshotting. Perform a SYNC SAVE and exit */
rdbSaveInfo rsi, *rsiptr;
@@ -3063,7 +3063,7 @@ sds genRedisInfoString(char *section) {
(server.aof_last_write_status == C_OK) ? "ok" : "err",
server.stat_aof_cow_bytes);
- if (server.aof_state != AOF_OFF) {
+ if (server.aof_enabled) {
info = sdscatprintf(info,
"aof_current_size:%lld\r\n"
"aof_base_size:%lld\r\n"
diff --git a/src/server.h b/src/server.h
index ee3b7df5c..bd6d3ed5c 100644
--- a/src/server.h
+++ b/src/server.h
@@ -127,7 +127,9 @@ typedef long long mstime_t; /* millisecond time type. */
#define CONFIG_DEFAULT_RDB_CHECKSUM 1
#define CONFIG_DEFAULT_RDB_FILENAME "dump.rdb"
#define CONFIG_DEFAULT_REPL_DISKLESS_SYNC 0
+#define CONFIG_DEFAULT_REPL_DISKLESS_LOAD 0
#define CONFIG_DEFAULT_REPL_DISKLESS_SYNC_DELAY 5
+#define CONFIG_DEFAULT_RDB_KEY_SAVE_DELAY 0
#define CONFIG_DEFAULT_SLAVE_SERVE_STALE_DATA 1
#define CONFIG_DEFAULT_SLAVE_READ_ONLY 1
#define CONFIG_DEFAULT_SLAVE_ANNOUNCE_IP NULL
@@ -998,6 +1000,7 @@ struct redisServer {
int daemonize; /* True if running as a daemon */
clientBufferLimitsConfig client_obuf_limits[CLIENT_TYPE_OBUF_COUNT];
/* AOF persistence */
+ int aof_enabled; /* AOF configuration */
int aof_state; /* AOF_(ON|OFF|WAIT_REWRITE) */
int aof_fsync; /* Kind of fsync() policy */
char *aof_filename; /* Name of the AOF file */
@@ -1052,6 +1055,7 @@ struct redisServer {
int stop_writes_on_bgsave_err; /* Don't allow writes if can't BGSAVE */
int rdb_pipe_write_result_to_parent; /* RDB pipes used to return the state */
int rdb_pipe_read_result_from_child; /* of each slave in diskless SYNC. */
+ int rdb_key_save_delay; /* Delay in microseconds between keys while writing the RDB. (for testing) */
/* Pipe and data structures for child -> parent info sharing. */
int child_info_pipe[2]; /* Pipe used to write the child_info_data. */
struct {
@@ -1087,7 +1091,8 @@ struct redisServer {
int repl_min_slaves_to_write; /* Min number of slaves to write. */
int repl_min_slaves_max_lag; /* Max lag of <count> slaves to write. */
int repl_good_slaves_count; /* Number of slaves with lag <= max_lag. */
- int repl_diskless_sync; /* Send RDB to slaves sockets directly. */
+ int repl_diskless_load; /* Slave parse RDB directly from the socket. */
+ int repl_diskless_sync; /* Master send RDB to slaves sockets directly. */
int repl_diskless_sync_delay; /* Delay to start a diskless repl BGSAVE. */
/* Replication (slave) */
char *masterauth; /* AUTH with this password with master */
@@ -1522,7 +1527,8 @@ void replicationCacheMasterUsingMyself(void);
void feedReplicationBacklog(void *ptr, size_t len);
/* Generic persistence functions */
-void startLoading(FILE *fp);
+void startLoadingFile(FILE* fp);
+void startLoading(size_t size);
void loadingProgress(off_t pos);
void stopLoading(void);
@@ -1650,6 +1656,7 @@ unsigned int LRU_CLOCK(void);
const char *evictPolicyToString(void);
struct redisMemOverhead *getMemoryOverheadData(void);
void freeMemoryOverheadData(struct redisMemOverhead *mh);
+int isUnsyncedSlave();
#define RESTART_SERVER_NONE 0
#define RESTART_SERVER_GRACEFULLY (1<<0) /* Do proper shutdown. */
diff --git a/tests/integration/psync2.tcl b/tests/integration/psync2.tcl
index 3d9e5527a..9ad46f389 100644
--- a/tests/integration/psync2.tcl
+++ b/tests/integration/psync2.tcl
@@ -1,3 +1,4 @@
+proc test_psync2 {mdl sdls sdll} {
start_server {tags {"psync2"}} {
start_server {} {
start_server {} {
@@ -31,6 +32,9 @@ start_server {} {
if {$debug_msg} {puts "Log file: [srv [expr 0-$j] stdout]"}
}
+ test "PSYNC2: ### SETTING diskless master: $mdl; diskless slave (sync, load): $sdls, $sdll ###" {
+ }
+
set cycle 1
while {([clock seconds]-$start_time) < $duration} {
test "PSYNC2: --- CYCLE $cycle ---" {
@@ -45,6 +49,8 @@ start_server {} {
set used [list $master_id]
test "PSYNC2: \[NEW LAYOUT\] Set #$master_id as master" {
$R($master_id) slaveof no one
+ $R($master_id) config set repl-diskless-sync $mdl
+ $R($master_id) config set repl-diskless-sync-delay 1
if {$counter_value == 0} {
$R($master_id) set x $counter_value
}
@@ -62,6 +68,9 @@ start_server {} {
set master_port $R_port($mid)
test "PSYNC2: Set #$slave_id to replicate from #$mid" {
+ $R($slave_id) config set repl-diskless-load $sdll
+ $R($slave_id) config set repl-diskless-sync $sdls
+ $R($slave_id) config set repl-diskless-sync-delay 1
$R($slave_id) slaveof $master_host $master_port
}
lappend used $slave_id
@@ -243,3 +252,12 @@ start_server {} {
}
}}}}}
+}
+
+foreach mdl {yes no} {
+ foreach sdls {yes no} {
+ foreach sdll {yes no} {
+ test_psync2 $mdl $sdls $sdll
+ }
+ }
+}
diff --git a/tests/integration/replication-4.tcl b/tests/integration/replication-4.tcl
index 1c559b706..348b1cae5 100644
--- a/tests/integration/replication-4.tcl
+++ b/tests/integration/replication-4.tcl
@@ -1,12 +1,3 @@
-proc start_bg_complex_data {host port db ops} {
- set tclsh [info nameofexecutable]
- exec $tclsh tests/helpers/bg_complex_data.tcl $host $port $db $ops &
-}
-
-proc stop_bg_complex_data {handle} {
- catch {exec /bin/kill -9 $handle}
-}
-
start_server {tags {"repl"}} {
start_server {} {
@@ -153,3 +144,87 @@ start_server {tags {"repl"}} {
}
}
}
+
+# test that a restart of a slave that is not in sync doesn't overwrite an existing rdb
+start_server {tags {"repl"}} {
+ start_server {} {
+ set master [srv -1 client]
+ set master_host [srv -1 host]
+ set master_port [srv -1 port]
+ set slave [srv 0 client]
+
+ $master select 0
+ $slave select 0
+
+ # Populate master
+ for {set j 0} {$j < 100} {incr j} {
+ $master set key$j $j
+ }
+
+ # Connect slave to master
+ test {First server should have role slave after SLAVEOF} {
+ $slave slaveof $master_host $master_port
+ wait_for_condition 50 100 {
+ [s 0 master_link_status] eq {up}
+ } else {
+ fail "Replication not started."
+ }
+ }
+
+ test {Slave should sync with master} {
+ wait_for_condition 50 100 {
+ [$slave dbsize] == 100
+ } else {
+ fail "Replication not completed."
+ }
+ }
+
+ # Disconnect slave
+ $slave slaveof no one
+ $slave save
+
+ # Make sure no RDB saving is in progress
+ test {Make sure no RDB saving is in progress} {
+ wait_for_condition 50 100 {
+ [s -1 rdb_bgsave_in_progress] eq {0}
+ } else {
+ fail "RDB saving never finished."
+ }
+ }
+
+ # Setup delay to simulate a long RDB transfer time
+ # 50000 microseconds * 100 keys = 5 seconds
+ $master config set rdb-key-save-delay 50000
+
+ # Connect slave to master
+ $slave slaveof $master_host $master_port
+
+ # Make sure master started sending the file
+ test {Make sure master started sending RDB} {
+ wait_for_condition 50 100 {
+ [s -1 rdb_bgsave_in_progress] eq {1}
+ } else {
+ fail "RDB saving never started."
+ }
+ }
+
+ test {Kill master and restart slave} {
+ # Kill the master mid-RDB sending
+ catch {$master shutdown}
+
+ # Restart slave
+ catch {$slave debug restart}
+ }
+
+ after 100
+
+ # Make sure it has all 100 keys
+ test {Slave should load old RDB} {
+ wait_for_condition 50 100 {
+ [$slave dbsize] == 100
+ } else {
+ fail "RDB not loaded."
+ }
+ }
+ }
+}
diff --git a/tests/integration/replication-psync.tcl b/tests/integration/replication-psync.tcl
index 2b9e13f50..ac9cca71a 100644
--- a/tests/integration/replication-psync.tcl
+++ b/tests/integration/replication-psync.tcl
@@ -1,12 +1,3 @@
-proc start_bg_complex_data {host port db ops} {
- set tclsh [info nameofexecutable]
- exec $tclsh tests/helpers/bg_complex_data.tcl $host $port $db $ops &
-}
-
-proc stop_bg_complex_data {handle} {
- catch {exec /bin/kill -9 $handle}
-}
-
# Creates a master-slave pair and breaks the link continuously to force
# partial resyncs attempts, all this while flooding the master with
# write queries.
@@ -17,7 +8,7 @@ proc stop_bg_complex_data {handle} {
# If reconnect is > 0, the test actually try to break the connection and
# reconnect with the master, otherwise just the initial synchronization is
# checked for consistency.
-proc test_psync {descr duration backlog_size backlog_ttl delay cond diskless reconnect} {
+proc test_psync {descr duration backlog_size backlog_ttl delay cond mdl sdl reconnect} {
start_server {tags {"repl"}} {
start_server {} {
@@ -28,8 +19,9 @@ proc test_psync {descr duration backlog_size backlog_ttl delay cond diskless rec
$master config set repl-backlog-size $backlog_size
$master config set repl-backlog-ttl $backlog_ttl
- $master config set repl-diskless-sync $diskless
+ $master config set repl-diskless-sync $mdl
$master config set repl-diskless-sync-delay 1
+ $slave config set repl-diskless-load $sdl
set load_handle0 [start_bg_complex_data $master_host $master_port 9 100000]
set load_handle1 [start_bg_complex_data $master_host $master_port 11 100000]
@@ -54,7 +46,7 @@ proc test_psync {descr duration backlog_size backlog_ttl delay cond diskless rec
}
}
- test "Test replication partial resync: $descr (diskless: $diskless, reconnect: $reconnect)" {
+ test "Test replication partial resync: $descr (diskless: $mdl, $sdl)" {
# Now while the clients are writing data, break the maste-slave
# link multiple times.
if ($reconnect) {
@@ -79,6 +71,32 @@ proc test_psync {descr duration backlog_size backlog_ttl delay cond diskless rec
stop_bg_complex_data $load_handle0
stop_bg_complex_data $load_handle1
stop_bg_complex_data $load_handle2
+
+ # Wait for the slave to reach the "online"
+ # state from the POV of the master.
+ set retry 5000
+ while {$retry} {
+ set info [$master info]
+ if {[string match {*slave0:*state=online*} $info]} {
+ break
+ } else {
+ incr retry -1
+ after 100
+ }
+ }
+ if {$retry == 0} {
+ error "assertion:Slave not correctly synchronized"
+ }
+
+ # Wait that slave acknowledge it is online so
+ # we are sure that DBSIZE and DEBUG DIGEST will not
+ # fail because of timing issues. (-LOADING error)
+ wait_for_condition 5000 100 {
+ [lindex [$slave role] 3] eq {connected}
+ } else {
+ fail "Slave still not connected after some time"
+ }
+
set retry 10
while {$retry && ([$master debug digest] ne [$slave debug digest])}\
{
@@ -106,23 +124,25 @@ proc test_psync {descr duration backlog_size backlog_ttl delay cond diskless rec
}
}
-foreach diskless {no yes} {
- test_psync {no reconnection, just sync} 6 1000000 3600 0 {
- } $diskless 0
+foreach mdl {no yes} {
+ foreach sdl {no yes} {
+ test_psync {no reconnection, just sync} 6 1000000 3600 0 {
+ } $mdl $sdl 0
- test_psync {ok psync} 6 100000000 3600 0 {
+ test_psync {ok psync} 6 100000000 3600 0 {
assert {[s -1 sync_partial_ok] > 0}
- } $diskless 1
+ } $mdl $sdl 1
- test_psync {no backlog} 6 100 3600 0.5 {
+ test_psync {no backlog} 6 100 3600 0.5 {
assert {[s -1 sync_partial_err] > 0}
- } $diskless 1
+ } $mdl $sdl 1
- test_psync {ok after delay} 3 100000000 3600 3 {
+ test_psync {ok after delay} 3 100000000 3600 3 {
assert {[s -1 sync_partial_ok] > 0}
- } $diskless 1
+ } $mdl $sdl 1
- test_psync {backlog expired} 3 100000000 1 3 {
+ test_psync {backlog expired} 3 100000000 1 3 {
assert {[s -1 sync_partial_err] > 0}
- } $diskless 1
+ } $mdl $sdl 1
+ }
}
diff --git a/tests/integration/replication.tcl b/tests/integration/replication.tcl
index e811cf0ee..fb05328e9 100644
--- a/tests/integration/replication.tcl
+++ b/tests/integration/replication.tcl
@@ -183,85 +183,92 @@ start_server {tags {"repl"}} {
}
}
-foreach dl {no yes} {
- start_server {tags {"repl"}} {
- set master [srv 0 client]
- $master config set repl-diskless-sync $dl
- set master_host [srv 0 host]
- set master_port [srv 0 port]
- set slaves {}
- set load_handle0 [start_write_load $master_host $master_port 3]
- set load_handle1 [start_write_load $master_host $master_port 5]
- set load_handle2 [start_write_load $master_host $master_port 20]
- set load_handle3 [start_write_load $master_host $master_port 8]
- set load_handle4 [start_write_load $master_host $master_port 4]
- start_server {} {
- lappend slaves [srv 0 client]
+foreach mdl {no yes} {
+ foreach sdl {no yes} {
+ start_server {tags {"repl"}} {
+ set master [srv 0 client]
+ $master config set repl-diskless-sync $mdl
+ $master config set repl-diskless-sync-delay 1
+ set master_host [srv 0 host]
+ set master_port [srv 0 port]
+ set slaves {}
+ set load_handle0 [start_bg_complex_data $master_host $master_port 9 100000000]
+ set load_handle1 [start_bg_complex_data $master_host $master_port 11 100000000]
+ set load_handle2 [start_bg_complex_data $master_host $master_port 12 100000000]
+ set load_handle3 [start_write_load $master_host $master_port 8]
+ set load_handle4 [start_write_load $master_host $master_port 4]
+ after 5000 ;# wait for some data to accumulate so that we have RDB part for the fork
start_server {} {
lappend slaves [srv 0 client]
start_server {} {
lappend slaves [srv 0 client]
- test "Connect multiple slaves at the same time (issue #141), diskless=$dl" {
- # Send SLAVEOF commands to slaves
- [lindex $slaves 0] slaveof $master_host $master_port
- [lindex $slaves 1] slaveof $master_host $master_port
- [lindex $slaves 2] slaveof $master_host $master_port
-
- # Wait for all the three slaves to reach the "online"
- # state from the POV of the master.
- set retry 500
- while {$retry} {
- set info [r -3 info]
- if {[string match {*slave0:*state=online*slave1:*state=online*slave2:*state=online*} $info]} {
- break
+ start_server {} {
+ lappend slaves [srv 0 client]
+ test "Connect multiple slaves at the same time (issue #141), master diskless=$mdl, slave diskless=$sdl" {
+ # Send SLAVEOF commands to slaves
+ [lindex $slaves 0] config set repl-diskless-load $sdl
+ [lindex $slaves 1] config set repl-diskless-load $sdl
+ [lindex $slaves 2] config set repl-diskless-load $sdl
+ [lindex $slaves 0] slaveof $master_host $master_port
+ [lindex $slaves 1] slaveof $master_host $master_port
+ [lindex $slaves 2] slaveof $master_host $master_port
+
+ # Wait for all the three slaves to reach the "online"
+ # state from the POV of the master.
+ set retry 500
+ while {$retry} {
+ set info [r -3 info]
+ if {[string match {*slave0:*state=online*slave1:*state=online*slave2:*state=online*} $info]} {
+ break
+ } else {
+ incr retry -1
+ after 100
+ }
+ }
+ if {$retry == 0} {
+ error "assertion:Slaves not correctly synchronized"
+ }
+
+ # Wait that slaves acknowledge they are online so
+ # we are sure that DBSIZE and DEBUG DIGEST will not
+ # fail because of timing issues.
+ wait_for_condition 500 100 {
+ [lindex [[lindex $slaves 0] role] 3] eq {connected} &&
+ [lindex [[lindex $slaves 1] role] 3] eq {connected} &&
+ [lindex [[lindex $slaves 2] role] 3] eq {connected}
} else {
- incr retry -1
- after 100
+ fail "Slaves still not connected after some time"
}
- }
- if {$retry == 0} {
- error "assertion:Slaves not correctly synchronized"
- }
- # Wait that slaves acknowledge they are online so
- # we are sure that DBSIZE and DEBUG DIGEST will not
- # fail because of timing issues.
- wait_for_condition 500 100 {
- [lindex [[lindex $slaves 0] role] 3] eq {connected} &&
- [lindex [[lindex $slaves 1] role] 3] eq {connected} &&
- [lindex [[lindex $slaves 2] role] 3] eq {connected}
- } else {
- fail "Slaves still not connected after some time"
- }
+ # Stop the write load
+ stop_bg_complex_data $load_handle0
+ stop_bg_complex_data $load_handle1
+ stop_bg_complex_data $load_handle2
+ stop_write_load $load_handle3
+ stop_write_load $load_handle4
+
+ # Make sure that slaves and master have same
+ # number of keys
+ wait_for_condition 500 100 {
+ [$master dbsize] == [[lindex $slaves 0] dbsize] &&
+ [$master dbsize] == [[lindex $slaves 1] dbsize] &&
+ [$master dbsize] == [[lindex $slaves 2] dbsize]
+ } else {
+ fail "Different number of keys between master and slave after too long time."
+ }
- # Stop the write load
- stop_write_load $load_handle0
- stop_write_load $load_handle1
- stop_write_load $load_handle2
- stop_write_load $load_handle3
- stop_write_load $load_handle4
-
- # Make sure that slaves and master have same
- # number of keys
- wait_for_condition 500 100 {
- [$master dbsize] == [[lindex $slaves 0] dbsize] &&
- [$master dbsize] == [[lindex $slaves 1] dbsize] &&
- [$master dbsize] == [[lindex $slaves 2] dbsize]
- } else {
- fail "Different number of keys between masted and slave after too long time."
+ # Check digests
+ set digest [$master debug digest]
+ set digest0 [[lindex $slaves 0] debug digest]
+ set digest1 [[lindex $slaves 1] debug digest]
+ set digest2 [[lindex $slaves 2] debug digest]
+ assert {$digest ne 0000000000000000000000000000000000000000}
+ assert {$digest eq $digest0}
+ assert {$digest eq $digest1}
+ assert {$digest eq $digest2}
}
-
- # Check digests
- set digest [$master debug digest]
- set digest0 [[lindex $slaves 0] debug digest]
- set digest1 [[lindex $slaves 1] debug digest]
- set digest2 [[lindex $slaves 2] debug digest]
- assert {$digest ne 0000000000000000000000000000000000000000}
- assert {$digest eq $digest0}
- assert {$digest eq $digest1}
- assert {$digest eq $digest2}
- }
- }
+ }
+ }
}
}
}
diff --git a/tests/support/util.tcl b/tests/support/util.tcl
index 64c36b326..c3679ef30 100644
--- a/tests/support/util.tcl
+++ b/tests/support/util.tcl
@@ -375,3 +375,15 @@ proc start_write_load {host port seconds} {
proc stop_write_load {handle} {
catch {exec /bin/kill -9 $handle}
}
+
+# Execute a background process writing complex data for the specified number
+# of ops to the specified Redis instance.
+proc start_bg_complex_data {host port db ops} {
+ set tclsh [info nameofexecutable]
+ exec $tclsh tests/helpers/bg_complex_data.tcl $host $port $db $ops &
+}
+
+# Stop a process generating write load executed with start_bg_complex_data.
+proc stop_bg_complex_data {handle} {
+ catch {exec /bin/kill -9 $handle}
+}