-rw-r--r--  redis.conf                                |  16
-rw-r--r--  src/Makefile                              |   2
-rw-r--r--  src/anet.c                                |  14
-rw-r--r--  src/anet.h                                |   1
-rw-r--r--  src/aof.c                                 |   2
-rw-r--r--  src/config.c                              | 388
-rw-r--r--  src/db.c                                  |  55
-rw-r--r--  src/debug.c                               |   2
-rw-r--r--  src/expire.c                              |   1
-rw-r--r--  src/module.c                              |  14
-rw-r--r--  src/networking.c                          |  83
-rw-r--r--  src/object.c                              |   4
-rw-r--r--  src/rdb.c                                 |  40
-rw-r--r--  src/redis-check-rdb.c                     |   4
-rw-r--r--  src/redismodule.h                         |   4
-rw-r--r--  src/replication.c                         | 417
-rw-r--r--  src/rio.c                                 | 120
-rw-r--r--  src/rio.h                                 |  10
-rw-r--r--  src/server.c                              |  24
-rw-r--r--  src/server.h                              |  53
-rw-r--r--  src/tracking.c                            | 175
-rw-r--r--  tests/integration/replication-4.tcl       |   9
-rw-r--r--  tests/integration/replication-psync.tcl   |  40
-rw-r--r--  tests/integration/replication.tcl         | 214
-rw-r--r--  tests/support/util.tcl                    |  12
-rw-r--r--  tests/unit/geo.tcl                        |  14
-rw-r--r--  tests/unit/scan.tcl                       |  45
27 files changed, 1214 insertions(+), 549 deletions(-)
diff --git a/redis.conf b/redis.conf
index 060510768..74b6c018f 100644
--- a/redis.conf
+++ b/redis.conf
@@ -377,6 +377,22 @@ repl-diskless-sync no
# it entirely just set it to 0 seconds and the transfer will start ASAP.
repl-diskless-sync-delay 5
+# A replica can load the rdb it reads from the replication link directly from
+# the socket, or store the rdb to a file and read that file only after it has
+# been completely received from the master.
+# In many cases the disk is slower than the network, and storing and loading
+# the rdb file may increase replication time (and even increase the master's
+# Copy on Write memory and replica buffers).
+# However, parsing the rdb file directly from the socket may mean that we have
+# to flush the contents of the current database before the full rdb was
+# received. For this reason we have the following options:
+# "disabled"    - Don't use diskless load (store the rdb file to the disk first)
+# "on-empty-db" - Use diskless load only when it is completely safe.
+# "swapdb"      - Keep a copy of the current db contents in RAM while parsing
+#                 the data directly from the socket. Note that this requires
+#                 sufficient memory; if you don't have it, you risk an OOM kill.
+repl-diskless-load disabled
+
# Replicas send PINGs to server in a predefined interval. It's possible to change
# this interval with the repl_ping_replica_period option. The default value is 10
# seconds.
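
The exact condition a replica uses to pick between these modes appears later in this patch as the useDisklessLoad() helper in src/replication.c; it is restated here as a compact sketch for readers of the config block above, using only symbols the patch itself introduces:

    /* Sketch only -- same logic as the useDisklessLoad() helper added below:
     * "swapdb" always parses the RDB straight from the socket, while
     * "on-empty-db" does so only while the replica holds no keys at all. */
    int use_diskless_load =
        server.repl_diskless_load == REPL_DISKLESS_LOAD_SWAPDB ||
        (server.repl_diskless_load == REPL_DISKLESS_LOAD_WHEN_DB_EMPTY &&
         dbTotalServerKeyCount() == 0);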
diff --git a/src/Makefile b/src/Makefile
index f35685eff..b6cc69e2f 100644
--- a/src/Makefile
+++ b/src/Makefile
@@ -164,7 +164,7 @@ endif
REDIS_SERVER_NAME=redis-server
REDIS_SENTINEL_NAME=redis-sentinel
-REDIS_SERVER_OBJ=adlist.o quicklist.o ae.o anet.o dict.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o crc16.o endianconv.o slowlog.o scripting.o bio.o rio.o rand.o memtest.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o redis-check-rdb.o redis-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o lolwut.o lolwut5.o acl.o gopher.o
+REDIS_SERVER_OBJ=adlist.o quicklist.o ae.o anet.o dict.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o crc16.o endianconv.o slowlog.o scripting.o bio.o rio.o rand.o memtest.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o redis-check-rdb.o redis-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o lolwut.o lolwut5.o acl.o gopher.o tracking.o
REDIS_CLI_NAME=redis-cli
REDIS_CLI_OBJ=anet.o adlist.o dict.o redis-cli.o zmalloc.o release.o anet.o ae.o crc64.o siphash.o crc16.o
REDIS_BENCHMARK_NAME=redis-benchmark
diff --git a/src/anet.c b/src/anet.c
index 2981fca13..2088f4fb1 100644
--- a/src/anet.c
+++ b/src/anet.c
@@ -193,6 +193,20 @@ int anetSendTimeout(char *err, int fd, long long ms) {
return ANET_OK;
}
+/* Set the socket receive timeout (SO_RCVTIMEO socket option) to the specified
+ * number of milliseconds, or disable it if the 'ms' argument is zero. */
+int anetRecvTimeout(char *err, int fd, long long ms) {
+ struct timeval tv;
+
+ tv.tv_sec = ms/1000;
+ tv.tv_usec = (ms%1000)*1000;
+ if (setsockopt(fd, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv)) == -1) {
+ anetSetError(err, "setsockopt SO_RCVTIMEO: %s", strerror(errno));
+ return ANET_ERR;
+ }
+ return ANET_OK;
+}
+
/* anetGenericResolve() is called by anetResolve() and anetResolveIP() to
* do the actual work. It resolves the hostname "host" and set the string
* representation of the IP address into the buffer pointed by "ipbuf".
diff --git a/src/anet.h b/src/anet.h
index 7142f78d2..dd735240d 100644
--- a/src/anet.h
+++ b/src/anet.h
@@ -70,6 +70,7 @@ int anetEnableTcpNoDelay(char *err, int fd);
int anetDisableTcpNoDelay(char *err, int fd);
int anetTcpKeepAlive(char *err, int fd);
int anetSendTimeout(char *err, int fd, long long ms);
+int anetRecvTimeout(char *err, int fd, long long ms);
int anetPeerToString(int fd, char *ip, size_t ip_len, int *port);
int anetKeepAlive(char *err, int fd, int interval);
int anetSockName(int fd, char *ip, size_t ip_len, int *port);
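
anetRecvTimeout() is the receive-side twin of the existing anetSendTimeout(). A minimal, hypothetical call site is sketched below; the fd is assumed to be an already-connected socket and the surrounding variables are illustrative only:

    char err[ANET_ERR_LEN];
    /* Bound blocking reads on this socket to 60 seconds... */
    if (anetRecvTimeout(err, fd, 60*1000) == ANET_ERR)
        serverLog(LL_WARNING, "Can't set SO_RCVTIMEO: %s", err);
    /* ...and later pass 0 to disable the receive timeout again. */
    anetRecvTimeout(err, fd, 0);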
diff --git a/src/aof.c b/src/aof.c
index 4744847d2..565ee8073 100644
--- a/src/aof.c
+++ b/src/aof.c
@@ -729,7 +729,7 @@ int loadAppendOnlyFile(char *filename) {
server.aof_state = AOF_OFF;
fakeClient = createFakeClient();
- startLoading(fp);
+ startLoadingFile(fp, filename);
/* Check if this AOF file has an RDB preamble. In that case we need to
* load the RDB file and later continue loading the AOF tail. */
diff --git a/src/config.c b/src/config.c
index 7f0e9af89..fde00ddf5 100644
--- a/src/config.c
+++ b/src/config.c
@@ -91,6 +91,13 @@ configEnum aof_fsync_enum[] = {
{NULL, 0}
};
+configEnum repl_diskless_load_enum[] = {
+ {"disabled", REPL_DISKLESS_LOAD_DISABLED},
+ {"on-empty-db", REPL_DISKLESS_LOAD_WHEN_DB_EMPTY},
+ {"swapdb", REPL_DISKLESS_LOAD_SWAPDB},
+ {NULL, 0}
+};
+
/* Output buffer limits presets. */
clientBufferLimitsConfig clientBufferLimitsDefaults[CLIENT_TYPE_OBUF_COUNT] = {
{0, 0, 0}, /* normal */
@@ -98,6 +105,48 @@ clientBufferLimitsConfig clientBufferLimitsDefaults[CLIENT_TYPE_OBUF_COUNT] = {
{1024*1024*32, 1024*1024*8, 60} /* pubsub */
};
+/* Configuration values that require no special handling to set, get, load or
+ * rewrite. */
+typedef struct configYesNo {
+ const char *name; /* The user visible name of this config */
+ const char *alias; /* An alias that can also be used for this config */
+ int *config; /* The pointer to the server config this value is stored in */
+ const int modifiable; /* Can this value be updated by CONFIG SET? */
+ const int default_value; /* The default value of the config on rewrite */
+} configYesNo;
+
+configYesNo configs_yesno[] = {
+ /* Non-Modifiable */
+ {"rdbchecksum",NULL,&server.rdb_checksum,0,CONFIG_DEFAULT_RDB_CHECKSUM},
+ {"daemonize",NULL,&server.daemonize,0,0},
+ {"io-threads-do-reads",NULL,&server.io_threads_do_reads, 0, CONFIG_DEFAULT_IO_THREADS_DO_READS},
+ {"always-show-logo",NULL,&server.always_show_logo,0,CONFIG_DEFAULT_ALWAYS_SHOW_LOGO},
+ /* Modifiable */
+ {"protected-mode",NULL,&server.protected_mode,1,CONFIG_DEFAULT_PROTECTED_MODE},
+ {"rdbcompression",NULL,&server.rdb_compression,1,CONFIG_DEFAULT_RDB_COMPRESSION},
+ {"activerehashing",NULL,&server.activerehashing,1,CONFIG_DEFAULT_ACTIVE_REHASHING},
+ {"stop-writes-on-bgsave-error",NULL,&server.stop_writes_on_bgsave_err,1,CONFIG_DEFAULT_STOP_WRITES_ON_BGSAVE_ERROR},
+ {"dynamic-hz",NULL,&server.dynamic_hz,1,CONFIG_DEFAULT_DYNAMIC_HZ},
+ {"lazyfree-lazy-eviction",NULL,&server.lazyfree_lazy_eviction,1,CONFIG_DEFAULT_LAZYFREE_LAZY_EVICTION},
+ {"lazyfree-lazy-expire",NULL,&server.lazyfree_lazy_expire,1,CONFIG_DEFAULT_LAZYFREE_LAZY_EXPIRE},
+ {"lazyfree-lazy-server-del",NULL,&server.lazyfree_lazy_server_del,1,CONFIG_DEFAULT_LAZYFREE_LAZY_SERVER_DEL},
+ {"repl-disable-tcp-nodelay",NULL,&server.repl_disable_tcp_nodelay,1,CONFIG_DEFAULT_REPL_DISABLE_TCP_NODELAY},
+ {"repl-diskless-sync",NULL,&server.repl_diskless_sync,1,CONFIG_DEFAULT_REPL_DISKLESS_SYNC},
+ {"gopher-enabled",NULL,&server.gopher_enabled,1,CONFIG_DEFAULT_GOPHER_ENABLED},
+ {"aof-rewrite-incremental-fsync",NULL,&server.aof_rewrite_incremental_fsync,1,CONFIG_DEFAULT_AOF_REWRITE_INCREMENTAL_FSYNC},
+ {"no-appendfsync-on-rewrite",NULL,&server.aof_no_fsync_on_rewrite,1,CONFIG_DEFAULT_AOF_NO_FSYNC_ON_REWRITE},
+ {"cluster-require-full-coverage",NULL,&server.cluster_require_full_coverage,1,CLUSTER_DEFAULT_REQUIRE_FULL_COVERAGE},
+ {"rdb-save-incremental-fsync",NULL,&server.rdb_save_incremental_fsync,1,CONFIG_DEFAULT_RDB_SAVE_INCREMENTAL_FSYNC},
+ {"aof-load-truncated",NULL,&server.aof_load_truncated,1,CONFIG_DEFAULT_AOF_LOAD_TRUNCATED},
+ {"aof-use-rdb-preamble",NULL,&server.aof_use_rdb_preamble,1,CONFIG_DEFAULT_AOF_USE_RDB_PREAMBLE},
+ {"cluster-replica-no-failover","cluster-slave-no-failover",&server.cluster_slave_no_failover,1,CLUSTER_DEFAULT_SLAVE_NO_FAILOVER},
+ {"replica-lazy-flush","slave-lazy-flush",&server.repl_slave_lazy_flush,1,CONFIG_DEFAULT_SLAVE_LAZY_FLUSH},
+ {"replica-serve-stale-data","slave-serve-stale-data",&server.repl_serve_stale_data,1,CONFIG_DEFAULT_SLAVE_SERVE_STALE_DATA},
+ {"replica-read-only","slave-read-only",&server.repl_slave_ro,1,CONFIG_DEFAULT_SLAVE_READ_ONLY},
+ {"replica-ignore-maxmemory","slave-ignore-maxmemory",&server.repl_slave_ignore_maxmemory,1,CONFIG_DEFAULT_SLAVE_IGNORE_MAXMEMORY},
+ {NULL, NULL, 0, 0}
+};
+
/*-----------------------------------------------------------------------------
* Enum access functions
*----------------------------------------------------------------------------*/
@@ -201,6 +250,26 @@ void loadServerConfigFromString(char *config) {
}
sdstolower(argv[0]);
+ /* Iterate the configs that are standard */
+ int match = 0;
+ for (configYesNo *config = configs_yesno; config->name != NULL; config++) {
+ if ((!strcasecmp(argv[0],config->name) ||
+ (config->alias && !strcasecmp(argv[0],config->alias))) &&
+ (argc == 2))
+ {
+ if ((*(config->config) = yesnotoi(argv[1])) == -1) {
+ err = "argument must be 'yes' or 'no'"; goto loaderr;
+ }
+ match = 1;
+ break;
+ }
+ }
+
+ if (match) {
+ sdsfreesplitres(argv,argc);
+ continue;
+ }
+
/* Execute config directives */
if (!strcasecmp(argv[0],"timeout") && argc == 2) {
server.maxidletime = atoi(argv[1]);
@@ -212,14 +281,6 @@ void loadServerConfigFromString(char *config) {
if (server.tcpkeepalive < 0) {
err = "Invalid tcp-keepalive value"; goto loaderr;
}
- } else if (!strcasecmp(argv[0],"protected-mode") && argc == 2) {
- if ((server.protected_mode = yesnotoi(argv[1])) == -1) {
- err = "argument must be 'yes' or 'no'"; goto loaderr;
- }
- } else if (!strcasecmp(argv[0],"gopher-enabled") && argc == 2) {
- if ((server.gopher_enabled = yesnotoi(argv[1])) == -1) {
- err = "argument must be 'yes' or 'no'"; goto loaderr;
- }
} else if (!strcasecmp(argv[0],"port") && argc == 2) {
server.port = atoi(argv[1]);
if (server.port < 0 || server.port > 65535) {
@@ -290,10 +351,6 @@ void loadServerConfigFromString(char *config) {
} else if (!strcasecmp(argv[0],"aclfile") && argc == 2) {
zfree(server.acl_filename);
server.acl_filename = zstrdup(argv[1]);
- } else if (!strcasecmp(argv[0],"always-show-logo") && argc == 2) {
- if ((server.always_show_logo = yesnotoi(argv[1])) == -1) {
- err = "argument must be 'yes' or 'no'"; goto loaderr;
- }
} else if (!strcasecmp(argv[0],"syslog-enabled") && argc == 2) {
if ((server.syslog_enabled = yesnotoi(argv[1])) == -1) {
err = "argument must be 'yes' or 'no'"; goto loaderr;
@@ -318,10 +375,6 @@ void loadServerConfigFromString(char *config) {
if (server.io_threads_num < 1 || server.io_threads_num > 512) {
err = "Invalid number of I/O threads"; goto loaderr;
}
- } else if (!strcasecmp(argv[0],"io-threads-do-reads") && argc == 2) {
- if ((server.io_threads_do_reads = yesnotoi(argv[1])) == -1) {
- err = "argument must be 'yes' or 'no'"; goto loaderr;
- }
} else if (!strcasecmp(argv[0],"include") && argc == 2) {
loadServerConfig(argv[1],NULL);
} else if (!strcasecmp(argv[0],"maxclients") && argc == 2) {
@@ -381,13 +434,10 @@ void loadServerConfigFromString(char *config) {
err = "repl-timeout must be 1 or greater";
goto loaderr;
}
- } else if (!strcasecmp(argv[0],"repl-disable-tcp-nodelay") && argc==2) {
- if ((server.repl_disable_tcp_nodelay = yesnotoi(argv[1])) == -1) {
- err = "argument must be 'yes' or 'no'"; goto loaderr;
- }
- } else if (!strcasecmp(argv[0],"repl-diskless-sync") && argc==2) {
- if ((server.repl_diskless_sync = yesnotoi(argv[1])) == -1) {
- err = "argument must be 'yes' or 'no'"; goto loaderr;
+ } else if (!strcasecmp(argv[0],"repl-diskless-load") && argc==2) {
+ server.repl_diskless_load = configEnumGetValue(repl_diskless_load_enum,argv[1]);
+ if (server.repl_diskless_load == INT_MIN) {
+ err = "argument must be 'disabled', 'on-empty-db' or 'swapdb'"; goto loaderr;
}
} else if (!strcasecmp(argv[0],"repl-diskless-sync-delay") && argc==2) {
server.repl_diskless_sync_delay = atoi(argv[1]);
@@ -414,57 +464,6 @@ void loadServerConfigFromString(char *config) {
} else if (!strcasecmp(argv[0],"masterauth") && argc == 2) {
zfree(server.masterauth);
server.masterauth = argv[1][0] ? zstrdup(argv[1]) : NULL;
- } else if ((!strcasecmp(argv[0],"slave-serve-stale-data") ||
- !strcasecmp(argv[0],"replica-serve-stale-data"))
- && argc == 2)
- {
- if ((server.repl_serve_stale_data = yesnotoi(argv[1])) == -1) {
- err = "argument must be 'yes' or 'no'"; goto loaderr;
- }
- } else if ((!strcasecmp(argv[0],"slave-read-only") ||
- !strcasecmp(argv[0],"replica-read-only"))
- && argc == 2)
- {
- if ((server.repl_slave_ro = yesnotoi(argv[1])) == -1) {
- err = "argument must be 'yes' or 'no'"; goto loaderr;
- }
- } else if ((!strcasecmp(argv[0],"slave-ignore-maxmemory") ||
- !strcasecmp(argv[0],"replica-ignore-maxmemory"))
- && argc == 2)
- {
- if ((server.repl_slave_ignore_maxmemory = yesnotoi(argv[1])) == -1) {
- err = "argument must be 'yes' or 'no'"; goto loaderr;
- }
- } else if (!strcasecmp(argv[0],"rdbcompression") && argc == 2) {
- if ((server.rdb_compression = yesnotoi(argv[1])) == -1) {
- err = "argument must be 'yes' or 'no'"; goto loaderr;
- }
- } else if (!strcasecmp(argv[0],"rdbchecksum") && argc == 2) {
- if ((server.rdb_checksum = yesnotoi(argv[1])) == -1) {
- err = "argument must be 'yes' or 'no'"; goto loaderr;
- }
- } else if (!strcasecmp(argv[0],"activerehashing") && argc == 2) {
- if ((server.activerehashing = yesnotoi(argv[1])) == -1) {
- err = "argument must be 'yes' or 'no'"; goto loaderr;
- }
- } else if (!strcasecmp(argv[0],"lazyfree-lazy-eviction") && argc == 2) {
- if ((server.lazyfree_lazy_eviction = yesnotoi(argv[1])) == -1) {
- err = "argument must be 'yes' or 'no'"; goto loaderr;
- }
- } else if (!strcasecmp(argv[0],"lazyfree-lazy-expire") && argc == 2) {
- if ((server.lazyfree_lazy_expire = yesnotoi(argv[1])) == -1) {
- err = "argument must be 'yes' or 'no'"; goto loaderr;
- }
- } else if (!strcasecmp(argv[0],"lazyfree-lazy-server-del") && argc == 2){
- if ((server.lazyfree_lazy_server_del = yesnotoi(argv[1])) == -1) {
- err = "argument must be 'yes' or 'no'"; goto loaderr;
- }
- } else if ((!strcasecmp(argv[0],"slave-lazy-flush") ||
- !strcasecmp(argv[0],"replica-lazy-flush")) && argc == 2)
- {
- if ((server.repl_slave_lazy_flush = yesnotoi(argv[1])) == -1) {
- err = "argument must be 'yes' or 'no'"; goto loaderr;
- }
} else if (!strcasecmp(argv[0],"activedefrag") && argc == 2) {
if ((server.active_defrag_enabled = yesnotoi(argv[1])) == -1) {
err = "argument must be 'yes' or 'no'"; goto loaderr;
@@ -474,25 +473,15 @@ void loadServerConfigFromString(char *config) {
err = "active defrag can't be enabled without proper jemalloc support"; goto loaderr;
#endif
}
- } else if (!strcasecmp(argv[0],"daemonize") && argc == 2) {
- if ((server.daemonize = yesnotoi(argv[1])) == -1) {
- err = "argument must be 'yes' or 'no'"; goto loaderr;
- }
- } else if (!strcasecmp(argv[0],"dynamic-hz") && argc == 2) {
- if ((server.dynamic_hz = yesnotoi(argv[1])) == -1) {
- err = "argument must be 'yes' or 'no'"; goto loaderr;
- }
} else if (!strcasecmp(argv[0],"hz") && argc == 2) {
server.config_hz = atoi(argv[1]);
if (server.config_hz < CONFIG_MIN_HZ) server.config_hz = CONFIG_MIN_HZ;
if (server.config_hz > CONFIG_MAX_HZ) server.config_hz = CONFIG_MAX_HZ;
} else if (!strcasecmp(argv[0],"appendonly") && argc == 2) {
- int yes;
-
- if ((yes = yesnotoi(argv[1])) == -1) {
+ if ((server.aof_enabled = yesnotoi(argv[1])) == -1) {
err = "argument must be 'yes' or 'no'"; goto loaderr;
}
- server.aof_state = yes ? AOF_ON : AOF_OFF;
+ server.aof_state = server.aof_enabled ? AOF_ON : AOF_OFF;
} else if (!strcasecmp(argv[0],"appendfilename") && argc == 2) {
if (!pathIsBaseName(argv[1])) {
err = "appendfilename can't be a path, just a filename";
@@ -500,11 +489,6 @@ void loadServerConfigFromString(char *config) {
}
zfree(server.aof_filename);
server.aof_filename = zstrdup(argv[1]);
- } else if (!strcasecmp(argv[0],"no-appendfsync-on-rewrite")
- && argc == 2) {
- if ((server.aof_no_fsync_on_rewrite= yesnotoi(argv[1])) == -1) {
- err = "argument must be 'yes' or 'no'"; goto loaderr;
- }
} else if (!strcasecmp(argv[0],"appendfsync") && argc == 2) {
server.aof_fsync = configEnumGetValue(aof_fsync_enum,argv[1]);
if (server.aof_fsync == INT_MIN) {
@@ -523,27 +507,11 @@ void loadServerConfigFromString(char *config) {
argc == 2)
{
server.aof_rewrite_min_size = memtoll(argv[1],NULL);
- } else if (!strcasecmp(argv[0],"aof-rewrite-incremental-fsync") &&
- argc == 2)
- {
- if ((server.aof_rewrite_incremental_fsync =
- yesnotoi(argv[1])) == -1) {
- err = "argument must be 'yes' or 'no'"; goto loaderr;
- }
- } else if (!strcasecmp(argv[0],"rdb-save-incremental-fsync") &&
- argc == 2)
- {
- if ((server.rdb_save_incremental_fsync =
- yesnotoi(argv[1])) == -1) {
- err = "argument must be 'yes' or 'no'"; goto loaderr;
- }
- } else if (!strcasecmp(argv[0],"aof-load-truncated") && argc == 2) {
- if ((server.aof_load_truncated = yesnotoi(argv[1])) == -1) {
- err = "argument must be 'yes' or 'no'"; goto loaderr;
- }
- } else if (!strcasecmp(argv[0],"aof-use-rdb-preamble") && argc == 2) {
- if ((server.aof_use_rdb_preamble = yesnotoi(argv[1])) == -1) {
- err = "argument must be 'yes' or 'no'"; goto loaderr;
+ } else if (!strcasecmp(argv[0],"rdb-key-save-delay") && argc==2) {
+ server.rdb_key_save_delay = atoi(argv[1]);
+ if (server.rdb_key_save_delay < 0) {
+ err = "rdb-key-save-delay can't be negative";
+ goto loaderr;
}
} else if (!strcasecmp(argv[0],"requirepass") && argc == 2) {
if (strlen(argv[1]) > CONFIG_AUTHPASS_MAX_LEN) {
@@ -678,13 +646,6 @@ void loadServerConfigFromString(char *config) {
{
err = "Invalid port"; goto loaderr;
}
- } else if (!strcasecmp(argv[0],"cluster-require-full-coverage") &&
- argc == 2)
- {
- if ((server.cluster_require_full_coverage = yesnotoi(argv[1])) == -1)
- {
- err = "argument must be 'yes' or 'no'"; goto loaderr;
- }
} else if (!strcasecmp(argv[0],"cluster-node-timeout") && argc == 2) {
server.cluster_node_timeout = strtoll(argv[1],NULL,10);
if (server.cluster_node_timeout <= 0) {
@@ -707,15 +668,6 @@ void loadServerConfigFromString(char *config) {
err = "cluster replica validity factor must be zero or positive";
goto loaderr;
}
- } else if ((!strcasecmp(argv[0],"cluster-slave-no-failover") ||
- !strcasecmp(argv[0],"cluster-replica-no-failover")) &&
- argc == 2)
- {
- server.cluster_slave_no_failover = yesnotoi(argv[1]);
- if (server.cluster_slave_no_failover == -1) {
- err = "argument must be 'yes' or 'no'";
- goto loaderr;
- }
} else if (!strcasecmp(argv[0],"lua-time-limit") && argc == 2) {
server.lua_time_limit = strtoll(argv[1],NULL,10);
} else if (!strcasecmp(argv[0],"lua-replicate-commands") && argc == 2) {
@@ -756,11 +708,6 @@ void loadServerConfigFromString(char *config) {
server.client_obuf_limits[class].hard_limit_bytes = hard;
server.client_obuf_limits[class].soft_limit_bytes = soft;
server.client_obuf_limits[class].soft_limit_seconds = soft_seconds;
- } else if (!strcasecmp(argv[0],"stop-writes-on-bgsave-error") &&
- argc == 2) {
- if ((server.stop_writes_on_bgsave_err = yesnotoi(argv[1])) == -1) {
- err = "argument must be 'yes' or 'no'"; goto loaderr;
- }
} else if ((!strcasecmp(argv[0],"slave-priority") ||
!strcasecmp(argv[0],"replica-priority")) && argc == 2)
{
@@ -941,6 +888,19 @@ void configSetCommand(client *c) {
serverAssertWithInfo(c,c->argv[3],sdsEncodedObject(c->argv[3]));
o = c->argv[3];
+ /* Iterate the configs that are standard */
+ for (configYesNo *config = configs_yesno; config->name != NULL; config++) {
+ if(config->modifiable && (!strcasecmp(c->argv[2]->ptr,config->name) ||
+ (config->alias && !strcasecmp(c->argv[2]->ptr,config->alias))))
+ {
+ int yn = yesnotoi(o->ptr);
+ if (yn == -1) goto badfmt;
+ *(config->config) = yn;
+ addReply(c,shared.ok);
+ return;
+ }
+ }
+
if (0) { /* this starts the config_set macros else-if chain. */
/* Special fields that can't be handled with general macros. */
@@ -998,6 +958,7 @@ void configSetCommand(client *c) {
int enable = yesnotoi(o->ptr);
if (enable == -1) goto badfmt;
+ server.aof_enabled = enable;
if (enable == 0 && server.aof_state != AOF_OFF) {
stopAppendOnly();
} else if (enable && server.aof_state == AOF_OFF) {
@@ -1106,40 +1067,6 @@ void configSetCommand(client *c) {
/* Boolean fields.
* config_set_bool_field(name,var). */
} config_set_bool_field(
- "rdbcompression", server.rdb_compression) {
- } config_set_bool_field(
- "repl-disable-tcp-nodelay",server.repl_disable_tcp_nodelay) {
- } config_set_bool_field(
- "repl-diskless-sync",server.repl_diskless_sync) {
- } config_set_bool_field(
- "cluster-require-full-coverage",server.cluster_require_full_coverage) {
- } config_set_bool_field(
- "cluster-slave-no-failover",server.cluster_slave_no_failover) {
- } config_set_bool_field(
- "cluster-replica-no-failover",server.cluster_slave_no_failover) {
- } config_set_bool_field(
- "aof-rewrite-incremental-fsync",server.aof_rewrite_incremental_fsync) {
- } config_set_bool_field(
- "rdb-save-incremental-fsync",server.rdb_save_incremental_fsync) {
- } config_set_bool_field(
- "aof-load-truncated",server.aof_load_truncated) {
- } config_set_bool_field(
- "aof-use-rdb-preamble",server.aof_use_rdb_preamble) {
- } config_set_bool_field(
- "slave-serve-stale-data",server.repl_serve_stale_data) {
- } config_set_bool_field(
- "replica-serve-stale-data",server.repl_serve_stale_data) {
- } config_set_bool_field(
- "slave-read-only",server.repl_slave_ro) {
- } config_set_bool_field(
- "replica-read-only",server.repl_slave_ro) {
- } config_set_bool_field(
- "slave-ignore-maxmemory",server.repl_slave_ignore_maxmemory) {
- } config_set_bool_field(
- "replica-ignore-maxmemory",server.repl_slave_ignore_maxmemory) {
- } config_set_bool_field(
- "activerehashing",server.activerehashing) {
- } config_set_bool_field(
"activedefrag",server.active_defrag_enabled) {
#ifndef HAVE_DEFRAG
if (server.active_defrag_enabled) {
@@ -1152,27 +1079,6 @@ void configSetCommand(client *c) {
return;
}
#endif
- } config_set_bool_field(
- "protected-mode",server.protected_mode) {
- } config_set_bool_field(
- "gopher-enabled",server.gopher_enabled) {
- } config_set_bool_field(
- "stop-writes-on-bgsave-error",server.stop_writes_on_bgsave_err) {
- } config_set_bool_field(
- "lazyfree-lazy-eviction",server.lazyfree_lazy_eviction) {
- } config_set_bool_field(
- "lazyfree-lazy-expire",server.lazyfree_lazy_expire) {
- } config_set_bool_field(
- "lazyfree-lazy-server-del",server.lazyfree_lazy_server_del) {
- } config_set_bool_field(
- "slave-lazy-flush",server.repl_slave_lazy_flush) {
- } config_set_bool_field(
- "replica-lazy-flush",server.repl_slave_lazy_flush) {
- } config_set_bool_field(
- "no-appendfsync-on-rewrite",server.aof_no_fsync_on_rewrite) {
- } config_set_bool_field(
- "dynamic-hz",server.dynamic_hz) {
-
/* Numerical fields.
* config_set_numerical_field(name,var,min,max) */
} config_set_numerical_field(
@@ -1244,6 +1150,8 @@ void configSetCommand(client *c) {
} config_set_numerical_field(
"replica-priority",server.slave_priority,0,INT_MAX) {
} config_set_numerical_field(
+ "rdb-key-save-delay",server.rdb_key_save_delay,0,LLONG_MAX) {
+ } config_set_numerical_field(
"slave-announce-port",server.slave_announce_port,0,65535) {
} config_set_numerical_field(
"replica-announce-port",server.slave_announce_port,0,65535) {
@@ -1310,6 +1218,8 @@ void configSetCommand(client *c) {
"maxmemory-policy",server.maxmemory_policy,maxmemory_policy_enum) {
} config_set_enum_field(
"appendfsync",server.aof_fsync,aof_fsync_enum) {
+ } config_set_enum_field(
+ "repl-diskless-load",server.repl_diskless_load,repl_diskless_load_enum) {
/* Everything else is an error... */
} config_set_else {
@@ -1457,63 +1367,19 @@ void configGetCommand(client *c) {
config_get_numerical_field("cluster-slave-validity-factor",server.cluster_slave_validity_factor);
config_get_numerical_field("cluster-replica-validity-factor",server.cluster_slave_validity_factor);
config_get_numerical_field("repl-diskless-sync-delay",server.repl_diskless_sync_delay);
+ config_get_numerical_field("rdb-key-save-delay",server.rdb_key_save_delay);
config_get_numerical_field("tcp-keepalive",server.tcpkeepalive);
/* Bool (yes/no) values */
- config_get_bool_field("cluster-require-full-coverage",
- server.cluster_require_full_coverage);
- config_get_bool_field("cluster-slave-no-failover",
- server.cluster_slave_no_failover);
- config_get_bool_field("cluster-replica-no-failover",
- server.cluster_slave_no_failover);
- config_get_bool_field("no-appendfsync-on-rewrite",
- server.aof_no_fsync_on_rewrite);
- config_get_bool_field("slave-serve-stale-data",
- server.repl_serve_stale_data);
- config_get_bool_field("replica-serve-stale-data",
- server.repl_serve_stale_data);
- config_get_bool_field("slave-read-only",
- server.repl_slave_ro);
- config_get_bool_field("replica-read-only",
- server.repl_slave_ro);
- config_get_bool_field("slave-ignore-maxmemory",
- server.repl_slave_ignore_maxmemory);
- config_get_bool_field("replica-ignore-maxmemory",
- server.repl_slave_ignore_maxmemory);
- config_get_bool_field("stop-writes-on-bgsave-error",
- server.stop_writes_on_bgsave_err);
- config_get_bool_field("daemonize", server.daemonize);
- config_get_bool_field("rdbcompression", server.rdb_compression);
- config_get_bool_field("rdbchecksum", server.rdb_checksum);
- config_get_bool_field("activerehashing", server.activerehashing);
+ /* Iterate the configs that are standard */
+ for (configYesNo *config = configs_yesno; config->name != NULL; config++) {
+ config_get_bool_field(config->name, *(config->config));
+ if (config->alias) {
+ config_get_bool_field(config->alias, *(config->config));
+ }
+ }
+
config_get_bool_field("activedefrag", server.active_defrag_enabled);
- config_get_bool_field("protected-mode", server.protected_mode);
- config_get_bool_field("gopher-enabled", server.gopher_enabled);
- config_get_bool_field("io-threads-do-reads", server.io_threads_do_reads);
- config_get_bool_field("repl-disable-tcp-nodelay",
- server.repl_disable_tcp_nodelay);
- config_get_bool_field("repl-diskless-sync",
- server.repl_diskless_sync);
- config_get_bool_field("aof-rewrite-incremental-fsync",
- server.aof_rewrite_incremental_fsync);
- config_get_bool_field("rdb-save-incremental-fsync",
- server.rdb_save_incremental_fsync);
- config_get_bool_field("aof-load-truncated",
- server.aof_load_truncated);
- config_get_bool_field("aof-use-rdb-preamble",
- server.aof_use_rdb_preamble);
- config_get_bool_field("lazyfree-lazy-eviction",
- server.lazyfree_lazy_eviction);
- config_get_bool_field("lazyfree-lazy-expire",
- server.lazyfree_lazy_expire);
- config_get_bool_field("lazyfree-lazy-server-del",
- server.lazyfree_lazy_server_del);
- config_get_bool_field("slave-lazy-flush",
- server.repl_slave_lazy_flush);
- config_get_bool_field("replica-lazy-flush",
- server.repl_slave_lazy_flush);
- config_get_bool_field("dynamic-hz",
- server.dynamic_hz);
/* Enum values */
config_get_enum_field("maxmemory-policy",
@@ -1526,12 +1392,14 @@ void configGetCommand(client *c) {
server.aof_fsync,aof_fsync_enum);
config_get_enum_field("syslog-facility",
server.syslog_facility,syslog_facility_enum);
+ config_get_enum_field("repl-diskless-load",
+ server.repl_diskless_load,repl_diskless_load_enum);
/* Everything we can't handle with macros follows. */
if (stringmatch(pattern,"appendonly",1)) {
addReplyBulkCString(c,"appendonly");
- addReplyBulkCString(c,server.aof_state == AOF_OFF ? "no" : "yes");
+ addReplyBulkCString(c,server.aof_enabled ? "yes" : "no");
matches++;
}
if (stringmatch(pattern,"dir",1)) {
@@ -1858,7 +1726,7 @@ void rewriteConfigBytesOption(struct rewriteConfigState *state, char *option, lo
}
/* Rewrite a yes/no option. */
-void rewriteConfigYesNoOption(struct rewriteConfigState *state, char *option, int value, int defvalue) {
+void rewriteConfigYesNoOption(struct rewriteConfigState *state, const char *option, int value, int defvalue) {
int force = value != defvalue;
sds line = sdscatprintf(sdsempty(),"%s %s",option,
value ? "yes" : "no");
@@ -2228,7 +2096,11 @@ int rewriteConfig(char *path) {
/* Step 2: rewrite every single option, replacing or appending it inside
* the rewrite state. */
- rewriteConfigYesNoOption(state,"daemonize",server.daemonize,0);
+ /* Iterate the configs that are standard */
+ for (configYesNo *config = configs_yesno; config->name != NULL; config++) {
+ rewriteConfigYesNoOption(state,config->name,*(config->config),config->default_value);
+ }
+
rewriteConfigStringOption(state,"pidfile",server.pidfile,CONFIG_DEFAULT_PID_FILE);
rewriteConfigNumericalOption(state,"port",server.port,CONFIG_DEFAULT_SERVER_PORT);
rewriteConfigNumericalOption(state,"cluster-announce-port",server.cluster_announce_port,CONFIG_DEFAULT_CLUSTER_ANNOUNCE_PORT);
@@ -2250,9 +2122,6 @@ int rewriteConfig(char *path) {
rewriteConfigUserOption(state);
rewriteConfigNumericalOption(state,"databases",server.dbnum,CONFIG_DEFAULT_DBNUM);
rewriteConfigNumericalOption(state,"io-threads",server.dbnum,CONFIG_DEFAULT_IO_THREADS_NUM);
- rewriteConfigYesNoOption(state,"stop-writes-on-bgsave-error",server.stop_writes_on_bgsave_err,CONFIG_DEFAULT_STOP_WRITES_ON_BGSAVE_ERROR);
- rewriteConfigYesNoOption(state,"rdbcompression",server.rdb_compression,CONFIG_DEFAULT_RDB_COMPRESSION);
- rewriteConfigYesNoOption(state,"rdbchecksum",server.rdb_checksum,CONFIG_DEFAULT_RDB_CHECKSUM);
rewriteConfigStringOption(state,"dbfilename",server.rdb_filename,CONFIG_DEFAULT_RDB_FILENAME);
rewriteConfigDirOption(state);
rewriteConfigSlaveofOption(state,"replicaof");
@@ -2260,15 +2129,11 @@ int rewriteConfig(char *path) {
rewriteConfigStringOption(state,"masteruser",server.masteruser,NULL);
rewriteConfigStringOption(state,"masterauth",server.masterauth,NULL);
rewriteConfigStringOption(state,"cluster-announce-ip",server.cluster_announce_ip,NULL);
- rewriteConfigYesNoOption(state,"replica-serve-stale-data",server.repl_serve_stale_data,CONFIG_DEFAULT_SLAVE_SERVE_STALE_DATA);
- rewriteConfigYesNoOption(state,"replica-read-only",server.repl_slave_ro,CONFIG_DEFAULT_SLAVE_READ_ONLY);
- rewriteConfigYesNoOption(state,"replica-ignore-maxmemory",server.repl_slave_ignore_maxmemory,CONFIG_DEFAULT_SLAVE_IGNORE_MAXMEMORY);
rewriteConfigNumericalOption(state,"repl-ping-replica-period",server.repl_ping_slave_period,CONFIG_DEFAULT_REPL_PING_SLAVE_PERIOD);
rewriteConfigNumericalOption(state,"repl-timeout",server.repl_timeout,CONFIG_DEFAULT_REPL_TIMEOUT);
rewriteConfigBytesOption(state,"repl-backlog-size",server.repl_backlog_size,CONFIG_DEFAULT_REPL_BACKLOG_SIZE);
rewriteConfigBytesOption(state,"repl-backlog-ttl",server.repl_backlog_time_limit,CONFIG_DEFAULT_REPL_BACKLOG_TIME_LIMIT);
- rewriteConfigYesNoOption(state,"repl-disable-tcp-nodelay",server.repl_disable_tcp_nodelay,CONFIG_DEFAULT_REPL_DISABLE_TCP_NODELAY);
- rewriteConfigYesNoOption(state,"repl-diskless-sync",server.repl_diskless_sync,CONFIG_DEFAULT_REPL_DISKLESS_SYNC);
+ rewriteConfigEnumOption(state,"repl-diskless-load",server.repl_diskless_load,repl_diskless_load_enum,CONFIG_DEFAULT_REPL_DISKLESS_LOAD);
rewriteConfigNumericalOption(state,"repl-diskless-sync-delay",server.repl_diskless_sync_delay,CONFIG_DEFAULT_REPL_DISKLESS_SYNC_DELAY);
rewriteConfigNumericalOption(state,"replica-priority",server.slave_priority,CONFIG_DEFAULT_SLAVE_PRIORITY);
rewriteConfigNumericalOption(state,"min-replicas-to-write",server.repl_min_slaves_to_write,CONFIG_DEFAULT_MIN_SLAVES_TO_WRITE);
@@ -2288,17 +2153,14 @@ int rewriteConfig(char *path) {
rewriteConfigNumericalOption(state,"active-defrag-cycle-min",server.active_defrag_cycle_min,CONFIG_DEFAULT_DEFRAG_CYCLE_MIN);
rewriteConfigNumericalOption(state,"active-defrag-cycle-max",server.active_defrag_cycle_max,CONFIG_DEFAULT_DEFRAG_CYCLE_MAX);
rewriteConfigNumericalOption(state,"active-defrag-max-scan-fields",server.active_defrag_max_scan_fields,CONFIG_DEFAULT_DEFRAG_MAX_SCAN_FIELDS);
- rewriteConfigYesNoOption(state,"appendonly",server.aof_state != AOF_OFF,0);
+ rewriteConfigYesNoOption(state,"appendonly",server.aof_enabled,0);
rewriteConfigStringOption(state,"appendfilename",server.aof_filename,CONFIG_DEFAULT_AOF_FILENAME);
rewriteConfigEnumOption(state,"appendfsync",server.aof_fsync,aof_fsync_enum,CONFIG_DEFAULT_AOF_FSYNC);
- rewriteConfigYesNoOption(state,"no-appendfsync-on-rewrite",server.aof_no_fsync_on_rewrite,CONFIG_DEFAULT_AOF_NO_FSYNC_ON_REWRITE);
rewriteConfigNumericalOption(state,"auto-aof-rewrite-percentage",server.aof_rewrite_perc,AOF_REWRITE_PERC);
rewriteConfigBytesOption(state,"auto-aof-rewrite-min-size",server.aof_rewrite_min_size,AOF_REWRITE_MIN_SIZE);
rewriteConfigNumericalOption(state,"lua-time-limit",server.lua_time_limit,LUA_SCRIPT_TIME_LIMIT);
rewriteConfigYesNoOption(state,"cluster-enabled",server.cluster_enabled,0);
rewriteConfigStringOption(state,"cluster-config-file",server.cluster_configfile,CONFIG_DEFAULT_CLUSTER_CONFIG_FILE);
- rewriteConfigYesNoOption(state,"cluster-require-full-coverage",server.cluster_require_full_coverage,CLUSTER_DEFAULT_REQUIRE_FULL_COVERAGE);
- rewriteConfigYesNoOption(state,"cluster-replica-no-failover",server.cluster_slave_no_failover,CLUSTER_DEFAULT_SLAVE_NO_FAILOVER);
rewriteConfigNumericalOption(state,"cluster-node-timeout",server.cluster_node_timeout,CLUSTER_DEFAULT_NODE_TIMEOUT);
rewriteConfigNumericalOption(state,"cluster-migration-barrier",server.cluster_migration_barrier,CLUSTER_DEFAULT_MIGRATION_BARRIER);
rewriteConfigNumericalOption(state,"cluster-replica-validity-factor",server.cluster_slave_validity_factor,CLUSTER_DEFAULT_SLAVE_VALIDITY);
@@ -2316,23 +2178,11 @@ int rewriteConfig(char *path) {
rewriteConfigNumericalOption(state,"zset-max-ziplist-entries",server.zset_max_ziplist_entries,OBJ_ZSET_MAX_ZIPLIST_ENTRIES);
rewriteConfigNumericalOption(state,"zset-max-ziplist-value",server.zset_max_ziplist_value,OBJ_ZSET_MAX_ZIPLIST_VALUE);
rewriteConfigNumericalOption(state,"hll-sparse-max-bytes",server.hll_sparse_max_bytes,CONFIG_DEFAULT_HLL_SPARSE_MAX_BYTES);
- rewriteConfigYesNoOption(state,"activerehashing",server.activerehashing,CONFIG_DEFAULT_ACTIVE_REHASHING);
rewriteConfigYesNoOption(state,"activedefrag",server.active_defrag_enabled,CONFIG_DEFAULT_ACTIVE_DEFRAG);
- rewriteConfigYesNoOption(state,"protected-mode",server.protected_mode,CONFIG_DEFAULT_PROTECTED_MODE);
- rewriteConfigYesNoOption(state,"gopher-enabled",server.gopher_enabled,CONFIG_DEFAULT_GOPHER_ENABLED);
- rewriteConfigYesNoOption(state,"io-threads-do-reads",server.io_threads_do_reads,CONFIG_DEFAULT_IO_THREADS_DO_READS);
rewriteConfigClientoutputbufferlimitOption(state);
rewriteConfigNumericalOption(state,"hz",server.config_hz,CONFIG_DEFAULT_HZ);
- rewriteConfigYesNoOption(state,"aof-rewrite-incremental-fsync",server.aof_rewrite_incremental_fsync,CONFIG_DEFAULT_AOF_REWRITE_INCREMENTAL_FSYNC);
- rewriteConfigYesNoOption(state,"rdb-save-incremental-fsync",server.rdb_save_incremental_fsync,CONFIG_DEFAULT_RDB_SAVE_INCREMENTAL_FSYNC);
- rewriteConfigYesNoOption(state,"aof-load-truncated",server.aof_load_truncated,CONFIG_DEFAULT_AOF_LOAD_TRUNCATED);
- rewriteConfigYesNoOption(state,"aof-use-rdb-preamble",server.aof_use_rdb_preamble,CONFIG_DEFAULT_AOF_USE_RDB_PREAMBLE);
rewriteConfigEnumOption(state,"supervised",server.supervised_mode,supervised_mode_enum,SUPERVISED_NONE);
- rewriteConfigYesNoOption(state,"lazyfree-lazy-eviction",server.lazyfree_lazy_eviction,CONFIG_DEFAULT_LAZYFREE_LAZY_EVICTION);
- rewriteConfigYesNoOption(state,"lazyfree-lazy-expire",server.lazyfree_lazy_expire,CONFIG_DEFAULT_LAZYFREE_LAZY_EXPIRE);
- rewriteConfigYesNoOption(state,"lazyfree-lazy-server-del",server.lazyfree_lazy_server_del,CONFIG_DEFAULT_LAZYFREE_LAZY_SERVER_DEL);
- rewriteConfigYesNoOption(state,"replica-lazy-flush",server.repl_slave_lazy_flush,CONFIG_DEFAULT_SLAVE_LAZY_FLUSH);
- rewriteConfigYesNoOption(state,"dynamic-hz",server.dynamic_hz,CONFIG_DEFAULT_DYNAMIC_HZ);
+ rewriteConfigNumericalOption(state,"rdb-key-save-delay",server.rdb_key_save_delay,CONFIG_DEFAULT_RDB_KEY_SAVE_DELAY);
/* Rewrite Sentinel config if in Sentinel mode. */
if (server.sentinel_mode) rewriteConfigSentinelOption(state);
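
The point of the configs_yesno[] table introduced above is that a plain yes/no option now needs a single row to be picked up by config-file loading, CONFIG SET, CONFIG GET and CONFIG REWRITE alike. A hypothetical new entry would look like the following; the option name and server field are invented for illustration and are not part of this patch:

    configYesNo configs_yesno[] = {
        /* ... existing rows ... */
        /* Hypothetical: "some-new-flag" and &server.some_new_flag are
         * placeholders, 1 = modifiable via CONFIG SET, 0 = default value. */
        {"some-new-flag",NULL,&server.some_new_flag,1,0},
        {NULL, NULL, 0, 0}
    };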
diff --git a/src/db.c b/src/db.c
index b537a29a4..51f5a12b4 100644
--- a/src/db.c
+++ b/src/db.c
@@ -344,7 +344,7 @@ robj *dbUnshareStringValue(redisDb *db, robj *key, robj *o) {
* On success the function returns the number of keys removed from the
* database(s). Otherwise -1 is returned in the specific case the
* DB number is out of range, and errno is set to EINVAL. */
-long long emptyDb(int dbnum, int flags, void(callback)(void*)) {
+long long emptyDbGeneric(redisDb *dbarray, int dbnum, int flags, void(callback)(void*)) {
int async = (flags & EMPTYDB_ASYNC);
long long removed = 0;
@@ -362,12 +362,12 @@ long long emptyDb(int dbnum, int flags, void(callback)(void*)) {
}
for (int j = startdb; j <= enddb; j++) {
- removed += dictSize(server.db[j].dict);
+ removed += dictSize(dbarray[j].dict);
if (async) {
- emptyDbAsync(&server.db[j]);
+ emptyDbAsync(&dbarray[j]);
} else {
- dictEmpty(server.db[j].dict,callback);
- dictEmpty(server.db[j].expires,callback);
+ dictEmpty(dbarray[j].dict,callback);
+ dictEmpty(dbarray[j].expires,callback);
}
}
if (server.cluster_enabled) {
@@ -381,6 +381,10 @@ long long emptyDb(int dbnum, int flags, void(callback)(void*)) {
return removed;
}
+long long emptyDb(int dbnum, int flags, void(callback)(void*)) {
+ return emptyDbGeneric(server.db, dbnum, flags, callback);
+}
+
int selectDb(client *c, int id) {
if (id < 0 || id >= server.dbnum)
return C_ERR;
@@ -388,6 +392,15 @@ int selectDb(client *c, int id) {
return C_OK;
}
+long long dbTotalServerKeyCount() {
+ long long total = 0;
+ int j;
+ for (j = 0; j < server.dbnum; j++) {
+ total += dictSize(server.db[j].dict);
+ }
+ return total;
+}
+
/*-----------------------------------------------------------------------------
* Hooks for key space changes.
*
@@ -399,6 +412,7 @@ int selectDb(client *c, int id) {
void signalModifiedKey(redisDb *db, robj *key) {
touchWatchedKey(db,key);
+ if (server.tracking_clients) trackingInvalidateKey(key);
}
void signalFlushedDb(int dbid) {
@@ -613,7 +627,7 @@ int parseScanCursorOrReply(client *c, robj *o, unsigned long *cursor) {
}
/* This command implements SCAN, HSCAN and SSCAN commands.
- * If object 'o' is passed, then it must be a Hash or Set object, otherwise
+ * If object 'o' is passed, then it must be a Hash, Set or Zset object, otherwise
* if 'o' is NULL the command will operate on the dictionary associated with
* the current database.
*
@@ -629,6 +643,7 @@ void scanGenericCommand(client *c, robj *o, unsigned long cursor) {
listNode *node, *nextnode;
long count = 10;
sds pat = NULL;
+ sds typename = NULL;
int patlen = 0, use_pattern = 0;
dict *ht;
@@ -665,6 +680,10 @@ void scanGenericCommand(client *c, robj *o, unsigned long cursor) {
use_pattern = !(pat[0] == '*' && patlen == 1);
i += 2;
+ } else if (!strcasecmp(c->argv[i]->ptr, "type") && o == NULL && j >= 2) {
+ /* SCAN for a particular type only applies to the db dict */
+ typename = c->argv[i+1]->ptr;
+ i+= 2;
} else {
addReply(c,shared.syntaxerr);
goto cleanup;
@@ -759,6 +778,13 @@ void scanGenericCommand(client *c, robj *o, unsigned long cursor) {
}
}
+ /* Filter an element if it isn't the type we want. */
+ if (!filter && o == NULL && typename){
+ robj* typecheck = lookupKeyReadWithFlags(c->db, kobj, LOOKUP_NOTOUCH);
+ char* type = getObjectTypeName(typecheck);
+ if (strcasecmp((char*) typename, type)) filter = 1;
+ }
+
/* Filter element if it is an expired key. */
if (!filter && o == NULL && expireIfNeeded(c->db, kobj)) filter = 1;
@@ -815,11 +841,8 @@ void lastsaveCommand(client *c) {
addReplyLongLong(c,server.lastsave);
}
-void typeCommand(client *c) {
- robj *o;
- char *type;
-
- o = lookupKeyReadWithFlags(c->db,c->argv[1],LOOKUP_NOTOUCH);
+char* getObjectTypeName(robj *o) {
+ char* type;
if (o == NULL) {
type = "none";
} else {
@@ -837,7 +860,13 @@ void typeCommand(client *c) {
default: type = "unknown"; break;
}
}
- addReplyStatus(c,type);
+ return type;
+}
+
+void typeCommand(client *c) {
+ robj *o;
+ o = lookupKeyReadWithFlags(c->db,c->argv[1],LOOKUP_NOTOUCH);
+ addReplyStatus(c, getObjectTypeName(o));
}
void shutdownCommand(client *c) {
@@ -999,7 +1028,7 @@ void scanDatabaseForReadyLists(redisDb *db) {
*
* Returns C_ERR if at least one of the DB ids are out of range, otherwise
* C_OK is returned. */
-int dbSwapDatabases(int id1, int id2) {
+int dbSwapDatabases(long id1, long id2) {
if (id1 < 0 || id1 >= server.dbnum ||
id2 < 0 || id2 >= server.dbnum) return C_ERR;
if (id1 == id2) return C_OK;
diff --git a/src/debug.c b/src/debug.c
index 0c6b5630c..1f1157d4a 100644
--- a/src/debug.c
+++ b/src/debug.c
@@ -702,7 +702,7 @@ void _serverAssertPrintClientInfo(const client *c) {
bugReportStart();
serverLog(LL_WARNING,"=== ASSERTION FAILED CLIENT CONTEXT ===");
- serverLog(LL_WARNING,"client->flags = %d", c->flags);
+ serverLog(LL_WARNING,"client->flags = %llu", (unsigned long long)c->flags);
serverLog(LL_WARNING,"client->fd = %d", c->fd);
serverLog(LL_WARNING,"client->argc = %d", c->argc);
for (j=0; j < c->argc; j++) {
diff --git a/src/expire.c b/src/expire.c
index 0b92ee3fe..b23117a3c 100644
--- a/src/expire.c
+++ b/src/expire.c
@@ -64,6 +64,7 @@ int activeExpireCycleTryExpire(redisDb *db, dictEntry *de, long long now) {
dbSyncDelete(db,keyobj);
notifyKeyspaceEvent(NOTIFY_EXPIRED,
"expired",keyobj,db->id);
+ if (server.tracking_clients) trackingInvalidateKey(keyobj);
decrRefCount(keyobj);
server.stat_expiredkeys++;
return 1;
diff --git a/src/module.c b/src/module.c
index 7dee7e776..f4f753c00 100644
--- a/src/module.c
+++ b/src/module.c
@@ -1242,6 +1242,17 @@ int RM_ReplyWithStringBuffer(RedisModuleCtx *ctx, const char *buf, size_t len) {
return REDISMODULE_OK;
}
+/* Reply with a bulk string, taking in input a C buffer pointer that is
+ * assumed to be null-terminated.
+ *
+ * The function always returns REDISMODULE_OK. */
+int RM_ReplyWithCString(RedisModuleCtx *ctx, const char *buf) {
+ client *c = moduleGetReplyClient(ctx);
+ if (c == NULL) return REDISMODULE_OK;
+ addReplyBulkCString(c,(char*)buf);
+ return REDISMODULE_OK;
+}
+
/* Reply with a bulk string, taking in input a RedisModuleString object.
*
* The function always returns REDISMODULE_OK. */
@@ -1455,6 +1466,9 @@ int RM_GetContextFlags(RedisModuleCtx *ctx) {
if (server.cluster_enabled)
flags |= REDISMODULE_CTX_FLAGS_CLUSTER;
+ if (server.loading)
+ flags |= REDISMODULE_CTX_FLAGS_LOADING;
+
/* Maxmemory and eviction policy */
if (server.maxmemory > 0) {
flags |= REDISMODULE_CTX_FLAGS_MAXMEMORY;
diff --git a/src/networking.c b/src/networking.c
index 4bc22120a..7976caf29 100644
--- a/src/networking.c
+++ b/src/networking.c
@@ -158,6 +158,7 @@ client *createClient(int fd) {
c->pubsub_patterns = listCreate();
c->peerid = NULL;
c->client_list_node = NULL;
+ c->client_tracking_redirection = 0;
listSetFreeMethod(c->pubsub_patterns,decrRefCountVoid);
listSetMatchMethod(c->pubsub_patterns,listMatchObjects);
if (fd != -1) linkClient(c);
@@ -506,7 +507,7 @@ void addReplyDouble(client *c, double d) {
if (c->resp == 2) {
addReplyBulkCString(c, d > 0 ? "inf" : "-inf");
} else {
- addReplyProto(c, d > 0 ? ",inf\r\n" : "-inf\r\n",
+ addReplyProto(c, d > 0 ? ",inf\r\n" : ",-inf\r\n",
d > 0 ? 6 : 7);
}
} else {
@@ -966,6 +967,9 @@ void unlinkClient(client *c) {
listDelNode(server.unblocked_clients,ln);
c->flags &= ~CLIENT_UNBLOCKED;
}
+
+ /* Clear the tracking status. */
+ if (c->flags & CLIENT_TRACKING) disableTracking(c);
}
void freeClient(client *c) {
@@ -1849,6 +1853,8 @@ sds catClientInfoString(sds s, client *client) {
if (client->flags & CLIENT_PUBSUB) *p++ = 'P';
if (client->flags & CLIENT_MULTI) *p++ = 'x';
if (client->flags & CLIENT_BLOCKED) *p++ = 'b';
+ if (client->flags & CLIENT_TRACKING) *p++ = 't';
+ if (client->flags & CLIENT_TRACKING_BROKEN_REDIR) *p++ = 'R';
if (client->flags & CLIENT_DIRTY_CAS) *p++ = 'd';
if (client->flags & CLIENT_CLOSE_AFTER_REPLY) *p++ = 'c';
if (client->flags & CLIENT_UNBLOCKED) *p++ = 'u';
@@ -1948,19 +1954,21 @@ void clientCommand(client *c) {
if (c->argc == 2 && !strcasecmp(c->argv[1]->ptr,"help")) {
const char *help[] = {
-"id -- Return the ID of the current connection.",
-"getname -- Return the name of the current connection.",
-"kill <ip:port> -- Kill connection made from <ip:port>.",
-"kill <option> <value> [option value ...] -- Kill connections. Options are:",
-" addr <ip:port> -- Kill connection made from <ip:port>",
-" type (normal|master|replica|pubsub) -- Kill connections by type.",
-" skipme (yes|no) -- Skip killing current connection (default: yes).",
-"list [options ...] -- Return information about client connections. Options:",
-" type (normal|master|replica|pubsub) -- Return clients of specified type.",
-"pause <timeout> -- Suspend all Redis clients for <timout> milliseconds.",
-"reply (on|off|skip) -- Control the replies sent to the current connection.",
-"setname <name> -- Assign the name <name> to the current connection.",
-"unblock <clientid> [TIMEOUT|ERROR] -- Unblock the specified blocked client.",
+"ID -- Return the ID of the current connection.",
+"GETNAME -- Return the name of the current connection.",
+"KILL <ip:port> -- Kill connection made from <ip:port>.",
+"KILL <option> <value> [option value ...] -- Kill connections. Options are:",
+" ADDR <ip:port> -- Kill connection made from <ip:port>",
+" TYPE (normal|master|replica|pubsub) -- Kill connections by type.",
+" SKIPME (yes|no) -- Skip killing current connection (default: yes).",
+"LIST [options ...] -- Return information about client connections. Options:",
+" TYPE (normal|master|replica|pubsub) -- Return clients of specified type.",
+"PAUSE <timeout> -- Suspend all Redis clients for <timeout> milliseconds.",
+"REPLY (on|off|skip) -- Control the replies sent to the current connection.",
+"SETNAME <name> -- Assign the name <name> to the current connection.",
+"UNBLOCK <clientid> [TIMEOUT|ERROR] -- Unblock the specified blocked client.",
+"TRACKING (on|off) [REDIRECT <id>] -- Enable client keys tracking for client side caching.",
+"GETREDIR -- Return the client ID we are redirecting to when tracking is enabled.",
NULL
};
addReplyHelp(c, help);
@@ -2117,20 +2125,63 @@ NULL
addReply(c,shared.czero);
}
} else if (!strcasecmp(c->argv[1]->ptr,"setname") && c->argc == 3) {
+ /* CLIENT SETNAME */
if (clientSetNameOrReply(c,c->argv[2]) == C_OK)
addReply(c,shared.ok);
} else if (!strcasecmp(c->argv[1]->ptr,"getname") && c->argc == 2) {
+ /* CLIENT GETNAME */
if (c->name)
addReplyBulk(c,c->name);
else
addReplyNull(c);
} else if (!strcasecmp(c->argv[1]->ptr,"pause") && c->argc == 3) {
+ /* CLIENT PAUSE */
long long duration;
- if (getTimeoutFromObjectOrReply(c,c->argv[2],&duration,UNIT_MILLISECONDS)
- != C_OK) return;
+ if (getTimeoutFromObjectOrReply(c,c->argv[2],&duration,
+ UNIT_MILLISECONDS) != C_OK) return;
pauseClients(duration);
addReply(c,shared.ok);
+ } else if (!strcasecmp(c->argv[1]->ptr,"tracking") &&
+ (c->argc == 3 || c->argc == 5))
+ {
+ /* CLIENT TRACKING (on|off) [REDIRECT <id>] */
+ long long redir = 0;
+
+ /* Parse the redirection option: we'll require the client with
+ * the specified ID to exist right now, even if it is possible
+ * it will get disconnected later. */
+ if (c->argc == 5) {
+ if (strcasecmp(c->argv[3]->ptr,"redirect") != 0) {
+ addReply(c,shared.syntaxerr);
+ return;
+ } else {
+ if (getLongLongFromObjectOrReply(c,c->argv[4],&redir,NULL) !=
+ C_OK) return;
+ if (lookupClientByID(redir) == NULL) {
+ addReplyError(c,"The client ID you want to redirect to "
+ "does not exist");
+ return;
+ }
+ }
+ }
+
+ if (!strcasecmp(c->argv[2]->ptr,"on")) {
+ enableTracking(c,redir);
+ } else if (!strcasecmp(c->argv[2]->ptr,"off")) {
+ disableTracking(c);
+ } else {
+ addReply(c,shared.syntaxerr);
+ return;
+ }
+ addReply(c,shared.ok);
+ } else if (!strcasecmp(c->argv[1]->ptr,"getredir") && c->argc == 2) {
+ /* CLIENT GETREDIR */
+ if (c->flags & CLIENT_TRACKING) {
+ addReplyLongLong(c,c->client_tracking_redirection);
+ } else {
+ addReplyLongLong(c,-1);
+ }
} else {
addReplyErrorFormat(c, "Unknown subcommand or wrong number of arguments for '%s'. Try CLIENT HELP", (char*)c->argv[1]->ptr);
}
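
CLIENT TRACKING above only parses arguments; the enableTracking()/disableTracking() helpers it calls live in the new src/tracking.c, which is not part of this excerpt. The sketch below is therefore an approximation of the bookkeeping those helpers plausibly perform, based only on the fields this diff touches (the CLIENT_TRACKING and CLIENT_TRACKING_BROKEN_REDIR flags, client_tracking_redirection, and server.tracking_clients), not the actual implementation:

    void enableTracking(client *c, uint64_t redirect_to) {
        if (!(c->flags & CLIENT_TRACKING)) server.tracking_clients++;
        c->flags |= CLIENT_TRACKING;
        c->flags &= ~CLIENT_TRACKING_BROKEN_REDIR;
        c->client_tracking_redirection = redirect_to;
    }

    void disableTracking(client *c) {
        if (c->flags & CLIENT_TRACKING) {
            server.tracking_clients--;
            c->flags &= ~(CLIENT_TRACKING|CLIENT_TRACKING_BROKEN_REDIR);
        }
    }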
diff --git a/src/object.c b/src/object.c
index 234e11f8a..10209a6c8 100644
--- a/src/object.c
+++ b/src/object.c
@@ -834,7 +834,9 @@ size_t objectComputeSize(robj *o, size_t sample_size) {
d = ((zset*)o->ptr)->dict;
zskiplist *zsl = ((zset*)o->ptr)->zsl;
zskiplistNode *znode = zsl->header->level[0].forward;
- asize = sizeof(*o)+sizeof(zset)+(sizeof(struct dictEntry*)*dictSlots(d));
+ asize = sizeof(*o)+sizeof(zset)+sizeof(zskiplist)+sizeof(dict)+
+ (sizeof(struct dictEntry*)*dictSlots(d))+
+ zmalloc_size(zsl->header);
while(znode != NULL && samples < sample_size) {
elesize += sdsAllocSize(znode->ele);
elesize += sizeof(struct dictEntry) + zmalloc_size(znode);
diff --git a/src/rdb.c b/src/rdb.c
index 95e4766ea..c566378fb 100644
--- a/src/rdb.c
+++ b/src/rdb.c
@@ -44,6 +44,7 @@
#define rdbExitReportCorruptRDB(...) rdbCheckThenExit(__LINE__,__VA_ARGS__)
+char* rdbFileBeingLoaded = NULL; /* used for rdb checking on read error */
extern int rdbCheckMode;
void rdbCheckError(const char *fmt, ...);
void rdbCheckSetError(const char *fmt, ...);
@@ -61,11 +62,17 @@ void rdbCheckThenExit(int linenum, char *reason, ...) {
if (!rdbCheckMode) {
serverLog(LL_WARNING, "%s", msg);
- char *argv[2] = {"",server.rdb_filename};
- redis_check_rdb_main(2,argv,NULL);
+ if (rdbFileBeingLoaded) {
+ char *argv[2] = {"",rdbFileBeingLoaded};
+ redis_check_rdb_main(2,argv,NULL);
+ } else {
+ serverLog(LL_WARNING, "Failure loading rdb format from socket, assuming connection error, resuming operation.");
+ return;
+ }
} else {
rdbCheckError("%s",msg);
}
+ serverLog(LL_WARNING, "Terminating server after rdb file reading failure.");
exit(1);
}
@@ -1039,6 +1046,11 @@ int rdbSaveKeyValuePair(rio *rdb, robj *key, robj *val, long long expiretime) {
if (rdbSaveObjectType(rdb,val) == -1) return -1;
if (rdbSaveStringObject(rdb,key) == -1) return -1;
if (rdbSaveObject(rdb,val,key) == -1) return -1;
+
+ /* Delay return if required (for testing) */
+ if (server.rdb_key_save_delay)
+ usleep(server.rdb_key_save_delay);
+
return 1;
}
@@ -1800,18 +1812,23 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, robj *key) {
/* Mark that we are loading in the global state and setup the fields
* needed to provide loading stats. */
-void startLoading(FILE *fp) {
- struct stat sb;
-
+void startLoading(size_t size) {
/* Load the DB */
server.loading = 1;
server.loading_start_time = time(NULL);
server.loading_loaded_bytes = 0;
- if (fstat(fileno(fp), &sb) == -1) {
- server.loading_total_bytes = 0;
- } else {
- server.loading_total_bytes = sb.st_size;
- }
+ server.loading_total_bytes = size;
+}
+
+/* Mark that we are loading in the global state and setup the fields
+ * needed to provide loading stats.
+ * 'filename' is optional and used for rdb-check on error */
+void startLoadingFile(FILE *fp, char* filename) {
+ struct stat sb;
+ if (fstat(fileno(fp), &sb) == -1)
+ sb.st_size = 0;
+ rdbFileBeingLoaded = filename;
+ startLoading(sb.st_size);
}
/* Refresh the loading progress info */
@@ -1824,6 +1841,7 @@ void loadingProgress(off_t pos) {
/* Loading finished */
void stopLoading(void) {
server.loading = 0;
+ rdbFileBeingLoaded = NULL;
}
/* Track loading progress in order to serve client's from time to time
@@ -2089,7 +2107,7 @@ int rdbLoad(char *filename, rdbSaveInfo *rsi) {
int retval;
if ((fp = fopen(filename,"r")) == NULL) return C_ERR;
- startLoading(fp);
+ startLoadingFile(fp, filename);
rioInitWithFile(&rdb,fp);
retval = rdbLoadRio(&rdb,rsi,0);
fclose(fp);
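
Splitting startLoading(size) out of the file-based startLoadingFile(fp,filename) is what lets a caller that streams the RDB from a socket, where there is no file to fstat() and no filename to hand to redis-check-rdb, still report loading progress. A hedged sketch of such a caller follows, assuming a rio object 'rdb' already set up over the socket (the rio-over-fd support is added in src/rio.c, not shown in this excerpt) and an rdbSaveInfo 'rsi' prepared as usual:

    startLoading(server.repl_transfer_size); /* size known from the master */
    int retval = rdbLoadRio(&rdb,&rsi,0);    /* rdbFileBeingLoaded stays NULL */
    stopLoading();
    if (retval != C_OK) {
        /* With no filename set, rdbCheckThenExit() logs the failure as a
         * likely connection error and returns instead of exiting. */
    }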
diff --git a/src/redis-check-rdb.c b/src/redis-check-rdb.c
index ec00ee71c..e2d71b5a5 100644
--- a/src/redis-check-rdb.c
+++ b/src/redis-check-rdb.c
@@ -202,7 +202,7 @@ int redis_check_rdb(char *rdbfilename, FILE *fp) {
}
expiretime = -1;
- startLoading(fp);
+ startLoadingFile(fp, rdbfilename);
while(1) {
robj *key, *val;
@@ -314,6 +314,7 @@ int redis_check_rdb(char *rdbfilename, FILE *fp) {
}
if (closefile) fclose(fp);
+ stopLoading();
return 0;
eoferr: /* unexpected end of file is handled here with a fatal exit */
@@ -324,6 +325,7 @@ eoferr: /* unexpected end of file is handled here with a fatal exit */
}
err:
if (closefile) fclose(fp);
+ stopLoading();
return 1;
}
diff --git a/src/redismodule.h b/src/redismodule.h
index 259a5f1db..b9c73957b 100644
--- a/src/redismodule.h
+++ b/src/redismodule.h
@@ -87,6 +87,8 @@
#define REDISMODULE_CTX_FLAGS_OOM_WARNING (1<<11)
/* The command was sent over the replication link. */
#define REDISMODULE_CTX_FLAGS_REPLICATED (1<<12)
+/* Redis is currently loading either from AOF or RDB. */
+#define REDISMODULE_CTX_FLAGS_LOADING (1<<13)
#define REDISMODULE_NOTIFY_GENERIC (1<<2) /* g */
@@ -226,6 +228,7 @@ int REDISMODULE_API_FUNC(RedisModule_ReplyWithSimpleString)(RedisModuleCtx *ctx,
int REDISMODULE_API_FUNC(RedisModule_ReplyWithArray)(RedisModuleCtx *ctx, long len);
void REDISMODULE_API_FUNC(RedisModule_ReplySetArrayLength)(RedisModuleCtx *ctx, long len);
int REDISMODULE_API_FUNC(RedisModule_ReplyWithStringBuffer)(RedisModuleCtx *ctx, const char *buf, size_t len);
+int REDISMODULE_API_FUNC(RedisModule_ReplyWithCString)(RedisModuleCtx *ctx, const char *buf);
int REDISMODULE_API_FUNC(RedisModule_ReplyWithString)(RedisModuleCtx *ctx, RedisModuleString *str);
int REDISMODULE_API_FUNC(RedisModule_ReplyWithNull)(RedisModuleCtx *ctx);
int REDISMODULE_API_FUNC(RedisModule_ReplyWithDouble)(RedisModuleCtx *ctx, double d);
@@ -376,6 +379,7 @@ static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int
REDISMODULE_GET_API(ReplyWithArray);
REDISMODULE_GET_API(ReplySetArrayLength);
REDISMODULE_GET_API(ReplyWithStringBuffer);
+ REDISMODULE_GET_API(ReplyWithCString);
REDISMODULE_GET_API(ReplyWithString);
REDISMODULE_GET_API(ReplyWithNull);
REDISMODULE_GET_API(ReplyWithCallReply);
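
Putting the two module-facing additions together, the REDISMODULE_CTX_FLAGS_LOADING context flag and RedisModule_ReplyWithCString(), a hypothetical command handler might look like this; the handler name and reply strings are illustrative only:

    int HelloStatus_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
        REDISMODULE_NOT_USED(argv);
        REDISMODULE_NOT_USED(argc);
        /* Refuse to do real work while the dataset is still being loaded. */
        if (RedisModule_GetContextFlags(ctx) & REDISMODULE_CTX_FLAGS_LOADING)
            return RedisModule_ReplyWithError(ctx,"LOADING dataset not ready");
        /* Reply with a plain null-terminated C string. */
        return RedisModule_ReplyWithCString(ctx,"ready");
    }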
diff --git a/src/replication.c b/src/replication.c
index 63a67a06a..26e7cf8f0 100644
--- a/src/replication.c
+++ b/src/replication.c
@@ -1113,11 +1113,65 @@ void restartAOFAfterSYNC() {
}
}
+static int useDisklessLoad() {
+ /* compute boolean decision to use diskless load */
+ return server.repl_diskless_load == REPL_DISKLESS_LOAD_SWAPDB ||
+ (server.repl_diskless_load == REPL_DISKLESS_LOAD_WHEN_DB_EMPTY && dbTotalServerKeyCount()==0);
+}
+
+/* Helper function for readSyncBulkPayload() to make backups of the current
+ * DBs before socket-loading the new ones. The backups may be restored later
+ * or freed by disklessLoadRestoreBackups(). */
+redisDb *disklessLoadMakeBackups(void) {
+ redisDb *backups = zmalloc(sizeof(redisDb)*server.dbnum);
+ for (int i=0; i<server.dbnum; i++) {
+ backups[i] = server.db[i];
+ server.db[i].dict = dictCreate(&dbDictType,NULL);
+ server.db[i].expires = dictCreate(&keyptrDictType,NULL);
+ }
+ return backups;
+}
+
+/* Helper function for readSyncBulkPayload(): when replica-side diskless
+ * database loading is used, Redis makes a backup of the existing databases
+ * before loading the new ones from the socket.
+ *
+ * If the socket loading went wrong, we want to restore the old backups
+ * into the server databases. This function does just that in the case
+ * the 'restore' argument (the number of DBs to replace) is non-zero.
+ *
+ * If instead the loading succeeded, we just want to free our old backups;
+ * in that case the function will do just that when 'restore' is 0. */
+void disklessLoadRestoreBackups(redisDb *backup, int restore, int empty_db_flags)
+{
+ if (restore) {
+ /* Restore. */
+ emptyDbGeneric(server.db,-1,empty_db_flags,replicationEmptyDbCallback);
+ for (int i=0; i<server.dbnum; i++) {
+ dictRelease(server.db[i].dict);
+ dictRelease(server.db[i].expires);
+ server.db[i] = backup[i];
+ }
+ } else {
+ /* Delete. */
+ emptyDbGeneric(backup,-1,empty_db_flags,replicationEmptyDbCallback);
+ for (int i=0; i<server.dbnum; i++) {
+ dictRelease(backup[i].dict);
+ dictRelease(backup[i].expires);
+ }
+ }
+ zfree(backup);
+}
+
/* Asynchronously read the SYNC payload we receive from a master */
#define REPL_MAX_WRITTEN_BEFORE_FSYNC (1024*1024*8) /* 8 MB */
void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) {
char buf[4096];
ssize_t nread, readlen, nwritten;
+ int use_diskless_load;
+ redisDb *diskless_load_backup = NULL;
+ int empty_db_flags = server.repl_slave_lazy_flush ? EMPTYDB_ASYNC :
+ EMPTYDB_NO_FLAGS;
off_t left;
UNUSED(el);
UNUSED(privdata);
@@ -1173,90 +1227,202 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) {
* at the next call. */
server.repl_transfer_size = 0;
serverLog(LL_NOTICE,
- "MASTER <-> REPLICA sync: receiving streamed RDB from master");
+ "MASTER <-> REPLICA sync: receiving streamed RDB from master with EOF %s",
+ useDisklessLoad()? "to parser":"to disk");
} else {
usemark = 0;
server.repl_transfer_size = strtol(buf+1,NULL,10);
serverLog(LL_NOTICE,
- "MASTER <-> REPLICA sync: receiving %lld bytes from master",
- (long long) server.repl_transfer_size);
+ "MASTER <-> REPLICA sync: receiving %lld bytes from master %s",
+ (long long) server.repl_transfer_size,
+ useDisklessLoad()? "to parser":"to disk");
}
return;
}
- /* Read bulk data */
- if (usemark) {
- readlen = sizeof(buf);
- } else {
- left = server.repl_transfer_size - server.repl_transfer_read;
- readlen = (left < (signed)sizeof(buf)) ? left : (signed)sizeof(buf);
- }
-
- nread = read(fd,buf,readlen);
- if (nread <= 0) {
- serverLog(LL_WARNING,"I/O error trying to sync with MASTER: %s",
- (nread == -1) ? strerror(errno) : "connection lost");
- cancelReplicationHandshake();
- return;
- }
- server.stat_net_input_bytes += nread;
+ use_diskless_load = useDisklessLoad();
+ if (!use_diskless_load) {
+ /* Read the data from the socket, store it to a file and search
+ * for the EOF. */
+ if (usemark) {
+ readlen = sizeof(buf);
+ } else {
+ left = server.repl_transfer_size - server.repl_transfer_read;
+ readlen = (left < (signed)sizeof(buf)) ? left : (signed)sizeof(buf);
+ }
- /* When a mark is used, we want to detect EOF asap in order to avoid
- * writing the EOF mark into the file... */
- int eof_reached = 0;
+ nread = read(fd,buf,readlen);
+ if (nread <= 0) {
+ serverLog(LL_WARNING,"I/O error trying to sync with MASTER: %s",
+ (nread == -1) ? strerror(errno) : "connection lost");
+ cancelReplicationHandshake();
+ return;
+ }
+ server.stat_net_input_bytes += nread;
+
+ /* When a mark is used, we want to detect EOF asap in order to avoid
+ * writing the EOF mark into the file... */
+ int eof_reached = 0;
+
+ if (usemark) {
+ /* Update the last bytes array, and check if it matches our
+ * delimiter. */
+ if (nread >= CONFIG_RUN_ID_SIZE) {
+ memcpy(lastbytes,buf+nread-CONFIG_RUN_ID_SIZE,
+ CONFIG_RUN_ID_SIZE);
+ } else {
+ int rem = CONFIG_RUN_ID_SIZE-nread;
+ memmove(lastbytes,lastbytes+nread,rem);
+ memcpy(lastbytes+rem,buf,nread);
+ }
+ if (memcmp(lastbytes,eofmark,CONFIG_RUN_ID_SIZE) == 0)
+ eof_reached = 1;
+ }
- if (usemark) {
- /* Update the last bytes array, and check if it matches our delimiter.*/
- if (nread >= CONFIG_RUN_ID_SIZE) {
- memcpy(lastbytes,buf+nread-CONFIG_RUN_ID_SIZE,CONFIG_RUN_ID_SIZE);
- } else {
- int rem = CONFIG_RUN_ID_SIZE-nread;
- memmove(lastbytes,lastbytes+nread,rem);
- memcpy(lastbytes+rem,buf,nread);
+ /* Update the last I/O time for the replication transfer (used in
+ * order to detect timeouts during replication), and write what we
+ * got from the socket to the dump file on disk. */
+ server.repl_transfer_lastio = server.unixtime;
+ if ((nwritten = write(server.repl_transfer_fd,buf,nread)) != nread) {
+ serverLog(LL_WARNING,
+ "Write error or short write writing to the DB dump file "
+ "needed for MASTER <-> REPLICA synchronization: %s",
+ (nwritten == -1) ? strerror(errno) : "short write");
+ goto error;
}
- if (memcmp(lastbytes,eofmark,CONFIG_RUN_ID_SIZE) == 0) eof_reached = 1;
- }
+ server.repl_transfer_read += nread;
- server.repl_transfer_lastio = server.unixtime;
- if ((nwritten = write(server.repl_transfer_fd,buf,nread)) != nread) {
- serverLog(LL_WARNING,"Write error or short write writing to the DB dump file needed for MASTER <-> REPLICA synchronization: %s",
- (nwritten == -1) ? strerror(errno) : "short write");
- goto error;
- }
- server.repl_transfer_read += nread;
+ /* Delete the last 40 bytes from the file if we reached EOF. */
+ if (usemark && eof_reached) {
+ if (ftruncate(server.repl_transfer_fd,
+ server.repl_transfer_read - CONFIG_RUN_ID_SIZE) == -1)
+ {
+ serverLog(LL_WARNING,
+ "Error truncating the RDB file received from the master "
+ "for SYNC: %s", strerror(errno));
+ goto error;
+ }
+ }
- /* Delete the last 40 bytes from the file if we reached EOF. */
- if (usemark && eof_reached) {
- if (ftruncate(server.repl_transfer_fd,
- server.repl_transfer_read - CONFIG_RUN_ID_SIZE) == -1)
+ /* Sync data on disk from time to time, otherwise at the end of the
+ * transfer we may suffer a big delay as the memory buffers are copied
+ * into the actual disk. */
+ if (server.repl_transfer_read >=
+ server.repl_transfer_last_fsync_off + REPL_MAX_WRITTEN_BEFORE_FSYNC)
{
- serverLog(LL_WARNING,"Error truncating the RDB file received from the master for SYNC: %s", strerror(errno));
- goto error;
+ off_t sync_size = server.repl_transfer_read -
+ server.repl_transfer_last_fsync_off;
+ rdb_fsync_range(server.repl_transfer_fd,
+ server.repl_transfer_last_fsync_off, sync_size);
+ server.repl_transfer_last_fsync_off += sync_size;
}
+
+ /* Check if the transfer is now complete */
+ if (!usemark) {
+ if (server.repl_transfer_read == server.repl_transfer_size)
+ eof_reached = 1;
+ }
+
+ /* If the transfer is yet not complete, we need to read more, so
+ * return ASAP and wait for the handler to be called again. */
+ if (!eof_reached) return;
}
- /* Sync data on disk from time to time, otherwise at the end of the transfer
- * we may suffer a big delay as the memory buffers are copied into the
- * actual disk. */
- if (server.repl_transfer_read >=
- server.repl_transfer_last_fsync_off + REPL_MAX_WRITTEN_BEFORE_FSYNC)
+ /* We reach this point in one of the following cases:
+ *
+ * 1. The replica is using diskless replication, that is, it reads data
+ * directly from the socket into Redis memory, without using
+ * a temporary RDB file on disk. In that case we just block and
+ * read everything from the socket.
+ *
+ * 2. Or we are done reading from the socket into the RDB file, in
+ * which case we just want to load that RDB file into memory. */
+ serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Flushing old data");
+
+ /* We need to stop any AOF rewriting child before flushing and parsing
+ * the RDB, otherwise we'll create a copy-on-write disaster. */
+ if (server.aof_state != AOF_OFF) stopAppendOnly();
+ signalFlushedDb(-1);
+
+ /* When diskless RDB loading is used by replicas, it may be configured
+ * in order to save the current DB instead of throwing it away,
+ * so that we can restore it in case of failed transfer. */
+ if (use_diskless_load &&
+ server.repl_diskless_load == REPL_DISKLESS_LOAD_SWAPDB)
{
- off_t sync_size = server.repl_transfer_read -
- server.repl_transfer_last_fsync_off;
- rdb_fsync_range(server.repl_transfer_fd,
- server.repl_transfer_last_fsync_off, sync_size);
- server.repl_transfer_last_fsync_off += sync_size;
- }
+ diskless_load_backup = disklessLoadMakeBackups();
+ } else {
+ emptyDb(-1,empty_db_flags,replicationEmptyDbCallback);
+ }
+
+ /* Before loading the DB into memory we need to delete the readable
+ * handler, otherwise it will get called recursively since
+ * rdbLoad() will call the event loop to process events from time to
+ * time for non blocking loading. */
+ aeDeleteFileEvent(server.el,server.repl_transfer_s,AE_READABLE);
+ serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Loading DB in memory");
+ rdbSaveInfo rsi = RDB_SAVE_INFO_INIT;
+ if (use_diskless_load) {
+ rio rdb;
+ rioInitWithFd(&rdb,fd,server.repl_transfer_size);
+
+ /* Put the socket in blocking mode to simplify RDB transfer.
+ * We'll restore it when the RDB is received. */
+ anetBlock(NULL,fd);
+ anetRecvTimeout(NULL,fd,server.repl_timeout*1000);
+ startLoading(server.repl_transfer_size);
+
+ if (rdbLoadRio(&rdb,&rsi,0) != C_OK) {
+ /* RDB loading failed. */
+ stopLoading();
+ serverLog(LL_WARNING,
+ "Failed trying to load the MASTER synchronization DB "
+ "from socket");
+ cancelReplicationHandshake();
+ rioFreeFd(&rdb, NULL);
+ if (server.repl_diskless_load == REPL_DISKLESS_LOAD_SWAPDB) {
+ /* Restore the backed up databases. */
+ disklessLoadRestoreBackups(diskless_load_backup,1,
+ empty_db_flags);
+ } else {
+ /* Remove the half-loaded data in case we started with
+ * an empty replica. */
+ emptyDb(-1,empty_db_flags,replicationEmptyDbCallback);
+ }
- /* Check if the transfer is now complete */
- if (!usemark) {
- if (server.repl_transfer_read == server.repl_transfer_size)
- eof_reached = 1;
- }
+ /* Note that there's no point in restarting the AOF on SYNC
+ * failure, it'll be restarted when sync succeeds or the replica
+ * gets promoted. */
+ return;
+ }
+ stopLoading();
+
+ /* RDB loading succeeded if we reach this point. */
+ if (server.repl_diskless_load == REPL_DISKLESS_LOAD_SWAPDB) {
+ /* Delete the backup databases we created before starting to load
+ * the new RDB. Now the RDB was loaded with success so the old
+ * data is useless. */
+ disklessLoadRestoreBackups(diskless_load_backup,0,empty_db_flags);
+ }
- if (eof_reached) {
- int aof_is_enabled = server.aof_state != AOF_OFF;
+ /* Verify the end mark is correct. */
+ if (usemark) {
+ if (!rioRead(&rdb,buf,CONFIG_RUN_ID_SIZE) ||
+ memcmp(buf,eofmark,CONFIG_RUN_ID_SIZE) != 0)
+ {
+ serverLog(LL_WARNING,"Replication stream EOF marker is broken");
+ cancelReplicationHandshake();
+ rioFreeFd(&rdb, NULL);
+ return;
+ }
+ }
+ /* Cleanup and restore the socket to the original state to continue
+ * with the normal replication. */
+ rioFreeFd(&rdb, NULL);
+ anetNonBlock(NULL,fd);
+ anetRecvTimeout(NULL,fd,0);
+ } else {
/* Ensure background save doesn't overwrite synced data */
if (server.rdb_child_pid != -1) {
serverLog(LL_NOTICE,
@@ -1269,59 +1435,53 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) {
}
if (rename(server.repl_transfer_tmpfile,server.rdb_filename) == -1) {
- serverLog(LL_WARNING,"Failed trying to rename the temp DB into %s in MASTER <-> REPLICA synchronization: %s",
- server.rdb_filename, strerror(errno));
+ serverLog(LL_WARNING,
+ "Failed trying to rename the temp DB into %s in "
+ "MASTER <-> REPLICA synchronization: %s",
+ server.rdb_filename, strerror(errno));
cancelReplicationHandshake();
return;
}
- serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Flushing old data");
- /* We need to stop any AOFRW fork before flusing and parsing
- * RDB, otherwise we'll create a copy-on-write disaster. */
- if(aof_is_enabled) stopAppendOnly();
- signalFlushedDb(-1);
- emptyDb(
- -1,
- server.repl_slave_lazy_flush ? EMPTYDB_ASYNC : EMPTYDB_NO_FLAGS,
- replicationEmptyDbCallback);
- /* Before loading the DB into memory we need to delete the readable
- * handler, otherwise it will get called recursively since
- * rdbLoad() will call the event loop to process events from time to
- * time for non blocking loading. */
- aeDeleteFileEvent(server.el,server.repl_transfer_s,AE_READABLE);
- serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Loading DB in memory");
- rdbSaveInfo rsi = RDB_SAVE_INFO_INIT;
if (rdbLoad(server.rdb_filename,&rsi) != C_OK) {
- serverLog(LL_WARNING,"Failed trying to load the MASTER synchronization DB from disk");
+ serverLog(LL_WARNING,
+ "Failed trying to load the MASTER synchronization "
+ "DB from disk");
cancelReplicationHandshake();
- /* Re-enable the AOF if we disabled it earlier, in order to restore
- * the original configuration. */
- if (aof_is_enabled) restartAOFAfterSYNC();
+ /* Note that there's no point in restarting the AOF on sync failure:
+ * it'll be restarted when the sync succeeds or the replica gets promoted. */
return;
}
- /* Final setup of the connected slave <- master link */
+
+ /* Cleanup. */
zfree(server.repl_transfer_tmpfile);
close(server.repl_transfer_fd);
- replicationCreateMasterClient(server.repl_transfer_s,rsi.repl_stream_db);
- server.repl_state = REPL_STATE_CONNECTED;
- server.repl_down_since = 0;
- /* After a full resynchroniziation we use the replication ID and
- * offset of the master. The secondary ID / offset are cleared since
- * we are starting a new history. */
- memcpy(server.replid,server.master->replid,sizeof(server.replid));
- server.master_repl_offset = server.master->reploff;
- clearReplicationId2();
- /* Let's create the replication backlog if needed. Slaves need to
- * accumulate the backlog regardless of the fact they have sub-slaves
- * or not, in order to behave correctly if they are promoted to
- * masters after a failover. */
- if (server.repl_backlog == NULL) createReplicationBacklog();
-
- serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Finished with success");
- /* Restart the AOF subsystem now that we finished the sync. This
- * will trigger an AOF rewrite, and when done will start appending
- * to the new file. */
- if (aof_is_enabled) restartAOFAfterSYNC();
+ server.repl_transfer_fd = -1;
+ server.repl_transfer_tmpfile = NULL;
}
+
+ /* Final setup of the connected slave <- master link */
+ replicationCreateMasterClient(server.repl_transfer_s,rsi.repl_stream_db);
+ server.repl_state = REPL_STATE_CONNECTED;
+ server.repl_down_since = 0;
+
+ /* After a full resynchronization we use the replication ID and
+ * offset of the master. The secondary ID / offset are cleared since
+ * we are starting a new history. */
+ memcpy(server.replid,server.master->replid,sizeof(server.replid));
+ server.master_repl_offset = server.master->reploff;
+ clearReplicationId2();
+
+ /* Let's create the replication backlog if needed. Slaves need to
+ * accumulate the backlog regardless of the fact they have sub-slaves
+ * or not, in order to behave correctly if they are promoted to
+ * masters after a failover. */
+ if (server.repl_backlog == NULL) createReplicationBacklog();
+ serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Finished with success");
+
+ /* Restart the AOF subsystem now that we finished the sync. This
+ * will trigger an AOF rewrite, and when done will start appending
+ * to the new file. */
+ if (server.aof_enabled) restartAOFAfterSYNC();
return;
error:
@@ -1845,16 +2005,20 @@ void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {
}
/* Prepare a suitable temp file for bulk transfer */
- while(maxtries--) {
- snprintf(tmpfile,256,
- "temp-%d.%ld.rdb",(int)server.unixtime,(long int)getpid());
- dfd = open(tmpfile,O_CREAT|O_WRONLY|O_EXCL,0644);
- if (dfd != -1) break;
- sleep(1);
- }
- if (dfd == -1) {
- serverLog(LL_WARNING,"Opening the temp file needed for MASTER <-> REPLICA synchronization: %s",strerror(errno));
- goto error;
+ if (!useDisklessLoad()) {
+ while(maxtries--) {
+ snprintf(tmpfile,256,
+ "temp-%d.%ld.rdb",(int)server.unixtime,(long int)getpid());
+ dfd = open(tmpfile,O_CREAT|O_WRONLY|O_EXCL,0644);
+ if (dfd != -1) break;
+ sleep(1);
+ }
+ if (dfd == -1) {
+ serverLog(LL_WARNING,"Opening the temp file needed for MASTER <-> REPLICA synchronization: %s",strerror(errno));
+ goto error;
+ }
+ server.repl_transfer_tmpfile = zstrdup(tmpfile);
+ server.repl_transfer_fd = dfd;
}
/* Setup the non blocking download of the bulk file. */
@@ -1871,15 +2035,19 @@ void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {
server.repl_transfer_size = -1;
server.repl_transfer_read = 0;
server.repl_transfer_last_fsync_off = 0;
- server.repl_transfer_fd = dfd;
server.repl_transfer_lastio = server.unixtime;
- server.repl_transfer_tmpfile = zstrdup(tmpfile);
return;
error:
aeDeleteFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE);
if (dfd != -1) close(dfd);
close(fd);
+ if (server.repl_transfer_fd != -1)
+ close(server.repl_transfer_fd);
+ if (server.repl_transfer_tmpfile)
+ zfree(server.repl_transfer_tmpfile);
+ server.repl_transfer_tmpfile = NULL;
+ server.repl_transfer_fd = -1;
server.repl_transfer_s = -1;
server.repl_state = REPL_STATE_CONNECT;
return;
@@ -1933,9 +2101,13 @@ void undoConnectWithMaster(void) {
void replicationAbortSyncTransfer(void) {
serverAssert(server.repl_state == REPL_STATE_TRANSFER);
undoConnectWithMaster();
- close(server.repl_transfer_fd);
- unlink(server.repl_transfer_tmpfile);
- zfree(server.repl_transfer_tmpfile);
+ if (server.repl_transfer_fd!=-1) {
+ close(server.repl_transfer_fd);
+ unlink(server.repl_transfer_tmpfile);
+ zfree(server.repl_transfer_tmpfile);
+ server.repl_transfer_tmpfile = NULL;
+ server.repl_transfer_fd = -1;
+ }
}
/* This function aborts a non blocking replication attempt if there is one
@@ -2045,6 +2217,9 @@ void replicaofCommand(client *c) {
serverLog(LL_NOTICE,"MASTER MODE enabled (user request from '%s')",
client);
sdsfree(client);
+ /* Restart the AOF subsystem in case we shut it down during a sync when
+ * we were still a slave. */
+ if (server.aof_enabled && server.aof_state == AOF_OFF) restartAOFAfterSYNC();
}
} else {
long port;
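
The swapdb load path above is essentially a stash-then-commit-or-rollback dance on the per-database dict pointers: disklessLoadMakeBackups() stashes them and installs fresh dictionaries, and disklessLoadRestoreBackups() either puts the stash back (failed load) or frees it (successful load). A standalone sketch of that pattern, with generic types and names rather than the Redis API:

    #include <stdlib.h>
    #include <string.h>

    typedef struct { void *dict; void *expires; } db_t;  /* stand-in for redisDb */

    /* Stash the current databases aside and hand the caller fresh, empty ones
     * (mirrors disklessLoadMakeBackups(), where "empty" means new dicts). */
    static db_t *stash_dbs(db_t *dbs, int n) {
        db_t *backup = malloc(sizeof(db_t) * n);
        memcpy(backup, dbs, sizeof(db_t) * n);
        for (int i = 0; i < n; i++) {
            dbs[i].dict = NULL;      /* dictCreate(...) in the real code */
            dbs[i].expires = NULL;
        }
        return backup;
    }

    /* On failure put the stashed databases back, on success just drop the
     * stash (mirrors disklessLoadRestoreBackups()). */
    static void commit_or_rollback(db_t *dbs, db_t *backup, int n, int rollback) {
        for (int i = 0; i < n; i++) {
            if (rollback) {
                /* ...free the half-loaded dbs[i] contents here... */
                dbs[i] = backup[i];
            } else {
                /* ...free backup[i] contents here... */
            }
        }
        free(backup);
    }
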
diff --git a/src/rio.c b/src/rio.c
index c9c76b8f2..5359bc3d6 100644
--- a/src/rio.c
+++ b/src/rio.c
@@ -157,7 +157,123 @@ void rioInitWithFile(rio *r, FILE *fp) {
r->io.file.autosync = 0;
}
-/* ------------------- File descriptors set implementation ------------------- */
+/* ------------------- File descriptor implementation -------------------
+ * We use this RIO implementation when reading an RDB file directly from
+ * the socket to memory via rdbLoadRio(), thus this implementation
+ * only implements reading from a file descriptor that is, normally,
+ * just a socket. */
+
+static size_t rioFdWrite(rio *r, const void *buf, size_t len) {
+ UNUSED(r);
+ UNUSED(buf);
+ UNUSED(len);
+ return 0; /* Error, this target does not yet support writing. */
+}
+
+/* Returns the number of bytes read on success, 0 on failure. */
+static size_t rioFdRead(rio *r, void *buf, size_t len) {
+ size_t avail = sdslen(r->io.fd.buf)-r->io.fd.pos;
+
+ /* If the buffer is too small for the entire request: realloc. */
+ if (sdslen(r->io.fd.buf) + sdsavail(r->io.fd.buf) < len)
+ r->io.fd.buf = sdsMakeRoomFor(r->io.fd.buf, len - sdslen(r->io.fd.buf));
+
+ /* If the remaining unused buffer is not large enough: memmove so that we
+ * can read the rest. */
+ if (len > avail && sdsavail(r->io.fd.buf) < len - avail) {
+ sdsrange(r->io.fd.buf, r->io.fd.pos, -1);
+ r->io.fd.pos = 0;
+ }
+
+ /* If we don't already have all the data in the sds, read more */
+ while (len > sdslen(r->io.fd.buf) - r->io.fd.pos) {
+ size_t buffered = sdslen(r->io.fd.buf) - r->io.fd.pos;
+ size_t toread = len - buffered;
+ /* Read either what's missing, or PROTO_IOBUF_LEN, the bigger of
+ * the two. */
+ if (toread < PROTO_IOBUF_LEN) toread = PROTO_IOBUF_LEN;
+ if (toread > sdsavail(r->io.fd.buf)) toread = sdsavail(r->io.fd.buf);
+ if (r->io.fd.read_limit != 0 &&
+ r->io.fd.read_so_far + buffered + toread > r->io.fd.read_limit)
+ {
+ if (r->io.fd.read_limit >= r->io.fd.read_so_far + buffered)
+ toread = r->io.fd.read_limit - r->io.fd.read_so_far - buffered;
+ else {
+ errno = EOVERFLOW;
+ return 0;
+ }
+ }
+ int retval = read(r->io.fd.fd,
+ (char*)r->io.fd.buf + sdslen(r->io.fd.buf),
+ toread);
+ if (retval <= 0) {
+ if (errno == EWOULDBLOCK) errno = ETIMEDOUT;
+ return 0;
+ }
+ sdsIncrLen(r->io.fd.buf, retval);
+ }
+
+ memcpy(buf, (char*)r->io.fd.buf + r->io.fd.pos, len);
+ r->io.fd.read_so_far += len;
+ r->io.fd.pos += len;
+ return len;
+}
+
+/* Returns read/write position in file. */
+static off_t rioFdTell(rio *r) {
+ return r->io.fd.read_so_far;
+}
+
+/* Flushes any buffer to target device if applicable. Returns 1 on success
+ * and 0 on failures. */
+static int rioFdFlush(rio *r) {
+ /* Our flush is implemented by the write method, that recognizes a
+ * buffer set to NULL with a count of zero as a flush request. */
+ return rioFdWrite(r,NULL,0);
+}
+
+static const rio rioFdIO = {
+ rioFdRead,
+ rioFdWrite,
+ rioFdTell,
+ rioFdFlush,
+ NULL, /* update_checksum */
+ 0, /* current checksum */
+ 0, /* bytes read or written */
+ 0, /* read/write chunk size */
+ { { NULL, 0 } } /* union for io-specific vars */
+};
+
+/* Create an RIO that implements a buffered read from an fd.
+ * The read_limit argument stops buffering when reaching the limit. */
+void rioInitWithFd(rio *r, int fd, size_t read_limit) {
+ *r = rioFdIO;
+ r->io.fd.fd = fd;
+ r->io.fd.pos = 0;
+ r->io.fd.read_limit = read_limit;
+ r->io.fd.read_so_far = 0;
+ r->io.fd.buf = sdsnewlen(NULL, PROTO_IOBUF_LEN);
+ sdsclear(r->io.fd.buf);
+}
+
+/* Release the RIO stream. Optionally returns the unread buffered data
+ * when the SDS pointer 'remaining' is passed. */
+void rioFreeFd(rio *r, sds *remaining) {
+ if (remaining && (size_t)r->io.fd.pos < sdslen(r->io.fd.buf)) {
+ if (r->io.fd.pos > 0) sdsrange(r->io.fd.buf, r->io.fd.pos, -1);
+ *remaining = r->io.fd.buf;
+ } else {
+ sdsfree(r->io.fd.buf);
+ if (remaining) *remaining = NULL;
+ }
+ r->io.fd.buf = NULL;
+}
+
+/* ------------------- File descriptors set implementation ------------------
+ * This target is used to write the RDB file to N different replicas via
+ * sockets, when the master just streams the data to the replicas without
+ * creating an RDB on-disk image (diskless replication option).
+ * It only implements writes. */
/* Returns 1 or 0 for success/failure.
* The function returns success as long as we are able to correctly write
@@ -300,7 +416,7 @@ void rioGenericUpdateChecksum(rio *r, const void *buf, size_t len) {
* disk I/O concentrated in very little time. When we fsync in an explicit
* way instead the I/O pressure is more distributed across time. */
void rioSetAutoSync(rio *r, off_t bytes) {
- serverAssert(r->read == rioFileIO.read);
+ if(r->write != rioFileIO.write) return;
r->io.file.autosync = bytes;
}
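
For reference, this is roughly how readSyncBulkPayload() drives the new fd target; a condensed usage sketch (the function name and error handling are illustrative, and the fd is assumed to already be in blocking mode, as the replication code arranges before loading):

    #include "server.h"
    #include "rio.h"

    /* Read 'payload_len' bytes from a blocking socket through the buffered fd
     * target. Returns C_OK/C_ERR. Illustrative only: the real caller is
     * readSyncBulkPayload(), which feeds the rio to rdbLoadRio() instead. */
    int read_payload_from_socket(int fd, size_t payload_len, char *dst) {
        rio r;
        rioInitWithFd(&r, fd, payload_len);   /* read_limit caps buffering */
        if (rioRead(&r, dst, payload_len) == 0) {
            rioFreeFd(&r, NULL);              /* error: drop buffered data */
            return C_ERR;
        }
        sds leftover;                         /* unread buffered bytes, if any */
        rioFreeFd(&r, &leftover);
        if (leftover) sdsfree(leftover);
        return C_OK;
    }
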
diff --git a/src/rio.h b/src/rio.h
index c996c54f6..beea06888 100644
--- a/src/rio.h
+++ b/src/rio.h
@@ -73,6 +73,14 @@ struct _rio {
off_t buffered; /* Bytes written since last fsync. */
off_t autosync; /* fsync after 'autosync' bytes written. */
} file;
+ /* file descriptor */
+ struct {
+ int fd; /* File descriptor. */
+ off_t pos; /* Position in buf of the next byte to return. */
+ sds buf; /* Buffered data. */
+ size_t read_limit; /* Don't buffer/read more than this many bytes. */
+ size_t read_so_far; /* Amount of data read from the rio (not counting buffered data). */
+ } fd;
/* Multiple FDs target (used to write to N sockets). */
struct {
int *fds; /* File descriptors. */
@@ -126,9 +134,11 @@ static inline int rioFlush(rio *r) {
void rioInitWithFile(rio *r, FILE *fp);
void rioInitWithBuffer(rio *r, sds s);
+void rioInitWithFd(rio *r, int fd, size_t read_limit);
void rioInitWithFdset(rio *r, int *fds, int numfds);
void rioFreeFdset(rio *r);
+void rioFreeFd(rio *r, sds* out_remainingBufferedData);
size_t rioWriteBulkCount(rio *r, char prefix, long count);
size_t rioWriteBulkString(rio *r, const char *buf, size_t len);
diff --git a/src/server.c b/src/server.c
index e1d48e596..4337b8f01 100644
--- a/src/server.c
+++ b/src/server.c
@@ -2265,6 +2265,7 @@ void initServerConfig(void) {
server.aof_flush_postponed_start = 0;
server.aof_rewrite_incremental_fsync = CONFIG_DEFAULT_AOF_REWRITE_INCREMENTAL_FSYNC;
server.rdb_save_incremental_fsync = CONFIG_DEFAULT_RDB_SAVE_INCREMENTAL_FSYNC;
+ server.rdb_key_save_delay = CONFIG_DEFAULT_RDB_KEY_SAVE_DELAY;
server.aof_load_truncated = CONFIG_DEFAULT_AOF_LOAD_TRUNCATED;
server.aof_use_rdb_preamble = CONFIG_DEFAULT_AOF_USE_RDB_PREAMBLE;
server.pidfile = NULL;
@@ -2334,6 +2335,9 @@ void initServerConfig(void) {
server.cached_master = NULL;
server.master_initial_offset = -1;
server.repl_state = REPL_STATE_NONE;
+ server.repl_transfer_tmpfile = NULL;
+ server.repl_transfer_fd = -1;
+ server.repl_transfer_s = -1;
server.repl_syncio_timeout = CONFIG_REPL_SYNCIO_TIMEOUT;
server.repl_serve_stale_data = CONFIG_DEFAULT_SLAVE_SERVE_STALE_DATA;
server.repl_slave_ro = CONFIG_DEFAULT_SLAVE_READ_ONLY;
@@ -2342,6 +2346,7 @@ void initServerConfig(void) {
server.repl_down_since = 0; /* Never connected, repl is down since EVER. */
server.repl_disable_tcp_nodelay = CONFIG_DEFAULT_REPL_DISABLE_TCP_NODELAY;
server.repl_diskless_sync = CONFIG_DEFAULT_REPL_DISKLESS_SYNC;
+ server.repl_diskless_load = CONFIG_DEFAULT_REPL_DISKLESS_LOAD;
server.repl_diskless_sync_delay = CONFIG_DEFAULT_REPL_DISKLESS_SYNC_DELAY;
server.repl_ping_slave_period = CONFIG_DEFAULT_REPL_PING_SLAVE_PERIOD;
server.repl_timeout = CONFIG_DEFAULT_REPL_TIMEOUT;
@@ -3194,6 +3199,7 @@ void call(client *c, int flags) {
latencyAddSampleIfNeeded(latency_event,duration/1000);
slowlogPushEntryIfNeeded(c,c->argv,c->argc,duration);
}
+
if (flags & CMD_CALL_STATS) {
/* use the real command that was executed (cmd and lastamc) may be
* different, in case of MULTI-EXEC or re-written commands such as
@@ -3261,6 +3267,16 @@ void call(client *c, int flags) {
redisOpArrayFree(&server.also_propagate);
}
server.also_propagate = prev_also_propagate;
+
+ /* If the client has keys tracking enabled for client side caching,
+ * make sure to remember the keys it fetched via this command. */
+ if (c->cmd->flags & CMD_READONLY) {
+ client *caller = (c->flags & CLIENT_LUA && server.lua_caller) ?
+ server.lua_caller : c;
+ if (caller->flags & CLIENT_TRACKING)
+ trackingRememberKeys(caller);
+ }
+
server.stat_numcommands++;
}
@@ -3879,10 +3895,12 @@ sds genRedisInfoString(char *section) {
"connected_clients:%lu\r\n"
"client_recent_max_input_buffer:%zu\r\n"
"client_recent_max_output_buffer:%zu\r\n"
- "blocked_clients:%d\r\n",
+ "blocked_clients:%d\r\n"
+ "tracking_clients:%d\r\n",
listLength(server.clients)-listLength(server.slaves),
maxin, maxout,
- server.blocked_clients);
+ server.blocked_clients,
+ server.tracking_clients);
}
/* Memory */
@@ -4042,7 +4060,7 @@ sds genRedisInfoString(char *section) {
(server.aof_last_write_status == C_OK) ? "ok" : "err",
server.stat_aof_cow_bytes);
- if (server.aof_state != AOF_OFF) {
+ if (server.aof_enabled) {
info = sdscatprintf(info,
"aof_current_size:%lld\r\n"
"aof_base_size:%lld\r\n"
diff --git a/src/server.h b/src/server.h
index 0813f8bd1..f81b1010e 100644
--- a/src/server.h
+++ b/src/server.h
@@ -132,6 +132,7 @@ typedef long long mstime_t; /* millisecond time type. */
#define CONFIG_DEFAULT_RDB_FILENAME "dump.rdb"
#define CONFIG_DEFAULT_REPL_DISKLESS_SYNC 0
#define CONFIG_DEFAULT_REPL_DISKLESS_SYNC_DELAY 5
+#define CONFIG_DEFAULT_RDB_KEY_SAVE_DELAY 0
#define CONFIG_DEFAULT_SLAVE_SERVE_STALE_DATA 1
#define CONFIG_DEFAULT_SLAVE_READ_ONLY 1
#define CONFIG_DEFAULT_SLAVE_IGNORE_MAXMEMORY 1
@@ -254,8 +255,8 @@ typedef long long mstime_t; /* millisecond time type. */
#define AOF_WAIT_REWRITE 2 /* AOF waits rewrite to start appending */
/* Client flags */
-#define CLIENT_SLAVE (1<<0) /* This client is a slave server */
-#define CLIENT_MASTER (1<<1) /* This client is a master server */
+#define CLIENT_SLAVE (1<<0) /* This client is a replica */
+#define CLIENT_MASTER (1<<1) /* This client is a master */
#define CLIENT_MONITOR (1<<2) /* This client is a slave monitor, see MONITOR */
#define CLIENT_MULTI (1<<3) /* This client is in a MULTI context */
#define CLIENT_BLOCKED (1<<4) /* The client is waiting in a blocking operation */
@@ -289,7 +290,13 @@ typedef long long mstime_t; /* millisecond time type. */
#define CLIENT_PENDING_READ (1<<29) /* The client has pending reads and was put
in the list of clients we can read
from. */
-#define CLIENT_PENDING_COMMAND (1<<30) /* */
+#define CLIENT_PENDING_COMMAND (1<<30) /* Used in threaded I/O to signal after
+ we return single threaded that the
+ client has already pending commands
+ to be executed. */
+#define CLIENT_TRACKING (1<<31) /* Client enabled keys tracking in order to
+ perform client side caching. */
+#define CLIENT_TRACKING_BROKEN_REDIR (1ULL<<32) /* Target client is invalid. */
/* Client block type (btype field in client structure)
* if CLIENT_BLOCKED flag is set. */
@@ -388,6 +395,12 @@ typedef long long mstime_t; /* millisecond time type. */
#define AOF_FSYNC_EVERYSEC 2
#define CONFIG_DEFAULT_AOF_FSYNC AOF_FSYNC_EVERYSEC
+/* Replication diskless load defines */
+#define REPL_DISKLESS_LOAD_DISABLED 0
+#define REPL_DISKLESS_LOAD_WHEN_DB_EMPTY 1
+#define REPL_DISKLESS_LOAD_SWAPDB 2
+#define CONFIG_DEFAULT_REPL_DISKLESS_LOAD REPL_DISKLESS_LOAD_DISABLED
+
/* Zipped structures related defaults */
#define OBJ_HASH_MAX_ZIPLIST_ENTRIES 512
#define OBJ_HASH_MAX_ZIPLIST_VALUE 64
@@ -646,6 +659,11 @@ typedef struct redisObject {
void *ptr;
} robj;
+/* The string name for an object's type as listed above.
+ * Native types are checked against the OBJ_STRING, OBJ_LIST, OBJ_* defines,
+ * and Module types have their registered name returned. */
+char *getObjectTypeName(robj*);
+
/* Macro used to initialize a Redis object allocated on the stack.
* Note that this macro is taken near the structure definition to make sure
* we'll update it when the structure is changed, to avoid bugs like
@@ -816,7 +834,7 @@ typedef struct client {
time_t ctime; /* Client creation time. */
time_t lastinteraction; /* Time of the last interaction, used for timeout */
time_t obuf_soft_limit_reached_time;
- int flags; /* Client flags: CLIENT_* macros. */
+ uint64_t flags; /* Client flags: CLIENT_* macros. */
int authenticated; /* Needed when the default user requires auth. */
int replstate; /* Replication state if this is a slave. */
int repl_put_online_on_ack; /* Install slave write handler on ACK. */
@@ -845,6 +863,11 @@ typedef struct client {
sds peerid; /* Cached peer ID. */
listNode *client_list_node; /* list node in client list */
+ /* If this client is in tracking mode and this field is non zero,
+ * invalidation messages for keys fetched by this client will be sent to
+ * the specified client ID. */
+ uint64_t client_tracking_redirection;
+
/* Response buffer */
int bufpos;
char buf[PROTO_REPLY_CHUNK_BYTES];
@@ -1142,6 +1165,7 @@ struct redisServer {
int daemonize; /* True if running as a daemon */
clientBufferLimitsConfig client_obuf_limits[CLIENT_TYPE_OBUF_COUNT];
/* AOF persistence */
+ int aof_enabled; /* AOF configuration */
int aof_state; /* AOF_(ON|OFF|WAIT_REWRITE) */
int aof_fsync; /* Kind of fsync() policy */
char *aof_filename; /* Name of the AOF file */
@@ -1198,6 +1222,8 @@ struct redisServer {
int stop_writes_on_bgsave_err; /* Don't allow writes if can't BGSAVE */
int rdb_pipe_write_result_to_parent; /* RDB pipes used to return the state */
int rdb_pipe_read_result_from_child; /* of each slave in diskless SYNC. */
+ int rdb_key_save_delay; /* Delay in microseconds between keys while
+ * writing the RDB. (for testing) */
/* Pipe and data structures for child -> parent info sharing. */
int child_info_pipe[2]; /* Pipe used to write the child_info_data. */
struct {
@@ -1233,7 +1259,9 @@ struct redisServer {
int repl_min_slaves_to_write; /* Min number of slaves to write. */
int repl_min_slaves_max_lag; /* Max lag of <count> slaves to write. */
int repl_good_slaves_count; /* Number of slaves with lag <= max_lag. */
- int repl_diskless_sync; /* Send RDB to slaves sockets directly. */
+ int repl_diskless_sync; /* Master sends the RDB to slave sockets directly. */
+ int repl_diskless_load; /* Slave parses the RDB directly from the socket.
+ * see the REPL_DISKLESS_LOAD_* defines */
int repl_diskless_sync_delay; /* Delay to start a diskless repl BGSAVE. */
/* Replication (slave) */
char *masteruser; /* AUTH with this user and masterauth with master */
@@ -1286,6 +1314,8 @@ struct redisServer {
unsigned int blocked_clients_by_type[BLOCKED_NUM];
list *unblocked_clients; /* list of clients to unblock before next loop */
list *ready_keys; /* List of readyList structures for BLPOP & co */
+ /* Client side caching. */
+ unsigned int tracking_clients; /* # of clients with tracking enabled. */
/* Sort parameters - qsort_r() is only available under BSD so we
* have to take this state global, in order to pass it to sortCompare() */
int sort_desc;
@@ -1591,6 +1621,7 @@ void linkClient(client *c);
void protectClient(client *c);
void unprotectClient(client *c);
void initThreadedIO(void);
+client *lookupClientByID(uint64_t id);
#ifdef __GNUC__
void addReplyErrorFormat(client *c, const char *fmt, ...)
@@ -1602,6 +1633,12 @@ void addReplyErrorFormat(client *c, const char *fmt, ...);
void addReplyStatusFormat(client *c, const char *fmt, ...);
#endif
+/* Client side caching (tracking mode) */
+void enableTracking(client *c, uint64_t redirect_to);
+void disableTracking(client *c);
+void trackingRememberKeys(client *c);
+void trackingInvalidateKey(robj *keyobj);
+
/* List data type */
void listTypeTryConversion(robj *subject, robj *value);
void listTypePush(robj *subject, robj *value, int where);
@@ -1714,7 +1751,8 @@ void replicationCacheMasterUsingMyself(void);
void feedReplicationBacklog(void *ptr, size_t len);
/* Generic persistence functions */
-void startLoading(FILE *fp);
+void startLoadingFile(FILE* fp, char* filename);
+void startLoading(size_t size);
void loadingProgress(off_t pos);
void stopLoading(void);
@@ -1926,6 +1964,7 @@ int pubsubUnsubscribeAllPatterns(client *c, int notify);
void freePubsubPattern(void *p);
int listMatchPubsubPattern(void *a, void *b);
int pubsubPublishMessage(robj *channel, robj *message);
+void addReplyPubsubMessage(client *c, robj *channel, robj *msg);
/* Keyspace events notification */
void notifyKeyspaceEvent(int type, char *event, robj *key, int dbid);
@@ -1970,6 +2009,8 @@ robj *dbUnshareStringValue(redisDb *db, robj *key, robj *o);
#define EMPTYDB_NO_FLAGS 0 /* No flags. */
#define EMPTYDB_ASYNC (1<<0) /* Reclaim memory in another thread. */
long long emptyDb(int dbnum, int flags, void(callback)(void*));
+long long emptyDbGeneric(redisDb *dbarray, int dbnum, int flags, void(callback)(void*));
+long long dbTotalServerKeyCount();
int selectDb(client *c, int id);
void signalModifiedKey(redisDb *db, robj *key);
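
One detail worth spelling out in the server.h hunk above: CLIENT_TRACKING_BROKEN_REDIR is the first client flag past bit 31, which is why it is written with 1ULL and why the client 'flags' field is widened from int to uint64_t. A tiny self-contained illustration (generic flag names, not Redis code):

    #include <stdint.h>
    #include <stdio.h>

    #define FLAG_TRACKING      (1ULL<<31)   /* last bit that fits in 32 */
    #define FLAG_BROKEN_REDIR  (1ULL<<32)   /* needs a 64-bit flags field */

    int main(void) {
        uint64_t flags = 0;
        flags |= FLAG_BROKEN_REDIR;          /* would be lost in a 32-bit int */
        printf("%d\n", (flags & FLAG_BROKEN_REDIR) != 0);   /* prints 1 */
        return 0;
    }
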
diff --git a/src/tracking.c b/src/tracking.c
new file mode 100644
index 000000000..bbfc66a72
--- /dev/null
+++ b/src/tracking.c
@@ -0,0 +1,175 @@
+/* tracking.c - Client side caching: keys tracking and invalidation
+ *
+ * Copyright (c) 2019, Salvatore Sanfilippo <antirez at gmail dot com>
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * * Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ * * Neither the name of Redis nor the names of its contributors may be used
+ * to endorse or promote products derived from this software without
+ * specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include "server.h"
+
+/* The tracking table is made up of 2^24 radix trees (each tree, and the
+ * table itself, is allocated in a lazy way, only when needed) tracking
+ * clients that may have certain keys in their local, client side, cache.
+ *
+ * Keys are grouped into 2^24 slots, in a way similar to Redis Cluster hash
+ * slots, however here the function we use is crc64, taking the least
+ * significant 24 bits of the output.
+ *
+ * When a client enables tracking with "CLIENT TRACKING on", each key served to
+ * the client is hashed to one of such slots, and Redis will remember which
+ * clients may have keys belonging to that slot. Later, when a key in a given slot is
+ * modified, all the clients that may have local copies of keys in that slot
+ * will receive an invalidation message. There is no distinction of database
+ * number: a single table is used.
+ *
+ * Clients will normally keep frequently requested objects in memory, removing
+ * them when invalidation messages are received. A strategy clients may use is
+ * to just cache objects in a dictionary, associating to each cached object
+ * some incremental epoch, or just a timestamp. When invalidation messages are
+ * received clients may store, in a different table, the timestamp (or epoch)
+ * of the invalidation of that slot: later, when accessing objects, the
+ * eviction of stale objects may be performed in a lazy way by checking if the
+ * cached object timestamp is older than the invalidation timestamp for such
+ * objects.
+ *
+ * The output of the 24 bit hash function is very large (more than 16 million
+ * possible slots), so clients that may want to use fewer resources may only
+ * use the most significant bits instead of the full 24 bits. */
+#define TRACKING_TABLE_SIZE (1<<24)
+rax **TrackingTable = NULL;
+robj *TrackingChannelName;
+
+/* Remove the tracking state from the client 'c'. Note that there is not much
+ * for us to do here other than decrementing the counter of clients in
+ * tracking mode, because we just store the ID of the client in the tracking
+ * table, so we'll remove the ID reference in a lazy way. Otherwise, when a
+ * client with many entries in the table is removed, the cleanup would cost
+ * a lot of time. */
+void disableTracking(client *c) {
+ if (c->flags & CLIENT_TRACKING) {
+ server.tracking_clients--;
+ c->flags &= ~(CLIENT_TRACKING|CLIENT_TRACKING_BROKEN_REDIR);
+ }
+}
+
+/* Enable the tracking state for the client 'c', allocating the tracking table
+ * as a side effect if needed. If the 'redirect_to' argument is non zero, the
+ * invalidation messages for this client will be sent to the client ID
+ * specified by the 'redirect_to' argument. Note that if that client
+ * eventually gets freed, we'll send a message to the original client to
+ * inform it of the condition. Multiple clients can redirect the invalidation
+ * messages to the same client ID. */
+void enableTracking(client *c, uint64_t redirect_to) {
+ if (c->flags & CLIENT_TRACKING) return;
+ c->flags |= CLIENT_TRACKING;
+ c->flags &= ~CLIENT_TRACKING_BROKEN_REDIR;
+ c->client_tracking_redirection = redirect_to;
+ server.tracking_clients++;
+ if (TrackingTable == NULL) {
+ TrackingTable = zcalloc(sizeof(rax*) * TRACKING_TABLE_SIZE);
+ TrackingChannelName = createStringObject("__redis__:invalidate",20);
+ }
+}
+
+/* This function is called after the execution of a readonly command when
+ * the client 'c' has keys tracking enabled. It populates the tracking
+ * invalidation table according to the keys the user fetched, so that Redis
+ * knows which clients should receive an invalidation message when certain
+ * groups of keys are modified. */
+void trackingRememberKeys(client *c) {
+ int numkeys;
+ int *keys = getKeysFromCommand(c->cmd,c->argv,c->argc,&numkeys);
+ if (keys == NULL) return;
+
+ for(int j = 0; j < numkeys; j++) {
+ int idx = keys[j];
+ sds sdskey = c->argv[idx]->ptr;
+ uint64_t hash = crc64(0,
+ (unsigned char*)sdskey,sdslen(sdskey))&(TRACKING_TABLE_SIZE-1);
+ if (TrackingTable[hash] == NULL)
+ TrackingTable[hash] = raxNew();
+ raxTryInsert(TrackingTable[hash],
+ (unsigned char*)&c->id,sizeof(c->id),NULL,NULL);
+ }
+ getKeysFreeResult(keys);
+}
+
+/* This function is called from signalModifiedKey() or other places in Redis
+ * when a key changes value. In the context of keys tracking, our task here is
+ * to send a notification to every client that may have the key cached. */
+void trackingInvalidateKey(robj *keyobj) {
+ sds sdskey = keyobj->ptr;
+ uint64_t hash = crc64(0,
+ (unsigned char*)sdskey,sdslen(sdskey))&(TRACKING_TABLE_SIZE-1);
+ if (TrackingTable == NULL || TrackingTable[hash] == NULL) return;
+
+ raxIterator ri;
+ raxStart(&ri,TrackingTable[hash]);
+ raxSeek(&ri,"^",NULL,0);
+ while(raxNext(&ri)) {
+ uint64_t id;
+ memcpy(&id,ri.key,ri.key_len);
+ client *c = lookupClientByID(id);
+ if (c == NULL) continue;
+ int using_redirection = 0;
+ if (c->client_tracking_redirection) {
+ client *redir = lookupClientByID(c->client_tracking_redirection);
+ if (!redir) {
+ /* We need to signal to the original connection that we
+ * are unable to send invalidation messages to the redirected
+ * connection, because the client no longer exists. */
+ if (c->resp > 2) {
+ addReplyPushLen(c,3);
+ addReplyBulkCBuffer(c,"tracking-redir-broken",21);
+ addReplyLongLong(c,c->client_tracking_redirection);
+ }
+ continue;
+ }
+ c = redir;
+ using_redirection = 1;
+ }
+
+ /* Only send such info for clients in RESP version 3 or more. However
+ * if redirection is active, and the connection we redirect to is
+ * in Pub/Sub mode, we can support the feature with RESP 2 as well,
+ * by sending Pub/Sub messages in the __redis__:invalidate channel. */
+ if (c->resp > 2) {
+ addReplyPushLen(c,2);
+ addReplyBulkCBuffer(c,"invalidate",10);
+ addReplyLongLong(c,hash);
+ } else if (using_redirection && c->flags & CLIENT_PUBSUB) {
+ robj *msg = createStringObjectFromLongLong(hash);
+ addReplyPubsubMessage(c,TrackingChannelName,msg);
+ decrRefCount(msg);
+ }
+ }
+ raxStop(&ri);
+
+ /* Free the radix tree for this slot: we'll create it and populate it
+ * again if more keys in this hash slot are modified. */
+ raxFree(TrackingTable[hash]);
+ TrackingTable[hash] = NULL;
+}
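
The header comment of tracking.c describes the slot math and the suggested client-side strategy only in prose. A rough sketch of both, reusing Redis's crc64() helper exactly as trackingRememberKeys()/trackingInvalidateKey() do; the 'invalidated_at' table and the is_stale() helper are hypothetical client-side constructs, not part of this patch:

    #include <stdint.h>
    #include <string.h>
    #include "crc64.h"      /* Redis's crc64(crc, buf, len) helper */

    #define TRACKING_SLOTS (1<<24)

    /* Map a key to its invalidation slot, exactly like trackingRememberKeys()
     * and trackingInvalidateKey() do above. */
    static uint64_t key_slot(char *key) {
        return crc64(0,(unsigned char*)key,strlen(key)) & (TRACKING_SLOTS-1);
    }

    /* Lazy eviction check on the client side: an object cached at time
     * 'cached_at' is stale if its slot was invalidated afterwards.
     * 'invalidated_at' is a hypothetical per-slot table kept by the client
     * (possibly indexed by fewer than 24 bits to save memory). */
    static int is_stale(const uint64_t *invalidated_at, char *key,
                        uint64_t cached_at) {
        return invalidated_at[key_slot(key)] > cached_at;
    }
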
diff --git a/tests/integration/replication-4.tcl b/tests/integration/replication-4.tcl
index 3c6df52a8..54891151b 100644
--- a/tests/integration/replication-4.tcl
+++ b/tests/integration/replication-4.tcl
@@ -1,12 +1,3 @@
-proc start_bg_complex_data {host port db ops} {
- set tclsh [info nameofexecutable]
- exec $tclsh tests/helpers/bg_complex_data.tcl $host $port $db $ops &
-}
-
-proc stop_bg_complex_data {handle} {
- catch {exec /bin/kill -9 $handle}
-}
-
start_server {tags {"repl"}} {
start_server {} {
diff --git a/tests/integration/replication-psync.tcl b/tests/integration/replication-psync.tcl
index bf8682446..3c98723af 100644
--- a/tests/integration/replication-psync.tcl
+++ b/tests/integration/replication-psync.tcl
@@ -1,12 +1,3 @@
-proc start_bg_complex_data {host port db ops} {
- set tclsh [info nameofexecutable]
- exec $tclsh tests/helpers/bg_complex_data.tcl $host $port $db $ops &
-}
-
-proc stop_bg_complex_data {handle} {
- catch {exec /bin/kill -9 $handle}
-}
-
# Creates a master-slave pair and breaks the link continuously to force
# partial resyncs attempts, all this while flooding the master with
# write queries.
@@ -17,7 +8,7 @@ proc stop_bg_complex_data {handle} {
# If reconnect is > 0, the test actually try to break the connection and
# reconnect with the master, otherwise just the initial synchronization is
# checked for consistency.
-proc test_psync {descr duration backlog_size backlog_ttl delay cond diskless reconnect} {
+proc test_psync {descr duration backlog_size backlog_ttl delay cond mdl sdl reconnect} {
start_server {tags {"repl"}} {
start_server {} {
@@ -28,8 +19,9 @@ proc test_psync {descr duration backlog_size backlog_ttl delay cond diskless rec
$master config set repl-backlog-size $backlog_size
$master config set repl-backlog-ttl $backlog_ttl
- $master config set repl-diskless-sync $diskless
+ $master config set repl-diskless-sync $mdl
$master config set repl-diskless-sync-delay 1
+ $slave config set repl-diskless-load $sdl
set load_handle0 [start_bg_complex_data $master_host $master_port 9 100000]
set load_handle1 [start_bg_complex_data $master_host $master_port 11 100000]
@@ -54,7 +46,7 @@ proc test_psync {descr duration backlog_size backlog_ttl delay cond diskless rec
}
}
- test "Test replication partial resync: $descr (diskless: $diskless, reconnect: $reconnect)" {
+ test "Test replication partial resync: $descr (diskless: $mdl, $sdl, reconnect: $reconnect)" {
# Now while the clients are writing data, break the maste-slave
# link multiple times.
if ($reconnect) {
@@ -132,23 +124,25 @@ proc test_psync {descr duration backlog_size backlog_ttl delay cond diskless rec
}
}
-foreach diskless {no yes} {
- test_psync {no reconnection, just sync} 6 1000000 3600 0 {
- } $diskless 0
+foreach mdl {no yes} {
+ foreach sdl {disabled swapdb} {
+ test_psync {no reconnection, just sync} 6 1000000 3600 0 {
+ } $mdl $sdl 0
- test_psync {ok psync} 6 100000000 3600 0 {
+ test_psync {ok psync} 6 100000000 3600 0 {
assert {[s -1 sync_partial_ok] > 0}
- } $diskless 1
+ } $mdl $sdl 1
- test_psync {no backlog} 6 100 3600 0.5 {
+ test_psync {no backlog} 6 100 3600 0.5 {
assert {[s -1 sync_partial_err] > 0}
- } $diskless 1
+ } $mdl $sdl 1
- test_psync {ok after delay} 3 100000000 3600 3 {
+ test_psync {ok after delay} 3 100000000 3600 3 {
assert {[s -1 sync_partial_ok] > 0}
- } $diskless 1
+ } $mdl $sdl 1
- test_psync {backlog expired} 3 100000000 1 3 {
+ test_psync {backlog expired} 3 100000000 1 3 {
assert {[s -1 sync_partial_err] > 0}
- } $diskless 1
+ } $mdl $sdl 1
+ }
}
diff --git a/tests/integration/replication.tcl b/tests/integration/replication.tcl
index 0e50c20a9..d69a1761a 100644
--- a/tests/integration/replication.tcl
+++ b/tests/integration/replication.tcl
@@ -183,85 +183,92 @@ start_server {tags {"repl"}} {
}
}
-foreach dl {no yes} {
- start_server {tags {"repl"}} {
- set master [srv 0 client]
- $master config set repl-diskless-sync $dl
- set master_host [srv 0 host]
- set master_port [srv 0 port]
- set slaves {}
- set load_handle0 [start_write_load $master_host $master_port 3]
- set load_handle1 [start_write_load $master_host $master_port 5]
- set load_handle2 [start_write_load $master_host $master_port 20]
- set load_handle3 [start_write_load $master_host $master_port 8]
- set load_handle4 [start_write_load $master_host $master_port 4]
- start_server {} {
- lappend slaves [srv 0 client]
+foreach mdl {no yes} {
+ foreach sdl {disabled swapdb} {
+ start_server {tags {"repl"}} {
+ set master [srv 0 client]
+ $master config set repl-diskless-sync $mdl
+ $master config set repl-diskless-sync-delay 1
+ set master_host [srv 0 host]
+ set master_port [srv 0 port]
+ set slaves {}
+ set load_handle0 [start_bg_complex_data $master_host $master_port 9 100000000]
+ set load_handle1 [start_bg_complex_data $master_host $master_port 11 100000000]
+ set load_handle2 [start_bg_complex_data $master_host $master_port 12 100000000]
+ set load_handle3 [start_write_load $master_host $master_port 8]
+ set load_handle4 [start_write_load $master_host $master_port 4]
+ after 5000 ;# wait for some data to accumulate so that we have an RDB part for the fork
start_server {} {
lappend slaves [srv 0 client]
start_server {} {
lappend slaves [srv 0 client]
- test "Connect multiple replicas at the same time (issue #141), diskless=$dl" {
- # Send SLAVEOF commands to slaves
- [lindex $slaves 0] slaveof $master_host $master_port
- [lindex $slaves 1] slaveof $master_host $master_port
- [lindex $slaves 2] slaveof $master_host $master_port
-
- # Wait for all the three slaves to reach the "online"
- # state from the POV of the master.
- set retry 500
- while {$retry} {
- set info [r -3 info]
- if {[string match {*slave0:*state=online*slave1:*state=online*slave2:*state=online*} $info]} {
- break
+ start_server {} {
+ lappend slaves [srv 0 client]
+ test "Connect multiple replicas at the same time (issue #141), master diskless=$mdl, replica diskless=$sdl" {
+ # Send SLAVEOF commands to slaves
+ [lindex $slaves 0] config set repl-diskless-load $sdl
+ [lindex $slaves 1] config set repl-diskless-load $sdl
+ [lindex $slaves 2] config set repl-diskless-load $sdl
+ [lindex $slaves 0] slaveof $master_host $master_port
+ [lindex $slaves 1] slaveof $master_host $master_port
+ [lindex $slaves 2] slaveof $master_host $master_port
+
+ # Wait for all the three slaves to reach the "online"
+ # state from the POV of the master.
+ set retry 500
+ while {$retry} {
+ set info [r -3 info]
+ if {[string match {*slave0:*state=online*slave1:*state=online*slave2:*state=online*} $info]} {
+ break
+ } else {
+ incr retry -1
+ after 100
+ }
+ }
+ if {$retry == 0} {
+ error "assertion:Slaves not correctly synchronized"
+ }
+
+ # Wait that slaves acknowledge they are online so
+ # we are sure that DBSIZE and DEBUG DIGEST will not
+ # fail because of timing issues.
+ wait_for_condition 500 100 {
+ [lindex [[lindex $slaves 0] role] 3] eq {connected} &&
+ [lindex [[lindex $slaves 1] role] 3] eq {connected} &&
+ [lindex [[lindex $slaves 2] role] 3] eq {connected}
} else {
- incr retry -1
- after 100
+ fail "Slaves still not connected after some time"
}
- }
- if {$retry == 0} {
- error "assertion:Replicas not correctly synchronized"
- }
- # Wait that slaves acknowledge they are online so
- # we are sure that DBSIZE and DEBUG DIGEST will not
- # fail because of timing issues.
- wait_for_condition 500 100 {
- [lindex [[lindex $slaves 0] role] 3] eq {connected} &&
- [lindex [[lindex $slaves 1] role] 3] eq {connected} &&
- [lindex [[lindex $slaves 2] role] 3] eq {connected}
- } else {
- fail "Replicas still not connected after some time"
- }
+ # Stop the write load
+ stop_bg_complex_data $load_handle0
+ stop_bg_complex_data $load_handle1
+ stop_bg_complex_data $load_handle2
+ stop_write_load $load_handle3
+ stop_write_load $load_handle4
+
+ # Make sure that slaves and master have same
+ # number of keys
+ wait_for_condition 500 100 {
+ [$master dbsize] == [[lindex $slaves 0] dbsize] &&
+ [$master dbsize] == [[lindex $slaves 1] dbsize] &&
+ [$master dbsize] == [[lindex $slaves 2] dbsize]
+ } else {
+ fail "Different number of keys between master and replica after too long time."
+ }
- # Stop the write load
- stop_write_load $load_handle0
- stop_write_load $load_handle1
- stop_write_load $load_handle2
- stop_write_load $load_handle3
- stop_write_load $load_handle4
-
- # Make sure that slaves and master have same
- # number of keys
- wait_for_condition 500 100 {
- [$master dbsize] == [[lindex $slaves 0] dbsize] &&
- [$master dbsize] == [[lindex $slaves 1] dbsize] &&
- [$master dbsize] == [[lindex $slaves 2] dbsize]
- } else {
- fail "Different number of keys between masted and replica after too long time."
+ # Check digests
+ set digest [$master debug digest]
+ set digest0 [[lindex $slaves 0] debug digest]
+ set digest1 [[lindex $slaves 1] debug digest]
+ set digest2 [[lindex $slaves 2] debug digest]
+ assert {$digest ne 0000000000000000000000000000000000000000}
+ assert {$digest eq $digest0}
+ assert {$digest eq $digest1}
+ assert {$digest eq $digest2}
}
-
- # Check digests
- set digest [$master debug digest]
- set digest0 [[lindex $slaves 0] debug digest]
- set digest1 [[lindex $slaves 1] debug digest]
- set digest2 [[lindex $slaves 2] debug digest]
- assert {$digest ne 0000000000000000000000000000000000000000}
- assert {$digest eq $digest0}
- assert {$digest eq $digest1}
- assert {$digest eq $digest2}
- }
- }
+ }
+ }
}
}
}
@@ -309,3 +316,70 @@ start_server {tags {"repl"}} {
}
}
}
+
+test {slave fails full sync and diskless load swapdb recovers it} {
+ start_server {tags {"repl"}} {
+ set slave [srv 0 client]
+ set slave_host [srv 0 host]
+ set slave_port [srv 0 port]
+ set slave_log [srv 0 stdout]
+ start_server {} {
+ set master [srv 0 client]
+ set master_host [srv 0 host]
+ set master_port [srv 0 port]
+
+ # Put different data sets on the master and slave
+ # we need to put large keys on the master since the slave only replies to INFO once per 2mb it loads
+ $slave debug populate 2000 slave 10
+ $master debug populate 200 master 100000
+ $master config set rdbcompression no
+
+ # Set master and slave to use diskless replication
+ $master config set repl-diskless-sync yes
+ $master config set repl-diskless-sync-delay 0
+ $slave config set repl-diskless-load swapdb
+
+ # Set master with a slow rdb generation, so that we can easily disconnect it mid sync
+ # 10ms per key, so with 200 keys that's 2 seconds
+ $master config set rdb-key-save-delay 10000
+
+ # Start the replication process...
+ $slave slaveof $master_host $master_port
+
+ # wait for the slave to start reading the rdb
+ wait_for_condition 50 100 {
+ [s -1 loading] eq 1
+ } else {
+ fail "Replica didn't get into loading mode"
+ }
+
+ # make sure that the next sync will not start immediately so that we can catch the slave in between syncs
+ $master config set repl-diskless-sync-delay 5
+ # for faster server shutdown, make rdb saving fast again (the forked child already uses the slow one)
+ $master config set rdb-key-save-delay 0
+
+ # wait for the slave to do flushdb (key count drops)
+ wait_for_condition 50 100 {
+ 2000 != [scan [regexp -inline {keys\=([\d]*)} [$slave info keyspace]] keys=%d]
+ } else {
+ fail "Replica didn't flush"
+ }
+
+ # make sure we're still loading
+ assert_equal [s -1 loading] 1
+
+ # kill the slave connection on the master
+ set killed [$master client kill type slave]
+
+ # wait for loading to stop (fail)
+ wait_for_condition 50 100 {
+ [s -1 loading] eq 0
+ } else {
+ fail "Replica didn't disconnect"
+ }
+
+ # make sure the original keys were restored
+ assert_equal [$slave dbsize] 2000
+ }
+ }
+}
diff --git a/tests/support/util.tcl b/tests/support/util.tcl
index 74f491e48..41cc5612a 100644
--- a/tests/support/util.tcl
+++ b/tests/support/util.tcl
@@ -399,3 +399,15 @@ proc lshuffle {list} {
}
return $slist
}
+
+# Execute a background process writing complex data for the specified number
+# of ops to the specified Redis instance.
+proc start_bg_complex_data {host port db ops} {
+ set tclsh [info nameofexecutable]
+ exec $tclsh tests/helpers/bg_complex_data.tcl $host $port $db $ops &
+}
+
+# Stop a process generating write load executed with start_bg_complex_data.
+proc stop_bg_complex_data {handle} {
+ catch {exec /bin/kill -9 $handle}
+}
diff --git a/tests/unit/geo.tcl b/tests/unit/geo.tcl
index 604697be4..49e421ee9 100644
--- a/tests/unit/geo.tcl
+++ b/tests/unit/geo.tcl
@@ -61,6 +61,7 @@ set regression_vectors {
{939895 151 59.149620271823181 65.204186651485145}
{1412 156 149.29737817929004 15.95807862745508}
{564862 149 84.062063109158544 -65.685403922426232}
+ {1546032440391 16751 -1.8175081637769495 20.665668878082954}
}
set rv_idx 0
@@ -274,8 +275,19 @@ start_server {tags {"geo"}} {
foreach place $diff {
set mydist [geo_distance $lon $lat $search_lon $search_lat]
set mydist [expr $mydist/1000]
- if {($mydist / $radius_km) > 0.999} {incr rounding_errors}
+ if {($mydist / $radius_km) > 0.999} {
+ incr rounding_errors
+ continue
+ }
+ if {$mydist < $radius_m} {
+ # This is a false positive for redis since given the
+ # same points the higher precision calculation provided
+ # by TCL shows the point within range
+ incr rounding_errors
+ continue
+ }
}
+
# Make sure this is a real error and not a rounidng issue.
if {[llength $diff] == $rounding_errors} {
set res $res2; # Error silenced
diff --git a/tests/unit/scan.tcl b/tests/unit/scan.tcl
index c0f4349d2..9f9ff4df2 100644
--- a/tests/unit/scan.tcl
+++ b/tests/unit/scan.tcl
@@ -53,6 +53,51 @@ start_server {tags {"scan"}} {
assert_equal 100 [llength $keys]
}
+ test "SCAN TYPE" {
+ r flushdb
+ # populate only creates strings
+ r debug populate 1000
+
+ # Check non-strings are excluded
+ set cur 0
+ set keys {}
+ while 1 {
+ set res [r scan $cur type "list"]
+ set cur [lindex $res 0]
+ set k [lindex $res 1]
+ lappend keys {*}$k
+ if {$cur == 0} break
+ }
+
+ assert_equal 0 [llength $keys]
+
+ # Check strings are included
+ set cur 0
+ set keys {}
+ while 1 {
+ set res [r scan $cur type "string"]
+ set cur [lindex $res 0]
+ set k [lindex $res 1]
+ lappend keys {*}$k
+ if {$cur == 0} break
+ }
+
+ assert_equal 1000 [llength $keys]
+
+ # Check all three args work together
+ set cur 0
+ set keys {}
+ while 1 {
+ set res [r scan $cur type "string" match "key:*" count 10]
+ set cur [lindex $res 0]
+ set k [lindex $res 1]
+ lappend keys {*}$k
+ if {$cur == 0} break
+ }
+
+ assert_equal 1000 [llength $keys]
+ }
+
foreach enc {intset hashtable} {
test "SSCAN with encoding $enc" {
# Create the Set