diff options
author | Oran Agra <oran@redislabs.com> | 2017-11-29 09:49:29 +0200 |
---|---|---|
committer | antirez <antirez@gmail.com> | 2017-12-11 11:09:33 +0100 |
commit | c01b804d3f614caabaa4853beae1cb7f56e764f0 (patch) | |
tree | 3a6208d5d341c7214dccfbce001fcccc67637397 | |
parent | 9e5224bfd5ec13c14f89fe5eb15441a1d74d8dc5 (diff) | |
download | redis-c01b804d3f614caabaa4853beae1cb7f56e764f0.tar.gz |
Add config options to make diskless slave safer
1) Add diskless slave mode: 'on-empty-db' and 'swapdb'
2) When loading rdb from the network, don't kill the
server on short read (that can be a network error)
3) Fix rdb check when performed on preamble AOF
4) Add test for diskless load swapdb
-rw-r--r-- | redis.conf | 25 | ||||
-rw-r--r-- | src/aof.c | 2 | ||||
-rw-r--r-- | src/config.c | 23 | ||||
-rw-r--r-- | src/db.c | 23 | ||||
-rw-r--r-- | src/rdb.c | 20 | ||||
-rw-r--r-- | src/redis-check-rdb.c | 2 | ||||
-rw-r--r-- | src/replication.c | 69 | ||||
-rw-r--r-- | src/server.h | 18 | ||||
-rw-r--r-- | tests/integration/psync2.tcl | 2 | ||||
-rw-r--r-- | tests/integration/replication-psync.tcl | 2 | ||||
-rw-r--r-- | tests/integration/replication.tcl | 69 |
11 files changed, 215 insertions, 40 deletions
diff --git a/redis.conf b/redis.conf index 7eb692a8d..5f7583b37 100644 --- a/redis.conf +++ b/redis.conf @@ -361,6 +361,31 @@ repl-diskless-sync no # it entirely just set it to 0 seconds and the transfer will start ASAP. repl-diskless-sync-delay 5 +# Slave can load the rdb it reads from the replication link directly from the +# socket, or store the rdb to a file and read that file after it was completely +# recived from the master. +# In many cases the disk is slower than the network, and storing and loading +# the rdb file may increase replication time (and even increase the master's +# Copy on Write memory and salve buffers). +# However, parsing the rdb file directly from the socket may mean that we have +# to flush the contents of the current database before the full rdb was received. +# for this reason we have the following options: +# "disabled" - Don't use diskless load (store the rdb file to the disk first) +# "on-empty-db" - Use diskless load only when it is completely safe. +# "swapdb" - Keep a copy of the current db contents in RAM while parsing +# the data directly from the socket. note that this requires +# sufficient memory, if you don't have it, you risk an OOM kill. +# "flushdb" - Flush the current db contents before parsing. note that if +# there's a problem before the replication succeeded you may +# lose all your data. +# Note that the two settings mentioned above are safe, and the last two are +# risky but more efficient and faster. + +repl-diskless-load disabled +# repl-diskless-load on-empty-db +# repl-diskless-load swapdb +# repl-diskless-load flushdb + # Slaves send PINGs to server in a predefined interval. It's possible to change # this interval with the repl_ping_slave_period option. The default value is 10 # seconds. @@ -667,7 +667,7 @@ int loadAppendOnlyFile(char *filename) { server.aof_state = AOF_OFF; fakeClient = createFakeClient(); - startLoadingFile(fp); + startLoadingFile(fp, filename); /* Check if this AOF file has an RDB preamble. In that case we need to * load the RDB file and later continue loading the AOF tail. */ diff --git a/src/config.c b/src/config.c index a7a14f498..c5e34191f 100644 --- a/src/config.c +++ b/src/config.c @@ -91,6 +91,14 @@ configEnum aof_fsync_enum[] = { {NULL, 0} }; +configEnum repl_diskless_load_enum[] = { + {"disabled", REPL_DISKLESS_LOAD_DISABLED}, + {"on-empty-db", REPL_DISKLESS_LOAD_WHEN_DB_EMPTY}, + {"swapdb", REPL_DISKLESS_LOAD_SWAPDB}, + {"flushdb", REPL_DISKLESS_LOAD_FLUSHDB}, + {NULL, 0} +}; + /* Output buffer limits presets. */ clientBufferLimitsConfig clientBufferLimitsDefaults[CLIENT_TYPE_OBUF_COUNT] = { {0, 0, 0}, /* normal */ @@ -366,8 +374,9 @@ void loadServerConfigFromString(char *config) { err = "argument must be 'yes' or 'no'"; goto loaderr; } } else if (!strcasecmp(argv[0],"repl-diskless-load") && argc==2) { - if ((server.repl_diskless_load = yesnotoi(argv[1])) == -1) { - err = "argument must be 'yes' or 'no'"; goto loaderr; + server.repl_diskless_load = configEnumGetValue(repl_diskless_load_enum,argv[1]); + if (server.repl_diskless_load == INT_MIN) { + err = "argument must be 'disabled', 'on-empty-db', 'swapdb' or 'flushdb'"; } } else if (!strcasecmp(argv[0],"repl-diskless-sync-delay") && argc==2) { server.repl_diskless_sync_delay = atoi(argv[1]); @@ -1029,8 +1038,6 @@ void configSetCommand(client *c) { } #endif } config_set_bool_field( - "repl-diskless-load",server.repl_diskless_load) { - } config_set_bool_field( "protected-mode",server.protected_mode) { } config_set_bool_field( "stop-writes-on-bgsave-error",server.stop_writes_on_bgsave_err) { @@ -1160,6 +1167,8 @@ void configSetCommand(client *c) { "maxmemory-policy",server.maxmemory_policy,maxmemory_policy_enum) { } config_set_enum_field( "appendfsync",server.aof_fsync,aof_fsync_enum) { + } config_set_enum_field( + "repl-diskless-load",server.repl_diskless_load,repl_diskless_load_enum) { /* Everyhing else is an error... */ } config_set_else { @@ -1315,8 +1324,6 @@ void configGetCommand(client *c) { server.repl_disable_tcp_nodelay); config_get_bool_field("repl-diskless-sync", server.repl_diskless_sync); - config_get_bool_field("repl-diskless-load", - server.repl_diskless_load); config_get_bool_field("aof-rewrite-incremental-fsync", server.aof_rewrite_incremental_fsync); config_get_bool_field("aof-load-truncated", @@ -1343,6 +1350,8 @@ void configGetCommand(client *c) { server.aof_fsync,aof_fsync_enum); config_get_enum_field("syslog-facility", server.syslog_facility,syslog_facility_enum); + config_get_enum_field("repl-diskless-load", + server.repl_diskless_load,repl_diskless_load_enum); /* Everything we can't handle with macros follows. */ @@ -2003,7 +2012,7 @@ int rewriteConfig(char *path) { rewriteConfigBytesOption(state,"repl-backlog-ttl",server.repl_backlog_time_limit,CONFIG_DEFAULT_REPL_BACKLOG_TIME_LIMIT); rewriteConfigYesNoOption(state,"repl-disable-tcp-nodelay",server.repl_disable_tcp_nodelay,CONFIG_DEFAULT_REPL_DISABLE_TCP_NODELAY); rewriteConfigYesNoOption(state,"repl-diskless-sync",server.repl_diskless_sync,CONFIG_DEFAULT_REPL_DISKLESS_SYNC); - rewriteConfigYesNoOption(state,"repl-diskless-load",server.repl_diskless_load,CONFIG_DEFAULT_REPL_DISKLESS_LOAD); + rewriteConfigEnumOption(state,"repl-diskless-load",server.repl_diskless_load,repl_diskless_load_enum,CONFIG_DEFAULT_REPL_DISKLESS_LOAD); rewriteConfigNumericalOption(state,"repl-diskless-sync-delay",server.repl_diskless_sync_delay,CONFIG_DEFAULT_REPL_DISKLESS_SYNC_DELAY); rewriteConfigNumericalOption(state,"slave-priority",server.slave_priority,CONFIG_DEFAULT_SLAVE_PRIORITY); rewriteConfigNumericalOption(state,"min-slaves-to-write",server.repl_min_slaves_to_write,CONFIG_DEFAULT_MIN_SLAVES_TO_WRITE); @@ -316,7 +316,7 @@ robj *dbUnshareStringValue(redisDb *db, robj *key, robj *o) { * On success the fuction returns the number of keys removed from the * database(s). Otherwise -1 is returned in the specific case the * DB number is out of range, and errno is set to EINVAL. */ -long long emptyDb(int dbnum, int flags, void(callback)(void*)) { +long long emptyDbGeneric(redisDb* dbarray, int dbnum, int flags, void(callback)(void*)) { int j, async = (flags & EMPTYDB_ASYNC); long long removed = 0; @@ -327,12 +327,12 @@ long long emptyDb(int dbnum, int flags, void(callback)(void*)) { for (j = 0; j < server.dbnum; j++) { if (dbnum != -1 && dbnum != j) continue; - removed += dictSize(server.db[j].dict); + removed += dictSize(dbarray[j].dict); if (async) { - emptyDbAsync(&server.db[j]); + emptyDbAsync(&dbarray[j]); } else { - dictEmpty(server.db[j].dict,callback); - dictEmpty(server.db[j].expires,callback); + dictEmpty(dbarray[j].dict,callback); + dictEmpty(dbarray[j].expires,callback); } } if (server.cluster_enabled) { @@ -346,6 +346,10 @@ long long emptyDb(int dbnum, int flags, void(callback)(void*)) { return removed; } +long long emptyDb(int dbnum, int flags, void(callback)(void*)) { + return emptyDbGeneric(server.db, dbnum, flags, callback); +} + int selectDb(client *c, int id) { if (id < 0 || id >= server.dbnum) return C_ERR; @@ -353,6 +357,15 @@ int selectDb(client *c, int id) { return C_OK; } +long long totalServerKeyCount() { + long long total = 0; + int j; + for (j = 0; j < server.dbnum; j++) { + total += dictSize(server.db[j].dict); + } + return total; +} + /*----------------------------------------------------------------------------- * Hooks for key space changes. * @@ -44,6 +44,7 @@ #define rdbExitReportCorruptRDB(...) rdbCheckThenExit(__LINE__,__VA_ARGS__) +char* rdbFileBeingLoaded = NULL; /* used for rdb chekcing on read error */ extern int rdbCheckMode; void rdbCheckError(const char *fmt, ...); void rdbCheckSetError(const char *fmt, ...); @@ -61,11 +62,17 @@ void rdbCheckThenExit(int linenum, char *reason, ...) { if (!rdbCheckMode) { serverLog(LL_WARNING, "%s", msg); - char *argv[2] = {"",server.rdb_filename}; - redis_check_rdb_main(2,argv,NULL); + if (rdbFileBeingLoaded) { + char *argv[2] = {"",rdbFileBeingLoaded}; + redis_check_rdb_main(2,argv,NULL); + } else { + serverLog(LL_WARNING, "Failure loading rdb format from socket, assuming connection error, resuming operation."); + return; + } } else { rdbCheckError("%s",msg); } + serverLog(LL_WARNING, "Terminating server after rdb file reading failure."); exit(1); } @@ -1553,10 +1560,14 @@ void startLoading(size_t size) { server.loading_total_bytes = size; } -void startLoadingFile(FILE *fp) { +/* Mark that we are loading in the global state and setup the fields + * needed to provide loading stats. + * 'filename' is optional and used for rdb-check on error */ +void startLoadingFile(FILE *fp, char* filename) { struct stat sb; if (fstat(fileno(fp), &sb) == -1) sb.st_size = 0; + rdbFileBeingLoaded = filename; startLoading(sb.st_size); } @@ -1570,6 +1581,7 @@ void loadingProgress(off_t pos) { /* Loading finished */ void stopLoading(void) { server.loading = 0; + rdbFileBeingLoaded = NULL; } /* Track loading progress in order to serve client's from time to time @@ -1768,7 +1780,7 @@ int rdbLoad(char *filename, rdbSaveInfo *rsi) { int retval; if ((fp = fopen(filename,"r")) == NULL) return C_ERR; - startLoadingFile(fp); + startLoadingFile(fp, filename); rioInitWithFile(&rdb,fp); retval = rdbLoadRio(&rdb,rsi); fclose(fp); diff --git a/src/redis-check-rdb.c b/src/redis-check-rdb.c index 243e0035b..011630124 100644 --- a/src/redis-check-rdb.c +++ b/src/redis-check-rdb.c @@ -201,7 +201,7 @@ int redis_check_rdb(char *rdbfilename, FILE *fp) { goto err; } - startLoadingFile(fp); + startLoadingFile(fp, rdbfilename); while(1) { robj *key, *val; expiretime = -1; diff --git a/src/replication.c b/src/replication.c index a53513651..5239c4af2 100644 --- a/src/replication.c +++ b/src/replication.c @@ -1105,11 +1105,23 @@ void restartAOF() { } } +static int useDisklessLoad() { + /* compute boolean decision to use diskless load */ + return server.repl_diskless_load == REPL_DISKLESS_LOAD_SWAPDB || + server.repl_diskless_load == REPL_DISKLESS_LOAD_FLUSHDB || + (server.repl_diskless_load == REPL_DISKLESS_LOAD_WHEN_DB_EMPTY && totalServerKeyCount()==0); +} + + /* Asynchronously read the SYNC payload we receive from a master */ #define REPL_MAX_WRITTEN_BEFORE_FSYNC (1024*1024*8) /* 8 MB */ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) { char buf[4096]; ssize_t nread, readlen; + int use_diskless_load; + redisDb *diskless_load_backup = NULL; + int empty_db_flags = server.repl_slave_lazy_flush ? EMPTYDB_ASYNC : EMPTYDB_NO_FLAGS; + int i; off_t left; UNUSED(el); UNUSED(privdata); @@ -1166,19 +1178,20 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) { server.repl_transfer_size = 0; serverLog(LL_NOTICE, "MASTER <-> SLAVE sync: receiving streamed RDB from master with EOF %s", - server.repl_diskless_load? "to parser":"to disk"); + useDisklessLoad()? "to parser":"to disk"); } else { usemark = 0; server.repl_transfer_size = strtol(buf+1,NULL,10); serverLog(LL_NOTICE, "MASTER <-> SLAVE sync: receiving %lld bytes from master %s", (long long) server.repl_transfer_size, - server.repl_diskless_load? "to parser":"to disk"); + useDisklessLoad()? "to parser":"to disk"); } return; } - if (!server.repl_diskless_load) { + use_diskless_load = useDisklessLoad(); + if (!use_diskless_load) { /* read the data from the socket, store it to a file and search for the EOF */ if (usemark) { @@ -1259,10 +1272,17 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) { * RDB, otherwise we'll create a copy-on-write disaster. */ if (server.aof_state != AOF_OFF) stopAppendOnly(); signalFlushedDb(-1); - emptyDb( - -1, - server.repl_slave_lazy_flush ? EMPTYDB_ASYNC : EMPTYDB_NO_FLAGS, - replicationEmptyDbCallback); + if (use_diskless_load && server.repl_diskless_load == REPL_DISKLESS_LOAD_SWAPDB) { + /* create a backup of the current db */ + diskless_load_backup = zmalloc(sizeof(redisDb)*server.dbnum); + for (i=0; i<server.dbnum; i++) { + diskless_load_backup[i] = server.db[i]; + server.db[i].dict = dictCreate(&dbDictType,NULL); + server.db[i].expires = dictCreate(&keyptrDictType,NULL); + } + } else { + emptyDb(-1,empty_db_flags,replicationEmptyDbCallback); + } /* Before loading the DB into memory we need to delete the readable * handler, otherwise it will get called recursively since * rdbLoad() will call the event loop to process events from time to @@ -1270,7 +1290,7 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) { aeDeleteFileEvent(server.el,server.repl_transfer_s,AE_READABLE); serverLog(LL_NOTICE, "MASTER <-> SLAVE sync: Loading DB in memory"); rdbSaveInfo rsi = RDB_SAVE_INFO_INIT; - if (server.repl_diskless_load) { + if (use_diskless_load) { rio rdb; rioInitWithFd(&rdb,fd,server.repl_transfer_size); /* Put the socket in blocking mode to simplify RDB transfer. @@ -1280,18 +1300,37 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) { startLoading(server.repl_transfer_size); if (rdbLoadRio(&rdb,&rsi) != C_OK) { + /* rdbloading failed */ stopLoading(); - serverLog(LL_WARNING,"Failed trying to load the MASTER synchronization DB from disk"); + serverLog(LL_WARNING,"Failed trying to load the MASTER synchronization DB from socket"); cancelReplicationHandshake(); rioFreeFd(&rdb, NULL); - /* Remove the half-loaded data */ - emptyDb( - -1, - server.repl_slave_lazy_flush ? EMPTYDB_ASYNC : EMPTYDB_NO_FLAGS, - replicationEmptyDbCallback); + if (server.repl_diskless_load == REPL_DISKLESS_LOAD_SWAPDB) { + /* restore the backed up db */ + emptyDbGeneric(server.db,-1,empty_db_flags,replicationEmptyDbCallback); + for (i=0; i<server.dbnum; i++) { + dictRelease(server.db[i].dict); + dictRelease(server.db[i].expires); + server.db[i] = diskless_load_backup[i]; + } + zfree(diskless_load_backup); + } else { + /* Remove the half-loaded data */ + emptyDb(-1,empty_db_flags,replicationEmptyDbCallback); + } return; } stopLoading(); + /* rdbloading succeeded */ + if (server.repl_diskless_load == REPL_DISKLESS_LOAD_SWAPDB) { + /* delete the backup db that we created before starting to load the new rdb */ + emptyDbGeneric(diskless_load_backup,-1,empty_db_flags,replicationEmptyDbCallback); + for (i=0; i<server.dbnum; i++) { + dictRelease(diskless_load_backup[i].dict); + dictRelease(diskless_load_backup[i].expires); + } + zfree(diskless_load_backup); + } if (usemark) { if (!rioRead(&rdb,buf,CONFIG_RUN_ID_SIZE) || memcmp(buf,eofmark,CONFIG_RUN_ID_SIZE) != 0) { serverLog(LL_WARNING,"Replication stream EOF marker is broken"); @@ -1851,7 +1890,7 @@ void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) { } /* Prepare a suitable temp file for bulk transfer */ - if (!server.repl_diskless_load) { + if (!useDisklessLoad()) { while(maxtries--) { snprintf(tmpfile,256, "temp-%d.%ld.rdb",(int)server.unixtime,(long int)getpid()); diff --git a/src/server.h b/src/server.h index bd6d3ed5c..afc47043b 100644 --- a/src/server.h +++ b/src/server.h @@ -127,7 +127,6 @@ typedef long long mstime_t; /* millisecond time type. */ #define CONFIG_DEFAULT_RDB_CHECKSUM 1 #define CONFIG_DEFAULT_RDB_FILENAME "dump.rdb" #define CONFIG_DEFAULT_REPL_DISKLESS_SYNC 0 -#define CONFIG_DEFAULT_REPL_DISKLESS_LOAD 0 #define CONFIG_DEFAULT_REPL_DISKLESS_SYNC_DELAY 5 #define CONFIG_DEFAULT_RDB_KEY_SAVE_DELAY 0 #define CONFIG_DEFAULT_SLAVE_SERVE_STALE_DATA 1 @@ -346,6 +345,13 @@ typedef long long mstime_t; /* millisecond time type. */ #define AOF_FSYNC_EVERYSEC 2 #define CONFIG_DEFAULT_AOF_FSYNC AOF_FSYNC_EVERYSEC +/* Replication diskless load defines */ +#define REPL_DISKLESS_LOAD_DISABLED 0 +#define REPL_DISKLESS_LOAD_WHEN_DB_EMPTY 1 +#define REPL_DISKLESS_LOAD_SWAPDB 2 +#define REPL_DISKLESS_LOAD_FLUSHDB 3 +#define CONFIG_DEFAULT_REPL_DISKLESS_LOAD REPL_DISKLESS_LOAD_DISABLED + /* Zip structure related defaults */ #define OBJ_HASH_MAX_ZIPLIST_ENTRIES 512 #define OBJ_HASH_MAX_ZIPLIST_VALUE 64 @@ -1055,7 +1061,8 @@ struct redisServer { int stop_writes_on_bgsave_err; /* Don't allow writes if can't BGSAVE */ int rdb_pipe_write_result_to_parent; /* RDB pipes used to return the state */ int rdb_pipe_read_result_from_child; /* of each slave in diskless SYNC. */ - int rdb_key_save_delay; /* Delay in microseconds between keys while writing the RDB. (for testings) */ + int rdb_key_save_delay; /* Delay in microseconds between keys while + * writing the RDB. (for testings) */ /* Pipe and data structures for child -> parent info sharing. */ int child_info_pipe[2]; /* Pipe used to write the child_info_data. */ struct { @@ -1092,7 +1099,8 @@ struct redisServer { int repl_min_slaves_max_lag; /* Max lag of <count> slaves to write. */ int repl_good_slaves_count; /* Number of slaves with lag <= max_lag. */ int repl_diskless_load; /* Slave parse RDB directly from the socket. */ - int repl_diskless_sync; /* Master send RDB to slaves sockets directly. */ + int repl_diskless_sync; /* Master send RDB to slaves sockets directly. + * see REPL_DISKLESS_LOAD_* enum */ int repl_diskless_sync_delay; /* Delay to start a diskless repl BGSAVE. */ /* Replication (slave) */ char *masterauth; /* AUTH with this password with master */ @@ -1527,7 +1535,7 @@ void replicationCacheMasterUsingMyself(void); void feedReplicationBacklog(void *ptr, size_t len); /* Generic persistence functions */ -void startLoadingFile(FILE* fp); +void startLoadingFile(FILE* fp, char* filename); void startLoading(size_t size); void loadingProgress(off_t pos); void stopLoading(void); @@ -1750,6 +1758,8 @@ robj *dbUnshareStringValue(redisDb *db, robj *key, robj *o); #define EMPTYDB_NO_FLAGS 0 /* No flags. */ #define EMPTYDB_ASYNC (1<<0) /* Reclaim memory in another thread. */ long long emptyDb(int dbnum, int flags, void(callback)(void*)); +long long emptyDbGeneric(redisDb* dbarray, int dbnum, int flags, void(callback)(void*)); +long long totalServerKeyCount(); int selectDb(client *c, int id); void signalModifiedKey(redisDb *db, robj *key); diff --git a/tests/integration/psync2.tcl b/tests/integration/psync2.tcl index 9ad46f389..44a705b4f 100644 --- a/tests/integration/psync2.tcl +++ b/tests/integration/psync2.tcl @@ -256,7 +256,7 @@ start_server {} { foreach mdl {yes no} { foreach sdls {yes no} { - foreach sdll {yes no} { + foreach sdll {disabled swapdb} { test_psync2 $mdl $sdls $sdll } } diff --git a/tests/integration/replication-psync.tcl b/tests/integration/replication-psync.tcl index ac9cca71a..69896a83b 100644 --- a/tests/integration/replication-psync.tcl +++ b/tests/integration/replication-psync.tcl @@ -125,7 +125,7 @@ proc test_psync {descr duration backlog_size backlog_ttl delay cond mdl sdl reco } foreach mdl {no yes} { - foreach sdl {no yes} { + foreach sdl {disabled swapdb} { test_psync {no reconnection, just sync} 6 1000000 3600 0 { } $mdl $sdl 0 diff --git a/tests/integration/replication.tcl b/tests/integration/replication.tcl index fb05328e9..8765304d6 100644 --- a/tests/integration/replication.tcl +++ b/tests/integration/replication.tcl @@ -184,7 +184,7 @@ start_server {tags {"repl"}} { } foreach mdl {no yes} { - foreach sdl {no yes} { + foreach sdl {disabled swapdb} { start_server {tags {"repl"}} { set master [srv 0 client] $master config set repl-diskless-sync $mdl @@ -273,3 +273,70 @@ foreach mdl {no yes} { } } } + +test {slave fails full sync and diskless load swapdb recoveres it} { + start_server {tags {"repl"}} { + set slave [srv 0 client] + set slave_host [srv 0 host] + set slave_port [srv 0 port] + set slave_log [srv 0 stdout] + start_server {} { + set master [srv 0 client] + set master_host [srv 0 host] + set master_port [srv 0 port] + + # Put different data sets on the master and slave + # we need to put large keys on the master since the slave replies to info only once in 2mb + $slave debug populate 2000 slave 10 + $master debug populate 200 master 100000 + $master config set rdbcompression no + + # Set master and slave to use diskless replication + $master config set repl-diskless-sync yes + $master config set repl-diskless-sync-delay 0 + $slave config set repl-diskless-load swapdb + + # Set master with a slow rdb generation, so that we can easily disconnect it mid sync + # 10ms per key, with 200 keys is 2 seconds + $master config set rdb-key-save-delay 10000 + + # Start the replication process... + $slave slaveof $master_host $master_port + + # wait for the slave to start reading the rdb + wait_for_condition 50 100 { + [s -1 loading] eq 1 + } else { + fail "Slave didn't get into loading mode" + } + + # make sure that next sync will not start immediately so that we can catch the slave in betweeen syncs + $master config set repl-diskless-sync-delay 5 + # for faster server shutdown, make rdb saving fast again (the fork is already uses the slow one) + $master config set rdb-key-save-delay 0 + + # waiting slave to do flushdb (key count drop) + wait_for_condition 50 100 { + 2000 != [scan [regexp -inline {keys\=([\d]*)} [$slave info keyspace]] keys=%d] + } else { + fail "Slave didn't flush" + } + + # make sure we're still loading + assert_equal [s -1 loading] 1 + + # kill the slave connection on the master + set killed [$master client kill type slave] + + # wait for loading to stop (fail) + wait_for_condition 50 100 { + [s -1 loading] eq 0 + } else { + fail "Slave didn't disconnect" + } + + # make sure the original keys were restored + assert_equal [$slave dbsize] 2000 + } + } +}
\ No newline at end of file |