diff options
-rw-r--r-- | redis.conf | 16 | ||||
-rw-r--r-- | src/Makefile | 2 | ||||
-rw-r--r-- | src/anet.c | 14 | ||||
-rw-r--r-- | src/anet.h | 1 | ||||
-rw-r--r-- | src/aof.c | 2 | ||||
-rw-r--r-- | src/config.c | 388 | ||||
-rw-r--r-- | src/db.c | 55 | ||||
-rw-r--r-- | src/debug.c | 2 | ||||
-rw-r--r-- | src/expire.c | 1 | ||||
-rw-r--r-- | src/module.c | 14 | ||||
-rw-r--r-- | src/networking.c | 83 | ||||
-rw-r--r-- | src/object.c | 4 | ||||
-rw-r--r-- | src/rdb.c | 40 | ||||
-rw-r--r-- | src/redis-check-rdb.c | 4 | ||||
-rw-r--r-- | src/redismodule.h | 4 | ||||
-rw-r--r-- | src/replication.c | 417 | ||||
-rw-r--r-- | src/rio.c | 120 | ||||
-rw-r--r-- | src/rio.h | 10 | ||||
-rw-r--r-- | src/server.c | 24 | ||||
-rw-r--r-- | src/server.h | 53 | ||||
-rw-r--r-- | src/tracking.c | 175 | ||||
-rw-r--r-- | tests/integration/replication-4.tcl | 9 | ||||
-rw-r--r-- | tests/integration/replication-psync.tcl | 40 | ||||
-rw-r--r-- | tests/integration/replication.tcl | 214 | ||||
-rw-r--r-- | tests/support/util.tcl | 12 | ||||
-rw-r--r-- | tests/unit/geo.tcl | 14 | ||||
-rw-r--r-- | tests/unit/scan.tcl | 45 |
27 files changed, 1214 insertions, 549 deletions
diff --git a/redis.conf b/redis.conf index 060510768..74b6c018f 100644 --- a/redis.conf +++ b/redis.conf @@ -377,6 +377,22 @@ repl-diskless-sync no # it entirely just set it to 0 seconds and the transfer will start ASAP. repl-diskless-sync-delay 5 +# Replica can load the rdb it reads from the replication link directly from the +# socket, or store the rdb to a file and read that file after it was completely +# recived from the master. +# In many cases the disk is slower than the network, and storing and loading +# the rdb file may increase replication time (and even increase the master's +# Copy on Write memory and salve buffers). +# However, parsing the rdb file directly from the socket may mean that we have +# to flush the contents of the current database before the full rdb was received. +# for this reason we have the following options: +# "disabled" - Don't use diskless load (store the rdb file to the disk first) +# "on-empty-db" - Use diskless load only when it is completely safe. +# "swapdb" - Keep a copy of the current db contents in RAM while parsing +# the data directly from the socket. note that this requires +# sufficient memory, if you don't have it, you risk an OOM kill. +repl-diskless-load disabled + # Replicas send PINGs to server in a predefined interval. It's possible to change # this interval with the repl_ping_replica_period option. The default value is 10 # seconds. diff --git a/src/Makefile b/src/Makefile index f35685eff..b6cc69e2f 100644 --- a/src/Makefile +++ b/src/Makefile @@ -164,7 +164,7 @@ endif REDIS_SERVER_NAME=redis-server REDIS_SENTINEL_NAME=redis-sentinel -REDIS_SERVER_OBJ=adlist.o quicklist.o ae.o anet.o dict.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o crc16.o endianconv.o slowlog.o scripting.o bio.o rio.o rand.o memtest.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o redis-check-rdb.o redis-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o lolwut.o lolwut5.o acl.o gopher.o +REDIS_SERVER_OBJ=adlist.o quicklist.o ae.o anet.o dict.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o crc16.o endianconv.o slowlog.o scripting.o bio.o rio.o rand.o memtest.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o redis-check-rdb.o redis-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o lolwut.o lolwut5.o acl.o gopher.o tracking.o REDIS_CLI_NAME=redis-cli REDIS_CLI_OBJ=anet.o adlist.o dict.o redis-cli.o zmalloc.o release.o anet.o ae.o crc64.o siphash.o crc16.o REDIS_BENCHMARK_NAME=redis-benchmark diff --git a/src/anet.c b/src/anet.c index 2981fca13..2088f4fb1 100644 --- a/src/anet.c +++ b/src/anet.c @@ -193,6 +193,20 @@ int anetSendTimeout(char *err, int fd, long long ms) { return ANET_OK; } +/* Set the socket receive timeout (SO_RCVTIMEO socket option) to the specified + * number of milliseconds, or disable it if the 'ms' argument is zero. */ +int anetRecvTimeout(char *err, int fd, long long ms) { + struct timeval tv; + + tv.tv_sec = ms/1000; + tv.tv_usec = (ms%1000)*1000; + if (setsockopt(fd, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv)) == -1) { + anetSetError(err, "setsockopt SO_RCVTIMEO: %s", strerror(errno)); + return ANET_ERR; + } + return ANET_OK; +} + /* anetGenericResolve() is called by anetResolve() and anetResolveIP() to * do the actual work. It resolves the hostname "host" and set the string * representation of the IP address into the buffer pointed by "ipbuf". diff --git a/src/anet.h b/src/anet.h index 7142f78d2..dd735240d 100644 --- a/src/anet.h +++ b/src/anet.h @@ -70,6 +70,7 @@ int anetEnableTcpNoDelay(char *err, int fd); int anetDisableTcpNoDelay(char *err, int fd); int anetTcpKeepAlive(char *err, int fd); int anetSendTimeout(char *err, int fd, long long ms); +int anetRecvTimeout(char *err, int fd, long long ms); int anetPeerToString(int fd, char *ip, size_t ip_len, int *port); int anetKeepAlive(char *err, int fd, int interval); int anetSockName(int fd, char *ip, size_t ip_len, int *port); @@ -729,7 +729,7 @@ int loadAppendOnlyFile(char *filename) { server.aof_state = AOF_OFF; fakeClient = createFakeClient(); - startLoading(fp); + startLoadingFile(fp, filename); /* Check if this AOF file has an RDB preamble. In that case we need to * load the RDB file and later continue loading the AOF tail. */ diff --git a/src/config.c b/src/config.c index 7f0e9af89..fde00ddf5 100644 --- a/src/config.c +++ b/src/config.c @@ -91,6 +91,13 @@ configEnum aof_fsync_enum[] = { {NULL, 0} }; +configEnum repl_diskless_load_enum[] = { + {"disabled", REPL_DISKLESS_LOAD_DISABLED}, + {"on-empty-db", REPL_DISKLESS_LOAD_WHEN_DB_EMPTY}, + {"swapdb", REPL_DISKLESS_LOAD_SWAPDB}, + {NULL, 0} +}; + /* Output buffer limits presets. */ clientBufferLimitsConfig clientBufferLimitsDefaults[CLIENT_TYPE_OBUF_COUNT] = { {0, 0, 0}, /* normal */ @@ -98,6 +105,48 @@ clientBufferLimitsConfig clientBufferLimitsDefaults[CLIENT_TYPE_OBUF_COUNT] = { {1024*1024*32, 1024*1024*8, 60} /* pubsub */ }; +/* Configuration values that require no special handling to set, get, load or + * rewrite. */ +typedef struct configYesNo { + const char *name; /* The user visible name of this config */ + const char *alias; /* An alias that can also be used for this config */ + int *config; /* The pointer to the server config this value is stored in */ + const int modifiable; /* Can this value be updated by CONFIG SET? */ + const int default_value; /* The default value of the config on rewrite */ +} configYesNo; + +configYesNo configs_yesno[] = { + /* Non-Modifiable */ + {"rdbchecksum",NULL,&server.rdb_checksum,0,CONFIG_DEFAULT_RDB_CHECKSUM}, + {"daemonize",NULL,&server.daemonize,0,0}, + {"io-threads-do-reads",NULL,&server.io_threads_do_reads, 0, CONFIG_DEFAULT_IO_THREADS_DO_READS}, + {"always-show-logo",NULL,&server.always_show_logo,0,CONFIG_DEFAULT_ALWAYS_SHOW_LOGO}, + /* Modifiable */ + {"protected-mode",NULL,&server.protected_mode,1,CONFIG_DEFAULT_PROTECTED_MODE}, + {"rdbcompression",NULL,&server.rdb_compression,1,CONFIG_DEFAULT_RDB_COMPRESSION}, + {"activerehashing",NULL,&server.activerehashing,1,CONFIG_DEFAULT_ACTIVE_REHASHING}, + {"stop-writes-on-bgsave-error",NULL,&server.stop_writes_on_bgsave_err,1,CONFIG_DEFAULT_STOP_WRITES_ON_BGSAVE_ERROR}, + {"dynamic-hz",NULL,&server.dynamic_hz,1,CONFIG_DEFAULT_DYNAMIC_HZ}, + {"lazyfree-lazy-eviction",NULL,&server.lazyfree_lazy_eviction,1,CONFIG_DEFAULT_LAZYFREE_LAZY_EVICTION}, + {"lazyfree-lazy-expire",NULL,&server.lazyfree_lazy_expire,1,CONFIG_DEFAULT_LAZYFREE_LAZY_EXPIRE}, + {"lazyfree-lazy-server-del",NULL,&server.lazyfree_lazy_server_del,1,CONFIG_DEFAULT_LAZYFREE_LAZY_SERVER_DEL}, + {"repl-disable-tcp-nodelay",NULL,&server.repl_disable_tcp_nodelay,1,CONFIG_DEFAULT_REPL_DISABLE_TCP_NODELAY}, + {"repl-diskless-sync",NULL,&server.repl_diskless_sync,1,CONFIG_DEFAULT_REPL_DISKLESS_SYNC}, + {"gopher-enabled",NULL,&server.gopher_enabled,1,CONFIG_DEFAULT_GOPHER_ENABLED}, + {"aof-rewrite-incremental-fsync",NULL,&server.aof_rewrite_incremental_fsync,1,CONFIG_DEFAULT_AOF_REWRITE_INCREMENTAL_FSYNC}, + {"no-appendfsync-on-rewrite",NULL,&server.aof_no_fsync_on_rewrite,1,CONFIG_DEFAULT_AOF_NO_FSYNC_ON_REWRITE}, + {"cluster-require-full-coverage",NULL,&server.cluster_require_full_coverage,CLUSTER_DEFAULT_REQUIRE_FULL_COVERAGE}, + {"rdb-save-incremental-fsync",NULL,&server.rdb_save_incremental_fsync,1,CONFIG_DEFAULT_RDB_SAVE_INCREMENTAL_FSYNC}, + {"aof-load-truncated",NULL,&server.aof_load_truncated,1,CONFIG_DEFAULT_AOF_LOAD_TRUNCATED}, + {"aof-use-rdb-preamble",NULL,&server.aof_use_rdb_preamble,1,CONFIG_DEFAULT_AOF_USE_RDB_PREAMBLE}, + {"cluster-replica-no-failover","cluster-slave-no-failover",&server.cluster_slave_no_failover,1,CLUSTER_DEFAULT_SLAVE_NO_FAILOVER}, + {"replica-lazy-flush","slave-lazy-flush",&server.repl_slave_lazy_flush,1,CONFIG_DEFAULT_SLAVE_LAZY_FLUSH}, + {"replica-serve-stale-data","slave-serve-stale-data",&server.repl_serve_stale_data,1,CONFIG_DEFAULT_SLAVE_SERVE_STALE_DATA}, + {"replica-read-only","slave-read-only",&server.repl_slave_ro,1,CONFIG_DEFAULT_SLAVE_READ_ONLY}, + {"replica-ignore-maxmemory","slave-ignore-maxmemory",&server.repl_slave_ignore_maxmemory,1,CONFIG_DEFAULT_SLAVE_IGNORE_MAXMEMORY}, + {NULL, NULL, 0, 0} +}; + /*----------------------------------------------------------------------------- * Enum access functions *----------------------------------------------------------------------------*/ @@ -201,6 +250,26 @@ void loadServerConfigFromString(char *config) { } sdstolower(argv[0]); + /* Iterate the configs that are standard */ + int match = 0; + for (configYesNo *config = configs_yesno; config->name != NULL; config++) { + if ((!strcasecmp(argv[0],config->name) || + (config->alias && !strcasecmp(argv[0],config->alias))) && + (argc == 2)) + { + if ((*(config->config) = yesnotoi(argv[1])) == -1) { + err = "argument must be 'yes' or 'no'"; goto loaderr; + } + match = 1; + break; + } + } + + if (match) { + sdsfreesplitres(argv,argc); + continue; + } + /* Execute config directives */ if (!strcasecmp(argv[0],"timeout") && argc == 2) { server.maxidletime = atoi(argv[1]); @@ -212,14 +281,6 @@ void loadServerConfigFromString(char *config) { if (server.tcpkeepalive < 0) { err = "Invalid tcp-keepalive value"; goto loaderr; } - } else if (!strcasecmp(argv[0],"protected-mode") && argc == 2) { - if ((server.protected_mode = yesnotoi(argv[1])) == -1) { - err = "argument must be 'yes' or 'no'"; goto loaderr; - } - } else if (!strcasecmp(argv[0],"gopher-enabled") && argc == 2) { - if ((server.gopher_enabled = yesnotoi(argv[1])) == -1) { - err = "argument must be 'yes' or 'no'"; goto loaderr; - } } else if (!strcasecmp(argv[0],"port") && argc == 2) { server.port = atoi(argv[1]); if (server.port < 0 || server.port > 65535) { @@ -290,10 +351,6 @@ void loadServerConfigFromString(char *config) { } else if (!strcasecmp(argv[0],"aclfile") && argc == 2) { zfree(server.acl_filename); server.acl_filename = zstrdup(argv[1]); - } else if (!strcasecmp(argv[0],"always-show-logo") && argc == 2) { - if ((server.always_show_logo = yesnotoi(argv[1])) == -1) { - err = "argument must be 'yes' or 'no'"; goto loaderr; - } } else if (!strcasecmp(argv[0],"syslog-enabled") && argc == 2) { if ((server.syslog_enabled = yesnotoi(argv[1])) == -1) { err = "argument must be 'yes' or 'no'"; goto loaderr; @@ -318,10 +375,6 @@ void loadServerConfigFromString(char *config) { if (server.io_threads_num < 1 || server.io_threads_num > 512) { err = "Invalid number of I/O threads"; goto loaderr; } - } else if (!strcasecmp(argv[0],"io-threads-do-reads") && argc == 2) { - if ((server.io_threads_do_reads = yesnotoi(argv[1])) == -1) { - err = "argument must be 'yes' or 'no'"; goto loaderr; - } } else if (!strcasecmp(argv[0],"include") && argc == 2) { loadServerConfig(argv[1],NULL); } else if (!strcasecmp(argv[0],"maxclients") && argc == 2) { @@ -381,13 +434,10 @@ void loadServerConfigFromString(char *config) { err = "repl-timeout must be 1 or greater"; goto loaderr; } - } else if (!strcasecmp(argv[0],"repl-disable-tcp-nodelay") && argc==2) { - if ((server.repl_disable_tcp_nodelay = yesnotoi(argv[1])) == -1) { - err = "argument must be 'yes' or 'no'"; goto loaderr; - } - } else if (!strcasecmp(argv[0],"repl-diskless-sync") && argc==2) { - if ((server.repl_diskless_sync = yesnotoi(argv[1])) == -1) { - err = "argument must be 'yes' or 'no'"; goto loaderr; + } else if (!strcasecmp(argv[0],"repl-diskless-load") && argc==2) { + server.repl_diskless_load = configEnumGetValue(repl_diskless_load_enum,argv[1]); + if (server.repl_diskless_load == INT_MIN) { + err = "argument must be 'disabled', 'on-empty-db', 'swapdb' or 'flushdb'"; } } else if (!strcasecmp(argv[0],"repl-diskless-sync-delay") && argc==2) { server.repl_diskless_sync_delay = atoi(argv[1]); @@ -414,57 +464,6 @@ void loadServerConfigFromString(char *config) { } else if (!strcasecmp(argv[0],"masterauth") && argc == 2) { zfree(server.masterauth); server.masterauth = argv[1][0] ? zstrdup(argv[1]) : NULL; - } else if ((!strcasecmp(argv[0],"slave-serve-stale-data") || - !strcasecmp(argv[0],"replica-serve-stale-data")) - && argc == 2) - { - if ((server.repl_serve_stale_data = yesnotoi(argv[1])) == -1) { - err = "argument must be 'yes' or 'no'"; goto loaderr; - } - } else if ((!strcasecmp(argv[0],"slave-read-only") || - !strcasecmp(argv[0],"replica-read-only")) - && argc == 2) - { - if ((server.repl_slave_ro = yesnotoi(argv[1])) == -1) { - err = "argument must be 'yes' or 'no'"; goto loaderr; - } - } else if ((!strcasecmp(argv[0],"slave-ignore-maxmemory") || - !strcasecmp(argv[0],"replica-ignore-maxmemory")) - && argc == 2) - { - if ((server.repl_slave_ignore_maxmemory = yesnotoi(argv[1])) == -1) { - err = "argument must be 'yes' or 'no'"; goto loaderr; - } - } else if (!strcasecmp(argv[0],"rdbcompression") && argc == 2) { - if ((server.rdb_compression = yesnotoi(argv[1])) == -1) { - err = "argument must be 'yes' or 'no'"; goto loaderr; - } - } else if (!strcasecmp(argv[0],"rdbchecksum") && argc == 2) { - if ((server.rdb_checksum = yesnotoi(argv[1])) == -1) { - err = "argument must be 'yes' or 'no'"; goto loaderr; - } - } else if (!strcasecmp(argv[0],"activerehashing") && argc == 2) { - if ((server.activerehashing = yesnotoi(argv[1])) == -1) { - err = "argument must be 'yes' or 'no'"; goto loaderr; - } - } else if (!strcasecmp(argv[0],"lazyfree-lazy-eviction") && argc == 2) { - if ((server.lazyfree_lazy_eviction = yesnotoi(argv[1])) == -1) { - err = "argument must be 'yes' or 'no'"; goto loaderr; - } - } else if (!strcasecmp(argv[0],"lazyfree-lazy-expire") && argc == 2) { - if ((server.lazyfree_lazy_expire = yesnotoi(argv[1])) == -1) { - err = "argument must be 'yes' or 'no'"; goto loaderr; - } - } else if (!strcasecmp(argv[0],"lazyfree-lazy-server-del") && argc == 2){ - if ((server.lazyfree_lazy_server_del = yesnotoi(argv[1])) == -1) { - err = "argument must be 'yes' or 'no'"; goto loaderr; - } - } else if ((!strcasecmp(argv[0],"slave-lazy-flush") || - !strcasecmp(argv[0],"replica-lazy-flush")) && argc == 2) - { - if ((server.repl_slave_lazy_flush = yesnotoi(argv[1])) == -1) { - err = "argument must be 'yes' or 'no'"; goto loaderr; - } } else if (!strcasecmp(argv[0],"activedefrag") && argc == 2) { if ((server.active_defrag_enabled = yesnotoi(argv[1])) == -1) { err = "argument must be 'yes' or 'no'"; goto loaderr; @@ -474,25 +473,15 @@ void loadServerConfigFromString(char *config) { err = "active defrag can't be enabled without proper jemalloc support"; goto loaderr; #endif } - } else if (!strcasecmp(argv[0],"daemonize") && argc == 2) { - if ((server.daemonize = yesnotoi(argv[1])) == -1) { - err = "argument must be 'yes' or 'no'"; goto loaderr; - } - } else if (!strcasecmp(argv[0],"dynamic-hz") && argc == 2) { - if ((server.dynamic_hz = yesnotoi(argv[1])) == -1) { - err = "argument must be 'yes' or 'no'"; goto loaderr; - } } else if (!strcasecmp(argv[0],"hz") && argc == 2) { server.config_hz = atoi(argv[1]); if (server.config_hz < CONFIG_MIN_HZ) server.config_hz = CONFIG_MIN_HZ; if (server.config_hz > CONFIG_MAX_HZ) server.config_hz = CONFIG_MAX_HZ; } else if (!strcasecmp(argv[0],"appendonly") && argc == 2) { - int yes; - - if ((yes = yesnotoi(argv[1])) == -1) { + if ((server.aof_enabled = yesnotoi(argv[1])) == -1) { err = "argument must be 'yes' or 'no'"; goto loaderr; } - server.aof_state = yes ? AOF_ON : AOF_OFF; + server.aof_state = server.aof_enabled ? AOF_ON : AOF_OFF; } else if (!strcasecmp(argv[0],"appendfilename") && argc == 2) { if (!pathIsBaseName(argv[1])) { err = "appendfilename can't be a path, just a filename"; @@ -500,11 +489,6 @@ void loadServerConfigFromString(char *config) { } zfree(server.aof_filename); server.aof_filename = zstrdup(argv[1]); - } else if (!strcasecmp(argv[0],"no-appendfsync-on-rewrite") - && argc == 2) { - if ((server.aof_no_fsync_on_rewrite= yesnotoi(argv[1])) == -1) { - err = "argument must be 'yes' or 'no'"; goto loaderr; - } } else if (!strcasecmp(argv[0],"appendfsync") && argc == 2) { server.aof_fsync = configEnumGetValue(aof_fsync_enum,argv[1]); if (server.aof_fsync == INT_MIN) { @@ -523,27 +507,11 @@ void loadServerConfigFromString(char *config) { argc == 2) { server.aof_rewrite_min_size = memtoll(argv[1],NULL); - } else if (!strcasecmp(argv[0],"aof-rewrite-incremental-fsync") && - argc == 2) - { - if ((server.aof_rewrite_incremental_fsync = - yesnotoi(argv[1])) == -1) { - err = "argument must be 'yes' or 'no'"; goto loaderr; - } - } else if (!strcasecmp(argv[0],"rdb-save-incremental-fsync") && - argc == 2) - { - if ((server.rdb_save_incremental_fsync = - yesnotoi(argv[1])) == -1) { - err = "argument must be 'yes' or 'no'"; goto loaderr; - } - } else if (!strcasecmp(argv[0],"aof-load-truncated") && argc == 2) { - if ((server.aof_load_truncated = yesnotoi(argv[1])) == -1) { - err = "argument must be 'yes' or 'no'"; goto loaderr; - } - } else if (!strcasecmp(argv[0],"aof-use-rdb-preamble") && argc == 2) { - if ((server.aof_use_rdb_preamble = yesnotoi(argv[1])) == -1) { - err = "argument must be 'yes' or 'no'"; goto loaderr; + } else if (!strcasecmp(argv[0],"rdb-key-save-delay") && argc==2) { + server.rdb_key_save_delay = atoi(argv[1]); + if (server.rdb_key_save_delay < 0) { + err = "rdb-key-save-delay can't be negative"; + goto loaderr; } } else if (!strcasecmp(argv[0],"requirepass") && argc == 2) { if (strlen(argv[1]) > CONFIG_AUTHPASS_MAX_LEN) { @@ -678,13 +646,6 @@ void loadServerConfigFromString(char *config) { { err = "Invalid port"; goto loaderr; } - } else if (!strcasecmp(argv[0],"cluster-require-full-coverage") && - argc == 2) - { - if ((server.cluster_require_full_coverage = yesnotoi(argv[1])) == -1) - { - err = "argument must be 'yes' or 'no'"; goto loaderr; - } } else if (!strcasecmp(argv[0],"cluster-node-timeout") && argc == 2) { server.cluster_node_timeout = strtoll(argv[1],NULL,10); if (server.cluster_node_timeout <= 0) { @@ -707,15 +668,6 @@ void loadServerConfigFromString(char *config) { err = "cluster replica validity factor must be zero or positive"; goto loaderr; } - } else if ((!strcasecmp(argv[0],"cluster-slave-no-failover") || - !strcasecmp(argv[0],"cluster-replica-no-failover")) && - argc == 2) - { - server.cluster_slave_no_failover = yesnotoi(argv[1]); - if (server.cluster_slave_no_failover == -1) { - err = "argument must be 'yes' or 'no'"; - goto loaderr; - } } else if (!strcasecmp(argv[0],"lua-time-limit") && argc == 2) { server.lua_time_limit = strtoll(argv[1],NULL,10); } else if (!strcasecmp(argv[0],"lua-replicate-commands") && argc == 2) { @@ -756,11 +708,6 @@ void loadServerConfigFromString(char *config) { server.client_obuf_limits[class].hard_limit_bytes = hard; server.client_obuf_limits[class].soft_limit_bytes = soft; server.client_obuf_limits[class].soft_limit_seconds = soft_seconds; - } else if (!strcasecmp(argv[0],"stop-writes-on-bgsave-error") && - argc == 2) { - if ((server.stop_writes_on_bgsave_err = yesnotoi(argv[1])) == -1) { - err = "argument must be 'yes' or 'no'"; goto loaderr; - } } else if ((!strcasecmp(argv[0],"slave-priority") || !strcasecmp(argv[0],"replica-priority")) && argc == 2) { @@ -941,6 +888,19 @@ void configSetCommand(client *c) { serverAssertWithInfo(c,c->argv[3],sdsEncodedObject(c->argv[3])); o = c->argv[3]; + /* Iterate the configs that are standard */ + for (configYesNo *config = configs_yesno; config->name != NULL; config++) { + if(config->modifiable && (!strcasecmp(c->argv[2]->ptr,config->name) || + (config->alias && !strcasecmp(c->argv[2]->ptr,config->alias)))) + { + int yn = yesnotoi(o->ptr); + if (yn == -1) goto badfmt; + *(config->config) = yn; + addReply(c,shared.ok); + return; + } + } + if (0) { /* this starts the config_set macros else-if chain. */ /* Special fields that can't be handled with general macros. */ @@ -998,6 +958,7 @@ void configSetCommand(client *c) { int enable = yesnotoi(o->ptr); if (enable == -1) goto badfmt; + server.aof_enabled = enable; if (enable == 0 && server.aof_state != AOF_OFF) { stopAppendOnly(); } else if (enable && server.aof_state == AOF_OFF) { @@ -1106,40 +1067,6 @@ void configSetCommand(client *c) { /* Boolean fields. * config_set_bool_field(name,var). */ } config_set_bool_field( - "rdbcompression", server.rdb_compression) { - } config_set_bool_field( - "repl-disable-tcp-nodelay",server.repl_disable_tcp_nodelay) { - } config_set_bool_field( - "repl-diskless-sync",server.repl_diskless_sync) { - } config_set_bool_field( - "cluster-require-full-coverage",server.cluster_require_full_coverage) { - } config_set_bool_field( - "cluster-slave-no-failover",server.cluster_slave_no_failover) { - } config_set_bool_field( - "cluster-replica-no-failover",server.cluster_slave_no_failover) { - } config_set_bool_field( - "aof-rewrite-incremental-fsync",server.aof_rewrite_incremental_fsync) { - } config_set_bool_field( - "rdb-save-incremental-fsync",server.rdb_save_incremental_fsync) { - } config_set_bool_field( - "aof-load-truncated",server.aof_load_truncated) { - } config_set_bool_field( - "aof-use-rdb-preamble",server.aof_use_rdb_preamble) { - } config_set_bool_field( - "slave-serve-stale-data",server.repl_serve_stale_data) { - } config_set_bool_field( - "replica-serve-stale-data",server.repl_serve_stale_data) { - } config_set_bool_field( - "slave-read-only",server.repl_slave_ro) { - } config_set_bool_field( - "replica-read-only",server.repl_slave_ro) { - } config_set_bool_field( - "slave-ignore-maxmemory",server.repl_slave_ignore_maxmemory) { - } config_set_bool_field( - "replica-ignore-maxmemory",server.repl_slave_ignore_maxmemory) { - } config_set_bool_field( - "activerehashing",server.activerehashing) { - } config_set_bool_field( "activedefrag",server.active_defrag_enabled) { #ifndef HAVE_DEFRAG if (server.active_defrag_enabled) { @@ -1152,27 +1079,6 @@ void configSetCommand(client *c) { return; } #endif - } config_set_bool_field( - "protected-mode",server.protected_mode) { - } config_set_bool_field( - "gopher-enabled",server.gopher_enabled) { - } config_set_bool_field( - "stop-writes-on-bgsave-error",server.stop_writes_on_bgsave_err) { - } config_set_bool_field( - "lazyfree-lazy-eviction",server.lazyfree_lazy_eviction) { - } config_set_bool_field( - "lazyfree-lazy-expire",server.lazyfree_lazy_expire) { - } config_set_bool_field( - "lazyfree-lazy-server-del",server.lazyfree_lazy_server_del) { - } config_set_bool_field( - "slave-lazy-flush",server.repl_slave_lazy_flush) { - } config_set_bool_field( - "replica-lazy-flush",server.repl_slave_lazy_flush) { - } config_set_bool_field( - "no-appendfsync-on-rewrite",server.aof_no_fsync_on_rewrite) { - } config_set_bool_field( - "dynamic-hz",server.dynamic_hz) { - /* Numerical fields. * config_set_numerical_field(name,var,min,max) */ } config_set_numerical_field( @@ -1244,6 +1150,8 @@ void configSetCommand(client *c) { } config_set_numerical_field( "replica-priority",server.slave_priority,0,INT_MAX) { } config_set_numerical_field( + "rdb-key-save-delay",server.rdb_key_save_delay,0,LLONG_MAX) { + } config_set_numerical_field( "slave-announce-port",server.slave_announce_port,0,65535) { } config_set_numerical_field( "replica-announce-port",server.slave_announce_port,0,65535) { @@ -1310,6 +1218,8 @@ void configSetCommand(client *c) { "maxmemory-policy",server.maxmemory_policy,maxmemory_policy_enum) { } config_set_enum_field( "appendfsync",server.aof_fsync,aof_fsync_enum) { + } config_set_enum_field( + "repl-diskless-load",server.repl_diskless_load,repl_diskless_load_enum) { /* Everyhing else is an error... */ } config_set_else { @@ -1457,63 +1367,19 @@ void configGetCommand(client *c) { config_get_numerical_field("cluster-slave-validity-factor",server.cluster_slave_validity_factor); config_get_numerical_field("cluster-replica-validity-factor",server.cluster_slave_validity_factor); config_get_numerical_field("repl-diskless-sync-delay",server.repl_diskless_sync_delay); + config_get_numerical_field("rdb-key-save-delay",server.rdb_key_save_delay); config_get_numerical_field("tcp-keepalive",server.tcpkeepalive); /* Bool (yes/no) values */ - config_get_bool_field("cluster-require-full-coverage", - server.cluster_require_full_coverage); - config_get_bool_field("cluster-slave-no-failover", - server.cluster_slave_no_failover); - config_get_bool_field("cluster-replica-no-failover", - server.cluster_slave_no_failover); - config_get_bool_field("no-appendfsync-on-rewrite", - server.aof_no_fsync_on_rewrite); - config_get_bool_field("slave-serve-stale-data", - server.repl_serve_stale_data); - config_get_bool_field("replica-serve-stale-data", - server.repl_serve_stale_data); - config_get_bool_field("slave-read-only", - server.repl_slave_ro); - config_get_bool_field("replica-read-only", - server.repl_slave_ro); - config_get_bool_field("slave-ignore-maxmemory", - server.repl_slave_ignore_maxmemory); - config_get_bool_field("replica-ignore-maxmemory", - server.repl_slave_ignore_maxmemory); - config_get_bool_field("stop-writes-on-bgsave-error", - server.stop_writes_on_bgsave_err); - config_get_bool_field("daemonize", server.daemonize); - config_get_bool_field("rdbcompression", server.rdb_compression); - config_get_bool_field("rdbchecksum", server.rdb_checksum); - config_get_bool_field("activerehashing", server.activerehashing); + /* Iterate the configs that are standard */ + for (configYesNo *config = configs_yesno; config->name != NULL; config++) { + config_get_bool_field(config->name, *(config->config)); + if (config->alias) { + config_get_bool_field(config->alias, *(config->config)); + } + } + config_get_bool_field("activedefrag", server.active_defrag_enabled); - config_get_bool_field("protected-mode", server.protected_mode); - config_get_bool_field("gopher-enabled", server.gopher_enabled); - config_get_bool_field("io-threads-do-reads", server.io_threads_do_reads); - config_get_bool_field("repl-disable-tcp-nodelay", - server.repl_disable_tcp_nodelay); - config_get_bool_field("repl-diskless-sync", - server.repl_diskless_sync); - config_get_bool_field("aof-rewrite-incremental-fsync", - server.aof_rewrite_incremental_fsync); - config_get_bool_field("rdb-save-incremental-fsync", - server.rdb_save_incremental_fsync); - config_get_bool_field("aof-load-truncated", - server.aof_load_truncated); - config_get_bool_field("aof-use-rdb-preamble", - server.aof_use_rdb_preamble); - config_get_bool_field("lazyfree-lazy-eviction", - server.lazyfree_lazy_eviction); - config_get_bool_field("lazyfree-lazy-expire", - server.lazyfree_lazy_expire); - config_get_bool_field("lazyfree-lazy-server-del", - server.lazyfree_lazy_server_del); - config_get_bool_field("slave-lazy-flush", - server.repl_slave_lazy_flush); - config_get_bool_field("replica-lazy-flush", - server.repl_slave_lazy_flush); - config_get_bool_field("dynamic-hz", - server.dynamic_hz); /* Enum values */ config_get_enum_field("maxmemory-policy", @@ -1526,12 +1392,14 @@ void configGetCommand(client *c) { server.aof_fsync,aof_fsync_enum); config_get_enum_field("syslog-facility", server.syslog_facility,syslog_facility_enum); + config_get_enum_field("repl-diskless-load", + server.repl_diskless_load,repl_diskless_load_enum); /* Everything we can't handle with macros follows. */ if (stringmatch(pattern,"appendonly",1)) { addReplyBulkCString(c,"appendonly"); - addReplyBulkCString(c,server.aof_state == AOF_OFF ? "no" : "yes"); + addReplyBulkCString(c,server.aof_enabled ? "yes" : "no"); matches++; } if (stringmatch(pattern,"dir",1)) { @@ -1858,7 +1726,7 @@ void rewriteConfigBytesOption(struct rewriteConfigState *state, char *option, lo } /* Rewrite a yes/no option. */ -void rewriteConfigYesNoOption(struct rewriteConfigState *state, char *option, int value, int defvalue) { +void rewriteConfigYesNoOption(struct rewriteConfigState *state, const char *option, int value, int defvalue) { int force = value != defvalue; sds line = sdscatprintf(sdsempty(),"%s %s",option, value ? "yes" : "no"); @@ -2228,7 +2096,11 @@ int rewriteConfig(char *path) { /* Step 2: rewrite every single option, replacing or appending it inside * the rewrite state. */ - rewriteConfigYesNoOption(state,"daemonize",server.daemonize,0); + /* Iterate the configs that are standard */ + for (configYesNo *config = configs_yesno; config->name != NULL; config++) { + rewriteConfigYesNoOption(state,config->name,*(config->config),config->default_value); + } + rewriteConfigStringOption(state,"pidfile",server.pidfile,CONFIG_DEFAULT_PID_FILE); rewriteConfigNumericalOption(state,"port",server.port,CONFIG_DEFAULT_SERVER_PORT); rewriteConfigNumericalOption(state,"cluster-announce-port",server.cluster_announce_port,CONFIG_DEFAULT_CLUSTER_ANNOUNCE_PORT); @@ -2250,9 +2122,6 @@ int rewriteConfig(char *path) { rewriteConfigUserOption(state); rewriteConfigNumericalOption(state,"databases",server.dbnum,CONFIG_DEFAULT_DBNUM); rewriteConfigNumericalOption(state,"io-threads",server.dbnum,CONFIG_DEFAULT_IO_THREADS_NUM); - rewriteConfigYesNoOption(state,"stop-writes-on-bgsave-error",server.stop_writes_on_bgsave_err,CONFIG_DEFAULT_STOP_WRITES_ON_BGSAVE_ERROR); - rewriteConfigYesNoOption(state,"rdbcompression",server.rdb_compression,CONFIG_DEFAULT_RDB_COMPRESSION); - rewriteConfigYesNoOption(state,"rdbchecksum",server.rdb_checksum,CONFIG_DEFAULT_RDB_CHECKSUM); rewriteConfigStringOption(state,"dbfilename",server.rdb_filename,CONFIG_DEFAULT_RDB_FILENAME); rewriteConfigDirOption(state); rewriteConfigSlaveofOption(state,"replicaof"); @@ -2260,15 +2129,11 @@ int rewriteConfig(char *path) { rewriteConfigStringOption(state,"masteruser",server.masteruser,NULL); rewriteConfigStringOption(state,"masterauth",server.masterauth,NULL); rewriteConfigStringOption(state,"cluster-announce-ip",server.cluster_announce_ip,NULL); - rewriteConfigYesNoOption(state,"replica-serve-stale-data",server.repl_serve_stale_data,CONFIG_DEFAULT_SLAVE_SERVE_STALE_DATA); - rewriteConfigYesNoOption(state,"replica-read-only",server.repl_slave_ro,CONFIG_DEFAULT_SLAVE_READ_ONLY); - rewriteConfigYesNoOption(state,"replica-ignore-maxmemory",server.repl_slave_ignore_maxmemory,CONFIG_DEFAULT_SLAVE_IGNORE_MAXMEMORY); rewriteConfigNumericalOption(state,"repl-ping-replica-period",server.repl_ping_slave_period,CONFIG_DEFAULT_REPL_PING_SLAVE_PERIOD); rewriteConfigNumericalOption(state,"repl-timeout",server.repl_timeout,CONFIG_DEFAULT_REPL_TIMEOUT); rewriteConfigBytesOption(state,"repl-backlog-size",server.repl_backlog_size,CONFIG_DEFAULT_REPL_BACKLOG_SIZE); rewriteConfigBytesOption(state,"repl-backlog-ttl",server.repl_backlog_time_limit,CONFIG_DEFAULT_REPL_BACKLOG_TIME_LIMIT); - rewriteConfigYesNoOption(state,"repl-disable-tcp-nodelay",server.repl_disable_tcp_nodelay,CONFIG_DEFAULT_REPL_DISABLE_TCP_NODELAY); - rewriteConfigYesNoOption(state,"repl-diskless-sync",server.repl_diskless_sync,CONFIG_DEFAULT_REPL_DISKLESS_SYNC); + rewriteConfigEnumOption(state,"repl-diskless-load",server.repl_diskless_load,repl_diskless_load_enum,CONFIG_DEFAULT_REPL_DISKLESS_LOAD); rewriteConfigNumericalOption(state,"repl-diskless-sync-delay",server.repl_diskless_sync_delay,CONFIG_DEFAULT_REPL_DISKLESS_SYNC_DELAY); rewriteConfigNumericalOption(state,"replica-priority",server.slave_priority,CONFIG_DEFAULT_SLAVE_PRIORITY); rewriteConfigNumericalOption(state,"min-replicas-to-write",server.repl_min_slaves_to_write,CONFIG_DEFAULT_MIN_SLAVES_TO_WRITE); @@ -2288,17 +2153,14 @@ int rewriteConfig(char *path) { rewriteConfigNumericalOption(state,"active-defrag-cycle-min",server.active_defrag_cycle_min,CONFIG_DEFAULT_DEFRAG_CYCLE_MIN); rewriteConfigNumericalOption(state,"active-defrag-cycle-max",server.active_defrag_cycle_max,CONFIG_DEFAULT_DEFRAG_CYCLE_MAX); rewriteConfigNumericalOption(state,"active-defrag-max-scan-fields",server.active_defrag_max_scan_fields,CONFIG_DEFAULT_DEFRAG_MAX_SCAN_FIELDS); - rewriteConfigYesNoOption(state,"appendonly",server.aof_state != AOF_OFF,0); + rewriteConfigYesNoOption(state,"appendonly",server.aof_enabled,0); rewriteConfigStringOption(state,"appendfilename",server.aof_filename,CONFIG_DEFAULT_AOF_FILENAME); rewriteConfigEnumOption(state,"appendfsync",server.aof_fsync,aof_fsync_enum,CONFIG_DEFAULT_AOF_FSYNC); - rewriteConfigYesNoOption(state,"no-appendfsync-on-rewrite",server.aof_no_fsync_on_rewrite,CONFIG_DEFAULT_AOF_NO_FSYNC_ON_REWRITE); rewriteConfigNumericalOption(state,"auto-aof-rewrite-percentage",server.aof_rewrite_perc,AOF_REWRITE_PERC); rewriteConfigBytesOption(state,"auto-aof-rewrite-min-size",server.aof_rewrite_min_size,AOF_REWRITE_MIN_SIZE); rewriteConfigNumericalOption(state,"lua-time-limit",server.lua_time_limit,LUA_SCRIPT_TIME_LIMIT); rewriteConfigYesNoOption(state,"cluster-enabled",server.cluster_enabled,0); rewriteConfigStringOption(state,"cluster-config-file",server.cluster_configfile,CONFIG_DEFAULT_CLUSTER_CONFIG_FILE); - rewriteConfigYesNoOption(state,"cluster-require-full-coverage",server.cluster_require_full_coverage,CLUSTER_DEFAULT_REQUIRE_FULL_COVERAGE); - rewriteConfigYesNoOption(state,"cluster-replica-no-failover",server.cluster_slave_no_failover,CLUSTER_DEFAULT_SLAVE_NO_FAILOVER); rewriteConfigNumericalOption(state,"cluster-node-timeout",server.cluster_node_timeout,CLUSTER_DEFAULT_NODE_TIMEOUT); rewriteConfigNumericalOption(state,"cluster-migration-barrier",server.cluster_migration_barrier,CLUSTER_DEFAULT_MIGRATION_BARRIER); rewriteConfigNumericalOption(state,"cluster-replica-validity-factor",server.cluster_slave_validity_factor,CLUSTER_DEFAULT_SLAVE_VALIDITY); @@ -2316,23 +2178,11 @@ int rewriteConfig(char *path) { rewriteConfigNumericalOption(state,"zset-max-ziplist-entries",server.zset_max_ziplist_entries,OBJ_ZSET_MAX_ZIPLIST_ENTRIES); rewriteConfigNumericalOption(state,"zset-max-ziplist-value",server.zset_max_ziplist_value,OBJ_ZSET_MAX_ZIPLIST_VALUE); rewriteConfigNumericalOption(state,"hll-sparse-max-bytes",server.hll_sparse_max_bytes,CONFIG_DEFAULT_HLL_SPARSE_MAX_BYTES); - rewriteConfigYesNoOption(state,"activerehashing",server.activerehashing,CONFIG_DEFAULT_ACTIVE_REHASHING); rewriteConfigYesNoOption(state,"activedefrag",server.active_defrag_enabled,CONFIG_DEFAULT_ACTIVE_DEFRAG); - rewriteConfigYesNoOption(state,"protected-mode",server.protected_mode,CONFIG_DEFAULT_PROTECTED_MODE); - rewriteConfigYesNoOption(state,"gopher-enabled",server.gopher_enabled,CONFIG_DEFAULT_GOPHER_ENABLED); - rewriteConfigYesNoOption(state,"io-threads-do-reads",server.io_threads_do_reads,CONFIG_DEFAULT_IO_THREADS_DO_READS); rewriteConfigClientoutputbufferlimitOption(state); rewriteConfigNumericalOption(state,"hz",server.config_hz,CONFIG_DEFAULT_HZ); - rewriteConfigYesNoOption(state,"aof-rewrite-incremental-fsync",server.aof_rewrite_incremental_fsync,CONFIG_DEFAULT_AOF_REWRITE_INCREMENTAL_FSYNC); - rewriteConfigYesNoOption(state,"rdb-save-incremental-fsync",server.rdb_save_incremental_fsync,CONFIG_DEFAULT_RDB_SAVE_INCREMENTAL_FSYNC); - rewriteConfigYesNoOption(state,"aof-load-truncated",server.aof_load_truncated,CONFIG_DEFAULT_AOF_LOAD_TRUNCATED); - rewriteConfigYesNoOption(state,"aof-use-rdb-preamble",server.aof_use_rdb_preamble,CONFIG_DEFAULT_AOF_USE_RDB_PREAMBLE); rewriteConfigEnumOption(state,"supervised",server.supervised_mode,supervised_mode_enum,SUPERVISED_NONE); - rewriteConfigYesNoOption(state,"lazyfree-lazy-eviction",server.lazyfree_lazy_eviction,CONFIG_DEFAULT_LAZYFREE_LAZY_EVICTION); - rewriteConfigYesNoOption(state,"lazyfree-lazy-expire",server.lazyfree_lazy_expire,CONFIG_DEFAULT_LAZYFREE_LAZY_EXPIRE); - rewriteConfigYesNoOption(state,"lazyfree-lazy-server-del",server.lazyfree_lazy_server_del,CONFIG_DEFAULT_LAZYFREE_LAZY_SERVER_DEL); - rewriteConfigYesNoOption(state,"replica-lazy-flush",server.repl_slave_lazy_flush,CONFIG_DEFAULT_SLAVE_LAZY_FLUSH); - rewriteConfigYesNoOption(state,"dynamic-hz",server.dynamic_hz,CONFIG_DEFAULT_DYNAMIC_HZ); + rewriteConfigNumericalOption(state,"rdb-key-save-delay",server.rdb_key_save_delay,CONFIG_DEFAULT_RDB_KEY_SAVE_DELAY); /* Rewrite Sentinel config if in Sentinel mode. */ if (server.sentinel_mode) rewriteConfigSentinelOption(state); @@ -344,7 +344,7 @@ robj *dbUnshareStringValue(redisDb *db, robj *key, robj *o) { * On success the fuction returns the number of keys removed from the * database(s). Otherwise -1 is returned in the specific case the * DB number is out of range, and errno is set to EINVAL. */ -long long emptyDb(int dbnum, int flags, void(callback)(void*)) { +long long emptyDbGeneric(redisDb *dbarray, int dbnum, int flags, void(callback)(void*)) { int async = (flags & EMPTYDB_ASYNC); long long removed = 0; @@ -362,12 +362,12 @@ long long emptyDb(int dbnum, int flags, void(callback)(void*)) { } for (int j = startdb; j <= enddb; j++) { - removed += dictSize(server.db[j].dict); + removed += dictSize(dbarray[j].dict); if (async) { - emptyDbAsync(&server.db[j]); + emptyDbAsync(&dbarray[j]); } else { - dictEmpty(server.db[j].dict,callback); - dictEmpty(server.db[j].expires,callback); + dictEmpty(dbarray[j].dict,callback); + dictEmpty(dbarray[j].expires,callback); } } if (server.cluster_enabled) { @@ -381,6 +381,10 @@ long long emptyDb(int dbnum, int flags, void(callback)(void*)) { return removed; } +long long emptyDb(int dbnum, int flags, void(callback)(void*)) { + return emptyDbGeneric(server.db, dbnum, flags, callback); +} + int selectDb(client *c, int id) { if (id < 0 || id >= server.dbnum) return C_ERR; @@ -388,6 +392,15 @@ int selectDb(client *c, int id) { return C_OK; } +long long dbTotalServerKeyCount() { + long long total = 0; + int j; + for (j = 0; j < server.dbnum; j++) { + total += dictSize(server.db[j].dict); + } + return total; +} + /*----------------------------------------------------------------------------- * Hooks for key space changes. * @@ -399,6 +412,7 @@ int selectDb(client *c, int id) { void signalModifiedKey(redisDb *db, robj *key) { touchWatchedKey(db,key); + if (server.tracking_clients) trackingInvalidateKey(key); } void signalFlushedDb(int dbid) { @@ -613,7 +627,7 @@ int parseScanCursorOrReply(client *c, robj *o, unsigned long *cursor) { } /* This command implements SCAN, HSCAN and SSCAN commands. - * If object 'o' is passed, then it must be a Hash or Set object, otherwise + * If object 'o' is passed, then it must be a Hash, Set or Zset object, otherwise * if 'o' is NULL the command will operate on the dictionary associated with * the current database. * @@ -629,6 +643,7 @@ void scanGenericCommand(client *c, robj *o, unsigned long cursor) { listNode *node, *nextnode; long count = 10; sds pat = NULL; + sds typename = NULL; int patlen = 0, use_pattern = 0; dict *ht; @@ -665,6 +680,10 @@ void scanGenericCommand(client *c, robj *o, unsigned long cursor) { use_pattern = !(pat[0] == '*' && patlen == 1); i += 2; + } else if (!strcasecmp(c->argv[i]->ptr, "type") && o == NULL && j >= 2) { + /* SCAN for a particular type only applies to the db dict */ + typename = c->argv[i+1]->ptr; + i+= 2; } else { addReply(c,shared.syntaxerr); goto cleanup; @@ -759,6 +778,13 @@ void scanGenericCommand(client *c, robj *o, unsigned long cursor) { } } + /* Filter an element if it isn't the type we want. */ + if (!filter && o == NULL && typename){ + robj* typecheck = lookupKeyReadWithFlags(c->db, kobj, LOOKUP_NOTOUCH); + char* type = getObjectTypeName(typecheck); + if (strcasecmp((char*) typename, type)) filter = 1; + } + /* Filter element if it is an expired key. */ if (!filter && o == NULL && expireIfNeeded(c->db, kobj)) filter = 1; @@ -815,11 +841,8 @@ void lastsaveCommand(client *c) { addReplyLongLong(c,server.lastsave); } -void typeCommand(client *c) { - robj *o; - char *type; - - o = lookupKeyReadWithFlags(c->db,c->argv[1],LOOKUP_NOTOUCH); +char* getObjectTypeName(robj *o) { + char* type; if (o == NULL) { type = "none"; } else { @@ -837,7 +860,13 @@ void typeCommand(client *c) { default: type = "unknown"; break; } } - addReplyStatus(c,type); + return type; +} + +void typeCommand(client *c) { + robj *o; + o = lookupKeyReadWithFlags(c->db,c->argv[1],LOOKUP_NOTOUCH); + addReplyStatus(c, getObjectTypeName(o)); } void shutdownCommand(client *c) { @@ -999,7 +1028,7 @@ void scanDatabaseForReadyLists(redisDb *db) { * * Returns C_ERR if at least one of the DB ids are out of range, otherwise * C_OK is returned. */ -int dbSwapDatabases(int id1, int id2) { +int dbSwapDatabases(long id1, long id2) { if (id1 < 0 || id1 >= server.dbnum || id2 < 0 || id2 >= server.dbnum) return C_ERR; if (id1 == id2) return C_OK; diff --git a/src/debug.c b/src/debug.c index 0c6b5630c..1f1157d4a 100644 --- a/src/debug.c +++ b/src/debug.c @@ -702,7 +702,7 @@ void _serverAssertPrintClientInfo(const client *c) { bugReportStart(); serverLog(LL_WARNING,"=== ASSERTION FAILED CLIENT CONTEXT ==="); - serverLog(LL_WARNING,"client->flags = %d", c->flags); + serverLog(LL_WARNING,"client->flags = %llu", (unsigned long long)c->flags); serverLog(LL_WARNING,"client->fd = %d", c->fd); serverLog(LL_WARNING,"client->argc = %d", c->argc); for (j=0; j < c->argc; j++) { diff --git a/src/expire.c b/src/expire.c index 0b92ee3fe..b23117a3c 100644 --- a/src/expire.c +++ b/src/expire.c @@ -64,6 +64,7 @@ int activeExpireCycleTryExpire(redisDb *db, dictEntry *de, long long now) { dbSyncDelete(db,keyobj); notifyKeyspaceEvent(NOTIFY_EXPIRED, "expired",keyobj,db->id); + if (server.tracking_clients) trackingInvalidateKey(keyobj); decrRefCount(keyobj); server.stat_expiredkeys++; return 1; diff --git a/src/module.c b/src/module.c index 7dee7e776..f4f753c00 100644 --- a/src/module.c +++ b/src/module.c @@ -1242,6 +1242,17 @@ int RM_ReplyWithStringBuffer(RedisModuleCtx *ctx, const char *buf, size_t len) { return REDISMODULE_OK; } +/* Reply with a bulk string, taking in input a C buffer pointer that is + * assumed to be null-terminated. + * + * The function always returns REDISMODULE_OK. */ +int RM_ReplyWithCString(RedisModuleCtx *ctx, const char *buf) { + client *c = moduleGetReplyClient(ctx); + if (c == NULL) return REDISMODULE_OK; + addReplyBulkCString(c,(char*)buf); + return REDISMODULE_OK; +} + /* Reply with a bulk string, taking in input a RedisModuleString object. * * The function always returns REDISMODULE_OK. */ @@ -1455,6 +1466,9 @@ int RM_GetContextFlags(RedisModuleCtx *ctx) { if (server.cluster_enabled) flags |= REDISMODULE_CTX_FLAGS_CLUSTER; + if (server.loading) + flags |= REDISMODULE_CTX_FLAGS_LOADING; + /* Maxmemory and eviction policy */ if (server.maxmemory > 0) { flags |= REDISMODULE_CTX_FLAGS_MAXMEMORY; diff --git a/src/networking.c b/src/networking.c index 4bc22120a..7976caf29 100644 --- a/src/networking.c +++ b/src/networking.c @@ -158,6 +158,7 @@ client *createClient(int fd) { c->pubsub_patterns = listCreate(); c->peerid = NULL; c->client_list_node = NULL; + c->client_tracking_redirection = 0; listSetFreeMethod(c->pubsub_patterns,decrRefCountVoid); listSetMatchMethod(c->pubsub_patterns,listMatchObjects); if (fd != -1) linkClient(c); @@ -506,7 +507,7 @@ void addReplyDouble(client *c, double d) { if (c->resp == 2) { addReplyBulkCString(c, d > 0 ? "inf" : "-inf"); } else { - addReplyProto(c, d > 0 ? ",inf\r\n" : "-inf\r\n", + addReplyProto(c, d > 0 ? ",inf\r\n" : ",-inf\r\n", d > 0 ? 6 : 7); } } else { @@ -966,6 +967,9 @@ void unlinkClient(client *c) { listDelNode(server.unblocked_clients,ln); c->flags &= ~CLIENT_UNBLOCKED; } + + /* Clear the tracking status. */ + if (c->flags & CLIENT_TRACKING) disableTracking(c); } void freeClient(client *c) { @@ -1849,6 +1853,8 @@ sds catClientInfoString(sds s, client *client) { if (client->flags & CLIENT_PUBSUB) *p++ = 'P'; if (client->flags & CLIENT_MULTI) *p++ = 'x'; if (client->flags & CLIENT_BLOCKED) *p++ = 'b'; + if (client->flags & CLIENT_TRACKING) *p++ = 't'; + if (client->flags & CLIENT_TRACKING_BROKEN_REDIR) *p++ = 'R'; if (client->flags & CLIENT_DIRTY_CAS) *p++ = 'd'; if (client->flags & CLIENT_CLOSE_AFTER_REPLY) *p++ = 'c'; if (client->flags & CLIENT_UNBLOCKED) *p++ = 'u'; @@ -1948,19 +1954,21 @@ void clientCommand(client *c) { if (c->argc == 2 && !strcasecmp(c->argv[1]->ptr,"help")) { const char *help[] = { -"id -- Return the ID of the current connection.", -"getname -- Return the name of the current connection.", -"kill <ip:port> -- Kill connection made from <ip:port>.", -"kill <option> <value> [option value ...] -- Kill connections. Options are:", -" addr <ip:port> -- Kill connection made from <ip:port>", -" type (normal|master|replica|pubsub) -- Kill connections by type.", -" skipme (yes|no) -- Skip killing current connection (default: yes).", -"list [options ...] -- Return information about client connections. Options:", -" type (normal|master|replica|pubsub) -- Return clients of specified type.", -"pause <timeout> -- Suspend all Redis clients for <timout> milliseconds.", -"reply (on|off|skip) -- Control the replies sent to the current connection.", -"setname <name> -- Assign the name <name> to the current connection.", -"unblock <clientid> [TIMEOUT|ERROR] -- Unblock the specified blocked client.", +"ID -- Return the ID of the current connection.", +"GETNAME -- Return the name of the current connection.", +"KILL <ip:port> -- Kill connection made from <ip:port>.", +"KILL <option> <value> [option value ...] -- Kill connections. Options are:", +" ADDR <ip:port> -- Kill connection made from <ip:port>", +" TYPE (normal|master|replica|pubsub) -- Kill connections by type.", +" SKIPME (yes|no) -- Skip killing current connection (default: yes).", +"LIST [options ...] -- Return information about client connections. Options:", +" TYPE (normal|master|replica|pubsub) -- Return clients of specified type.", +"PAUSE <timeout> -- Suspend all Redis clients for <timout> milliseconds.", +"REPLY (on|off|skip) -- Control the replies sent to the current connection.", +"SETNAME <name> -- Assign the name <name> to the current connection.", +"UNBLOCK <clientid> [TIMEOUT|ERROR] -- Unblock the specified blocked client.", +"TRACKING (on|off) [REDIRECT <id>] -- Enable client keys tracking for client side caching.", +"GETREDIR -- Return the client ID we are redirecting to when tracking is enabled.", NULL }; addReplyHelp(c, help); @@ -2117,20 +2125,63 @@ NULL addReply(c,shared.czero); } } else if (!strcasecmp(c->argv[1]->ptr,"setname") && c->argc == 3) { + /* CLIENT SETNAME */ if (clientSetNameOrReply(c,c->argv[2]) == C_OK) addReply(c,shared.ok); } else if (!strcasecmp(c->argv[1]->ptr,"getname") && c->argc == 2) { + /* CLIENT GETNAME */ if (c->name) addReplyBulk(c,c->name); else addReplyNull(c); } else if (!strcasecmp(c->argv[1]->ptr,"pause") && c->argc == 3) { + /* CLIENT PAUSE */ long long duration; - if (getTimeoutFromObjectOrReply(c,c->argv[2],&duration,UNIT_MILLISECONDS) - != C_OK) return; + if (getTimeoutFromObjectOrReply(c,c->argv[2],&duration, + UNIT_MILLISECONDS) != C_OK) return; pauseClients(duration); addReply(c,shared.ok); + } else if (!strcasecmp(c->argv[1]->ptr,"tracking") && + (c->argc == 3 || c->argc == 5)) + { + /* CLIENT TRACKING (on|off) [REDIRECT <id>] */ + long long redir = 0; + + /* Parse the redirection option: we'll require the client with + * the specified ID to exist right now, even if it is possible + * it will get disconnected later. */ + if (c->argc == 5) { + if (strcasecmp(c->argv[3]->ptr,"redirect") != 0) { + addReply(c,shared.syntaxerr); + return; + } else { + if (getLongLongFromObjectOrReply(c,c->argv[4],&redir,NULL) != + C_OK) return; + if (lookupClientByID(redir) == NULL) { + addReplyError(c,"The client ID you want redirect to " + "does not exist"); + return; + } + } + } + + if (!strcasecmp(c->argv[2]->ptr,"on")) { + enableTracking(c,redir); + } else if (!strcasecmp(c->argv[2]->ptr,"off")) { + disableTracking(c); + } else { + addReply(c,shared.syntaxerr); + return; + } + addReply(c,shared.ok); + } else if (!strcasecmp(c->argv[1]->ptr,"getredir") && c->argc == 2) { + /* CLIENT GETREDIR */ + if (c->flags & CLIENT_TRACKING) { + addReplyLongLong(c,c->client_tracking_redirection); + } else { + addReplyLongLong(c,-1); + } } else { addReplyErrorFormat(c, "Unknown subcommand or wrong number of arguments for '%s'. Try CLIENT HELP", (char*)c->argv[1]->ptr); } diff --git a/src/object.c b/src/object.c index 234e11f8a..10209a6c8 100644 --- a/src/object.c +++ b/src/object.c @@ -834,7 +834,9 @@ size_t objectComputeSize(robj *o, size_t sample_size) { d = ((zset*)o->ptr)->dict; zskiplist *zsl = ((zset*)o->ptr)->zsl; zskiplistNode *znode = zsl->header->level[0].forward; - asize = sizeof(*o)+sizeof(zset)+(sizeof(struct dictEntry*)*dictSlots(d)); + asize = sizeof(*o)+sizeof(zset)+sizeof(zskiplist)+sizeof(dict)+ + (sizeof(struct dictEntry*)*dictSlots(d))+ + zmalloc_size(zsl->header); while(znode != NULL && samples < sample_size) { elesize += sdsAllocSize(znode->ele); elesize += sizeof(struct dictEntry) + zmalloc_size(znode); @@ -44,6 +44,7 @@ #define rdbExitReportCorruptRDB(...) rdbCheckThenExit(__LINE__,__VA_ARGS__) +char* rdbFileBeingLoaded = NULL; /* used for rdb checking on read error */ extern int rdbCheckMode; void rdbCheckError(const char *fmt, ...); void rdbCheckSetError(const char *fmt, ...); @@ -61,11 +62,17 @@ void rdbCheckThenExit(int linenum, char *reason, ...) { if (!rdbCheckMode) { serverLog(LL_WARNING, "%s", msg); - char *argv[2] = {"",server.rdb_filename}; - redis_check_rdb_main(2,argv,NULL); + if (rdbFileBeingLoaded) { + char *argv[2] = {"",rdbFileBeingLoaded}; + redis_check_rdb_main(2,argv,NULL); + } else { + serverLog(LL_WARNING, "Failure loading rdb format from socket, assuming connection error, resuming operation."); + return; + } } else { rdbCheckError("%s",msg); } + serverLog(LL_WARNING, "Terminating server after rdb file reading failure."); exit(1); } @@ -1039,6 +1046,11 @@ int rdbSaveKeyValuePair(rio *rdb, robj *key, robj *val, long long expiretime) { if (rdbSaveObjectType(rdb,val) == -1) return -1; if (rdbSaveStringObject(rdb,key) == -1) return -1; if (rdbSaveObject(rdb,val,key) == -1) return -1; + + /* Delay return if required (for testing) */ + if (server.rdb_key_save_delay) + usleep(server.rdb_key_save_delay); + return 1; } @@ -1800,18 +1812,23 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, robj *key) { /* Mark that we are loading in the global state and setup the fields * needed to provide loading stats. */ -void startLoading(FILE *fp) { - struct stat sb; - +void startLoading(size_t size) { /* Load the DB */ server.loading = 1; server.loading_start_time = time(NULL); server.loading_loaded_bytes = 0; - if (fstat(fileno(fp), &sb) == -1) { - server.loading_total_bytes = 0; - } else { - server.loading_total_bytes = sb.st_size; - } + server.loading_total_bytes = size; +} + +/* Mark that we are loading in the global state and setup the fields + * needed to provide loading stats. + * 'filename' is optional and used for rdb-check on error */ +void startLoadingFile(FILE *fp, char* filename) { + struct stat sb; + if (fstat(fileno(fp), &sb) == -1) + sb.st_size = 0; + rdbFileBeingLoaded = filename; + startLoading(sb.st_size); } /* Refresh the loading progress info */ @@ -1824,6 +1841,7 @@ void loadingProgress(off_t pos) { /* Loading finished */ void stopLoading(void) { server.loading = 0; + rdbFileBeingLoaded = NULL; } /* Track loading progress in order to serve client's from time to time @@ -2089,7 +2107,7 @@ int rdbLoad(char *filename, rdbSaveInfo *rsi) { int retval; if ((fp = fopen(filename,"r")) == NULL) return C_ERR; - startLoading(fp); + startLoadingFile(fp, filename); rioInitWithFile(&rdb,fp); retval = rdbLoadRio(&rdb,rsi,0); fclose(fp); diff --git a/src/redis-check-rdb.c b/src/redis-check-rdb.c index ec00ee71c..e2d71b5a5 100644 --- a/src/redis-check-rdb.c +++ b/src/redis-check-rdb.c @@ -202,7 +202,7 @@ int redis_check_rdb(char *rdbfilename, FILE *fp) { } expiretime = -1; - startLoading(fp); + startLoadingFile(fp, rdbfilename); while(1) { robj *key, *val; @@ -314,6 +314,7 @@ int redis_check_rdb(char *rdbfilename, FILE *fp) { } if (closefile) fclose(fp); + stopLoading(); return 0; eoferr: /* unexpected end of file is handled here with a fatal exit */ @@ -324,6 +325,7 @@ eoferr: /* unexpected end of file is handled here with a fatal exit */ } err: if (closefile) fclose(fp); + stopLoading(); return 1; } diff --git a/src/redismodule.h b/src/redismodule.h index 259a5f1db..b9c73957b 100644 --- a/src/redismodule.h +++ b/src/redismodule.h @@ -87,6 +87,8 @@ #define REDISMODULE_CTX_FLAGS_OOM_WARNING (1<<11) /* The command was sent over the replication link. */ #define REDISMODULE_CTX_FLAGS_REPLICATED (1<<12) +/* Redis is currently loading either from AOF or RDB. */ +#define REDISMODULE_CTX_FLAGS_LOADING (1<<13) #define REDISMODULE_NOTIFY_GENERIC (1<<2) /* g */ @@ -226,6 +228,7 @@ int REDISMODULE_API_FUNC(RedisModule_ReplyWithSimpleString)(RedisModuleCtx *ctx, int REDISMODULE_API_FUNC(RedisModule_ReplyWithArray)(RedisModuleCtx *ctx, long len); void REDISMODULE_API_FUNC(RedisModule_ReplySetArrayLength)(RedisModuleCtx *ctx, long len); int REDISMODULE_API_FUNC(RedisModule_ReplyWithStringBuffer)(RedisModuleCtx *ctx, const char *buf, size_t len); +int REDISMODULE_API_FUNC(RedisModule_ReplyWithCString)(RedisModuleCtx *ctx, const char *buf); int REDISMODULE_API_FUNC(RedisModule_ReplyWithString)(RedisModuleCtx *ctx, RedisModuleString *str); int REDISMODULE_API_FUNC(RedisModule_ReplyWithNull)(RedisModuleCtx *ctx); int REDISMODULE_API_FUNC(RedisModule_ReplyWithDouble)(RedisModuleCtx *ctx, double d); @@ -376,6 +379,7 @@ static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int REDISMODULE_GET_API(ReplyWithArray); REDISMODULE_GET_API(ReplySetArrayLength); REDISMODULE_GET_API(ReplyWithStringBuffer); + REDISMODULE_GET_API(ReplyWithCString); REDISMODULE_GET_API(ReplyWithString); REDISMODULE_GET_API(ReplyWithNull); REDISMODULE_GET_API(ReplyWithCallReply); diff --git a/src/replication.c b/src/replication.c index 63a67a06a..26e7cf8f0 100644 --- a/src/replication.c +++ b/src/replication.c @@ -1113,11 +1113,65 @@ void restartAOFAfterSYNC() { } } +static int useDisklessLoad() { + /* compute boolean decision to use diskless load */ + return server.repl_diskless_load == REPL_DISKLESS_LOAD_SWAPDB || + (server.repl_diskless_load == REPL_DISKLESS_LOAD_WHEN_DB_EMPTY && dbTotalServerKeyCount()==0); +} + +/* Helper function for readSyncBulkPayload() to make backups of the current + * DBs before socket-loading the new ones. The backups may be restored later + * or freed by disklessLoadRestoreBackups(). */ +redisDb *disklessLoadMakeBackups(void) { + redisDb *backups = zmalloc(sizeof(redisDb)*server.dbnum); + for (int i=0; i<server.dbnum; i++) { + backups[i] = server.db[i]; + server.db[i].dict = dictCreate(&dbDictType,NULL); + server.db[i].expires = dictCreate(&keyptrDictType,NULL); + } + return backups; +} + +/* Helper function for readSyncBulkPayload(): when replica-side diskless + * database loading is used, Redis makes a backup of the existing databases + * before loading the new ones from the socket. + * + * If the socket loading went wrong, we want to restore the old backups + * into the server databases. This function does just that in the case + * the 'restore' argument (the number of DBs to replace) is non-zero. + * + * When instead the loading succeeded we want just to free our old backups, + * in that case the funciton will do just that when 'restore' is 0. */ +void disklessLoadRestoreBackups(redisDb *backup, int restore, int empty_db_flags) +{ + if (restore) { + /* Restore. */ + emptyDbGeneric(server.db,-1,empty_db_flags,replicationEmptyDbCallback); + for (int i=0; i<server.dbnum; i++) { + dictRelease(server.db[i].dict); + dictRelease(server.db[i].expires); + server.db[i] = backup[i]; + } + } else { + /* Delete. */ + emptyDbGeneric(backup,-1,empty_db_flags,replicationEmptyDbCallback); + for (int i=0; i<server.dbnum; i++) { + dictRelease(backup[i].dict); + dictRelease(backup[i].expires); + } + } + zfree(backup); +} + /* Asynchronously read the SYNC payload we receive from a master */ #define REPL_MAX_WRITTEN_BEFORE_FSYNC (1024*1024*8) /* 8 MB */ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) { char buf[4096]; ssize_t nread, readlen, nwritten; + int use_diskless_load; + redisDb *diskless_load_backup = NULL; + int empty_db_flags = server.repl_slave_lazy_flush ? EMPTYDB_ASYNC : + EMPTYDB_NO_FLAGS; off_t left; UNUSED(el); UNUSED(privdata); @@ -1173,90 +1227,202 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) { * at the next call. */ server.repl_transfer_size = 0; serverLog(LL_NOTICE, - "MASTER <-> REPLICA sync: receiving streamed RDB from master"); + "MASTER <-> REPLICA sync: receiving streamed RDB from master with EOF %s", + useDisklessLoad()? "to parser":"to disk"); } else { usemark = 0; server.repl_transfer_size = strtol(buf+1,NULL,10); serverLog(LL_NOTICE, - "MASTER <-> REPLICA sync: receiving %lld bytes from master", - (long long) server.repl_transfer_size); + "MASTER <-> REPLICA sync: receiving %lld bytes from master %s", + (long long) server.repl_transfer_size, + useDisklessLoad()? "to parser":"to disk"); } return; } - /* Read bulk data */ - if (usemark) { - readlen = sizeof(buf); - } else { - left = server.repl_transfer_size - server.repl_transfer_read; - readlen = (left < (signed)sizeof(buf)) ? left : (signed)sizeof(buf); - } - - nread = read(fd,buf,readlen); - if (nread <= 0) { - serverLog(LL_WARNING,"I/O error trying to sync with MASTER: %s", - (nread == -1) ? strerror(errno) : "connection lost"); - cancelReplicationHandshake(); - return; - } - server.stat_net_input_bytes += nread; + use_diskless_load = useDisklessLoad(); + if (!use_diskless_load) { + /* Read the data from the socket, store it to a file and search + * for the EOF. */ + if (usemark) { + readlen = sizeof(buf); + } else { + left = server.repl_transfer_size - server.repl_transfer_read; + readlen = (left < (signed)sizeof(buf)) ? left : (signed)sizeof(buf); + } - /* When a mark is used, we want to detect EOF asap in order to avoid - * writing the EOF mark into the file... */ - int eof_reached = 0; + nread = read(fd,buf,readlen); + if (nread <= 0) { + serverLog(LL_WARNING,"I/O error trying to sync with MASTER: %s", + (nread == -1) ? strerror(errno) : "connection lost"); + cancelReplicationHandshake(); + return; + } + server.stat_net_input_bytes += nread; + + /* When a mark is used, we want to detect EOF asap in order to avoid + * writing the EOF mark into the file... */ + int eof_reached = 0; + + if (usemark) { + /* Update the last bytes array, and check if it matches our + * delimiter. */ + if (nread >= CONFIG_RUN_ID_SIZE) { + memcpy(lastbytes,buf+nread-CONFIG_RUN_ID_SIZE, + CONFIG_RUN_ID_SIZE); + } else { + int rem = CONFIG_RUN_ID_SIZE-nread; + memmove(lastbytes,lastbytes+nread,rem); + memcpy(lastbytes+rem,buf,nread); + } + if (memcmp(lastbytes,eofmark,CONFIG_RUN_ID_SIZE) == 0) + eof_reached = 1; + } - if (usemark) { - /* Update the last bytes array, and check if it matches our delimiter.*/ - if (nread >= CONFIG_RUN_ID_SIZE) { - memcpy(lastbytes,buf+nread-CONFIG_RUN_ID_SIZE,CONFIG_RUN_ID_SIZE); - } else { - int rem = CONFIG_RUN_ID_SIZE-nread; - memmove(lastbytes,lastbytes+nread,rem); - memcpy(lastbytes+rem,buf,nread); + /* Update the last I/O time for the replication transfer (used in + * order to detect timeouts during replication), and write what we + * got from the socket to the dump file on disk. */ + server.repl_transfer_lastio = server.unixtime; + if ((nwritten = write(server.repl_transfer_fd,buf,nread)) != nread) { + serverLog(LL_WARNING, + "Write error or short write writing to the DB dump file " + "needed for MASTER <-> REPLICA synchronization: %s", + (nwritten == -1) ? strerror(errno) : "short write"); + goto error; } - if (memcmp(lastbytes,eofmark,CONFIG_RUN_ID_SIZE) == 0) eof_reached = 1; - } + server.repl_transfer_read += nread; - server.repl_transfer_lastio = server.unixtime; - if ((nwritten = write(server.repl_transfer_fd,buf,nread)) != nread) { - serverLog(LL_WARNING,"Write error or short write writing to the DB dump file needed for MASTER <-> REPLICA synchronization: %s", - (nwritten == -1) ? strerror(errno) : "short write"); - goto error; - } - server.repl_transfer_read += nread; + /* Delete the last 40 bytes from the file if we reached EOF. */ + if (usemark && eof_reached) { + if (ftruncate(server.repl_transfer_fd, + server.repl_transfer_read - CONFIG_RUN_ID_SIZE) == -1) + { + serverLog(LL_WARNING, + "Error truncating the RDB file received from the master " + "for SYNC: %s", strerror(errno)); + goto error; + } + } - /* Delete the last 40 bytes from the file if we reached EOF. */ - if (usemark && eof_reached) { - if (ftruncate(server.repl_transfer_fd, - server.repl_transfer_read - CONFIG_RUN_ID_SIZE) == -1) + /* Sync data on disk from time to time, otherwise at the end of the + * transfer we may suffer a big delay as the memory buffers are copied + * into the actual disk. */ + if (server.repl_transfer_read >= + server.repl_transfer_last_fsync_off + REPL_MAX_WRITTEN_BEFORE_FSYNC) { - serverLog(LL_WARNING,"Error truncating the RDB file received from the master for SYNC: %s", strerror(errno)); - goto error; + off_t sync_size = server.repl_transfer_read - + server.repl_transfer_last_fsync_off; + rdb_fsync_range(server.repl_transfer_fd, + server.repl_transfer_last_fsync_off, sync_size); + server.repl_transfer_last_fsync_off += sync_size; } + + /* Check if the transfer is now complete */ + if (!usemark) { + if (server.repl_transfer_read == server.repl_transfer_size) + eof_reached = 1; + } + + /* If the transfer is yet not complete, we need to read more, so + * return ASAP and wait for the handler to be called again. */ + if (!eof_reached) return; } - /* Sync data on disk from time to time, otherwise at the end of the transfer - * we may suffer a big delay as the memory buffers are copied into the - * actual disk. */ - if (server.repl_transfer_read >= - server.repl_transfer_last_fsync_off + REPL_MAX_WRITTEN_BEFORE_FSYNC) + /* We reach this point in one of the following cases: + * + * 1. The replica is using diskless replication, that is, it reads data + * directly from the socket to the Redis memory, without using + * a temporary RDB file on disk. In that case we just block and + * read everything from the socket. + * + * 2. Or when we are done reading from the socket to the RDB file, in + * such case we want just to read the RDB file in memory. */ + serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Flushing old data"); + + /* We need to stop any AOF rewriting child before flusing and parsing + * the RDB, otherwise we'll create a copy-on-write disaster. */ + if (server.aof_state != AOF_OFF) stopAppendOnly(); + signalFlushedDb(-1); + + /* When diskless RDB loading is used by replicas, it may be configured + * in order to save the current DB instead of throwing it away, + * so that we can restore it in case of failed transfer. */ + if (use_diskless_load && + server.repl_diskless_load == REPL_DISKLESS_LOAD_SWAPDB) { - off_t sync_size = server.repl_transfer_read - - server.repl_transfer_last_fsync_off; - rdb_fsync_range(server.repl_transfer_fd, - server.repl_transfer_last_fsync_off, sync_size); - server.repl_transfer_last_fsync_off += sync_size; - } + diskless_load_backup = disklessLoadMakeBackups(); + } else { + emptyDb(-1,empty_db_flags,replicationEmptyDbCallback); + } + + /* Before loading the DB into memory we need to delete the readable + * handler, otherwise it will get called recursively since + * rdbLoad() will call the event loop to process events from time to + * time for non blocking loading. */ + aeDeleteFileEvent(server.el,server.repl_transfer_s,AE_READABLE); + serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Loading DB in memory"); + rdbSaveInfo rsi = RDB_SAVE_INFO_INIT; + if (use_diskless_load) { + rio rdb; + rioInitWithFd(&rdb,fd,server.repl_transfer_size); + + /* Put the socket in blocking mode to simplify RDB transfer. + * We'll restore it when the RDB is received. */ + anetBlock(NULL,fd); + anetRecvTimeout(NULL,fd,server.repl_timeout*1000); + startLoading(server.repl_transfer_size); + + if (rdbLoadRio(&rdb,&rsi,0) != C_OK) { + /* RDB loading failed. */ + stopLoading(); + serverLog(LL_WARNING, + "Failed trying to load the MASTER synchronization DB " + "from socket"); + cancelReplicationHandshake(); + rioFreeFd(&rdb, NULL); + if (server.repl_diskless_load == REPL_DISKLESS_LOAD_SWAPDB) { + /* Restore the backed up databases. */ + disklessLoadRestoreBackups(diskless_load_backup,1, + empty_db_flags); + } else { + /* Remove the half-loaded data in case we started with + * an empty replica. */ + emptyDb(-1,empty_db_flags,replicationEmptyDbCallback); + } - /* Check if the transfer is now complete */ - if (!usemark) { - if (server.repl_transfer_read == server.repl_transfer_size) - eof_reached = 1; - } + /* Note that there's no point in restarting the AOF on SYNC + * failure, it'll be restarted when sync succeeds or the replica + * gets promoted. */ + return; + } + stopLoading(); + + /* RDB loading succeeded if we reach this point. */ + if (server.repl_diskless_load == REPL_DISKLESS_LOAD_SWAPDB) { + /* Delete the backup databases we created before starting to load + * the new RDB. Now the RDB was loaded with success so the old + * data is useless. */ + disklessLoadRestoreBackups(diskless_load_backup,0,empty_db_flags); + } - if (eof_reached) { - int aof_is_enabled = server.aof_state != AOF_OFF; + /* Verify the end mark is correct. */ + if (usemark) { + if (!rioRead(&rdb,buf,CONFIG_RUN_ID_SIZE) || + memcmp(buf,eofmark,CONFIG_RUN_ID_SIZE) != 0) + { + serverLog(LL_WARNING,"Replication stream EOF marker is broken"); + cancelReplicationHandshake(); + rioFreeFd(&rdb, NULL); + return; + } + } + /* Cleanup and restore the socket to the original state to continue + * with the normal replication. */ + rioFreeFd(&rdb, NULL); + anetNonBlock(NULL,fd); + anetRecvTimeout(NULL,fd,0); + } else { /* Ensure background save doesn't overwrite synced data */ if (server.rdb_child_pid != -1) { serverLog(LL_NOTICE, @@ -1269,59 +1435,53 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) { } if (rename(server.repl_transfer_tmpfile,server.rdb_filename) == -1) { - serverLog(LL_WARNING,"Failed trying to rename the temp DB into %s in MASTER <-> REPLICA synchronization: %s", - server.rdb_filename, strerror(errno)); + serverLog(LL_WARNING, + "Failed trying to rename the temp DB into %s in " + "MASTER <-> REPLICA synchronization: %s", + server.rdb_filename, strerror(errno)); cancelReplicationHandshake(); return; } - serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Flushing old data"); - /* We need to stop any AOFRW fork before flusing and parsing - * RDB, otherwise we'll create a copy-on-write disaster. */ - if(aof_is_enabled) stopAppendOnly(); - signalFlushedDb(-1); - emptyDb( - -1, - server.repl_slave_lazy_flush ? EMPTYDB_ASYNC : EMPTYDB_NO_FLAGS, - replicationEmptyDbCallback); - /* Before loading the DB into memory we need to delete the readable - * handler, otherwise it will get called recursively since - * rdbLoad() will call the event loop to process events from time to - * time for non blocking loading. */ - aeDeleteFileEvent(server.el,server.repl_transfer_s,AE_READABLE); - serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Loading DB in memory"); - rdbSaveInfo rsi = RDB_SAVE_INFO_INIT; if (rdbLoad(server.rdb_filename,&rsi) != C_OK) { - serverLog(LL_WARNING,"Failed trying to load the MASTER synchronization DB from disk"); + serverLog(LL_WARNING, + "Failed trying to load the MASTER synchronization " + "DB from disk"); cancelReplicationHandshake(); - /* Re-enable the AOF if we disabled it earlier, in order to restore - * the original configuration. */ - if (aof_is_enabled) restartAOFAfterSYNC(); + /* Note that there's no point in restarting the AOF on sync failure, + it'll be restarted when sync succeeds or replica promoted. */ return; } - /* Final setup of the connected slave <- master link */ + + /* Cleanup. */ zfree(server.repl_transfer_tmpfile); close(server.repl_transfer_fd); - replicationCreateMasterClient(server.repl_transfer_s,rsi.repl_stream_db); - server.repl_state = REPL_STATE_CONNECTED; - server.repl_down_since = 0; - /* After a full resynchroniziation we use the replication ID and - * offset of the master. The secondary ID / offset are cleared since - * we are starting a new history. */ - memcpy(server.replid,server.master->replid,sizeof(server.replid)); - server.master_repl_offset = server.master->reploff; - clearReplicationId2(); - /* Let's create the replication backlog if needed. Slaves need to - * accumulate the backlog regardless of the fact they have sub-slaves - * or not, in order to behave correctly if they are promoted to - * masters after a failover. */ - if (server.repl_backlog == NULL) createReplicationBacklog(); - - serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Finished with success"); - /* Restart the AOF subsystem now that we finished the sync. This - * will trigger an AOF rewrite, and when done will start appending - * to the new file. */ - if (aof_is_enabled) restartAOFAfterSYNC(); + server.repl_transfer_fd = -1; + server.repl_transfer_tmpfile = NULL; } + + /* Final setup of the connected slave <- master link */ + replicationCreateMasterClient(server.repl_transfer_s,rsi.repl_stream_db); + server.repl_state = REPL_STATE_CONNECTED; + server.repl_down_since = 0; + + /* After a full resynchroniziation we use the replication ID and + * offset of the master. The secondary ID / offset are cleared since + * we are starting a new history. */ + memcpy(server.replid,server.master->replid,sizeof(server.replid)); + server.master_repl_offset = server.master->reploff; + clearReplicationId2(); + + /* Let's create the replication backlog if needed. Slaves need to + * accumulate the backlog regardless of the fact they have sub-slaves + * or not, in order to behave correctly if they are promoted to + * masters after a failover. */ + if (server.repl_backlog == NULL) createReplicationBacklog(); + serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Finished with success"); + + /* Restart the AOF subsystem now that we finished the sync. This + * will trigger an AOF rewrite, and when done will start appending + * to the new file. */ + if (server.aof_enabled) restartAOFAfterSYNC(); return; error: @@ -1845,16 +2005,20 @@ void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) { } /* Prepare a suitable temp file for bulk transfer */ - while(maxtries--) { - snprintf(tmpfile,256, - "temp-%d.%ld.rdb",(int)server.unixtime,(long int)getpid()); - dfd = open(tmpfile,O_CREAT|O_WRONLY|O_EXCL,0644); - if (dfd != -1) break; - sleep(1); - } - if (dfd == -1) { - serverLog(LL_WARNING,"Opening the temp file needed for MASTER <-> REPLICA synchronization: %s",strerror(errno)); - goto error; + if (!useDisklessLoad()) { + while(maxtries--) { + snprintf(tmpfile,256, + "temp-%d.%ld.rdb",(int)server.unixtime,(long int)getpid()); + dfd = open(tmpfile,O_CREAT|O_WRONLY|O_EXCL,0644); + if (dfd != -1) break; + sleep(1); + } + if (dfd == -1) { + serverLog(LL_WARNING,"Opening the temp file needed for MASTER <-> REPLICA synchronization: %s",strerror(errno)); + goto error; + } + server.repl_transfer_tmpfile = zstrdup(tmpfile); + server.repl_transfer_fd = dfd; } /* Setup the non blocking download of the bulk file. */ @@ -1871,15 +2035,19 @@ void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) { server.repl_transfer_size = -1; server.repl_transfer_read = 0; server.repl_transfer_last_fsync_off = 0; - server.repl_transfer_fd = dfd; server.repl_transfer_lastio = server.unixtime; - server.repl_transfer_tmpfile = zstrdup(tmpfile); return; error: aeDeleteFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE); if (dfd != -1) close(dfd); close(fd); + if (server.repl_transfer_fd != -1) + close(server.repl_transfer_fd); + if (server.repl_transfer_tmpfile) + zfree(server.repl_transfer_tmpfile); + server.repl_transfer_tmpfile = NULL; + server.repl_transfer_fd = -1; server.repl_transfer_s = -1; server.repl_state = REPL_STATE_CONNECT; return; @@ -1933,9 +2101,13 @@ void undoConnectWithMaster(void) { void replicationAbortSyncTransfer(void) { serverAssert(server.repl_state == REPL_STATE_TRANSFER); undoConnectWithMaster(); - close(server.repl_transfer_fd); - unlink(server.repl_transfer_tmpfile); - zfree(server.repl_transfer_tmpfile); + if (server.repl_transfer_fd!=-1) { + close(server.repl_transfer_fd); + unlink(server.repl_transfer_tmpfile); + zfree(server.repl_transfer_tmpfile); + server.repl_transfer_tmpfile = NULL; + server.repl_transfer_fd = -1; + } } /* This function aborts a non blocking replication attempt if there is one @@ -2045,6 +2217,9 @@ void replicaofCommand(client *c) { serverLog(LL_NOTICE,"MASTER MODE enabled (user request from '%s')", client); sdsfree(client); + /* Restart the AOF subsystem in case we shut it down during a sync when + * we were still a slave. */ + if (server.aof_enabled && server.aof_state == AOF_OFF) restartAOFAfterSYNC(); } } else { long port; @@ -157,7 +157,123 @@ void rioInitWithFile(rio *r, FILE *fp) { r->io.file.autosync = 0; } -/* ------------------- File descriptors set implementation ------------------- */ +/* ------------------- File descriptor implementation ------------------- + * We use this RIO implemetnation when reading an RDB file directly from + * the socket to the memory via rdbLoadRio(), thus this implementation + * only implements reading from a file descriptor that is, normally, + * just a socket. */ + +static size_t rioFdWrite(rio *r, const void *buf, size_t len) { + UNUSED(r); + UNUSED(buf); + UNUSED(len); + return 0; /* Error, this target does not yet support writing. */ +} + +/* Returns 1 or 0 for success/failure. */ +static size_t rioFdRead(rio *r, void *buf, size_t len) { + size_t avail = sdslen(r->io.fd.buf)-r->io.fd.pos; + + /* If the buffer is too small for the entire request: realloc. */ + if (sdslen(r->io.fd.buf) + sdsavail(r->io.fd.buf) < len) + r->io.fd.buf = sdsMakeRoomFor(r->io.fd.buf, len - sdslen(r->io.fd.buf)); + + /* If the remaining unused buffer is not large enough: memmove so that we + * can read the rest. */ + if (len > avail && sdsavail(r->io.fd.buf) < len - avail) { + sdsrange(r->io.fd.buf, r->io.fd.pos, -1); + r->io.fd.pos = 0; + } + + /* If we don't already have all the data in the sds, read more */ + while (len > sdslen(r->io.fd.buf) - r->io.fd.pos) { + size_t buffered = sdslen(r->io.fd.buf) - r->io.fd.pos; + size_t toread = len - buffered; + /* Read either what's missing, or PROTO_IOBUF_LEN, the bigger of + * the two. */ + if (toread < PROTO_IOBUF_LEN) toread = PROTO_IOBUF_LEN; + if (toread > sdsavail(r->io.fd.buf)) toread = sdsavail(r->io.fd.buf); + if (r->io.fd.read_limit != 0 && + r->io.fd.read_so_far + buffered + toread > r->io.fd.read_limit) + { + if (r->io.fd.read_limit >= r->io.fd.read_so_far - buffered) + toread = r->io.fd.read_limit - r->io.fd.read_so_far - buffered; + else { + errno = EOVERFLOW; + return 0; + } + } + int retval = read(r->io.fd.fd, + (char*)r->io.fd.buf + sdslen(r->io.fd.buf), + toread); + if (retval <= 0) { + if (errno == EWOULDBLOCK) errno = ETIMEDOUT; + return 0; + } + sdsIncrLen(r->io.fd.buf, retval); + } + + memcpy(buf, (char*)r->io.fd.buf + r->io.fd.pos, len); + r->io.fd.read_so_far += len; + r->io.fd.pos += len; + return len; +} + +/* Returns read/write position in file. */ +static off_t rioFdTell(rio *r) { + return r->io.fd.read_so_far; +} + +/* Flushes any buffer to target device if applicable. Returns 1 on success + * and 0 on failures. */ +static int rioFdFlush(rio *r) { + /* Our flush is implemented by the write method, that recognizes a + * buffer set to NULL with a count of zero as a flush request. */ + return rioFdWrite(r,NULL,0); +} + +static const rio rioFdIO = { + rioFdRead, + rioFdWrite, + rioFdTell, + rioFdFlush, + NULL, /* update_checksum */ + 0, /* current checksum */ + 0, /* bytes read or written */ + 0, /* read/write chunk size */ + { { NULL, 0 } } /* union for io-specific vars */ +}; + +/* Create an RIO that implements a buffered read from an fd + * read_limit argument stops buffering when the reaching the limit. */ +void rioInitWithFd(rio *r, int fd, size_t read_limit) { + *r = rioFdIO; + r->io.fd.fd = fd; + r->io.fd.pos = 0; + r->io.fd.read_limit = read_limit; + r->io.fd.read_so_far = 0; + r->io.fd.buf = sdsnewlen(NULL, PROTO_IOBUF_LEN); + sdsclear(r->io.fd.buf); +} + +/* Release the RIO tream. Optionally returns the unread buffered data + * when the SDS pointer 'remaining' is passed. */ +void rioFreeFd(rio *r, sds *remaining) { + if (remaining && (size_t)r->io.fd.pos < sdslen(r->io.fd.buf)) { + if (r->io.fd.pos > 0) sdsrange(r->io.fd.buf, r->io.fd.pos, -1); + *remaining = r->io.fd.buf; + } else { + sdsfree(r->io.fd.buf); + if (remaining) *remaining = NULL; + } + r->io.fd.buf = NULL; +} + +/* ------------------- File descriptors set implementation ------------------ + * This target is used to write the RDB file to N different replicas via + * sockets, when the master just streams the data to the replicas without + * creating an RDB on-disk image (diskless replication option). + * It only implements writes. */ /* Returns 1 or 0 for success/failure. * The function returns success as long as we are able to correctly write @@ -300,7 +416,7 @@ void rioGenericUpdateChecksum(rio *r, const void *buf, size_t len) { * disk I/O concentrated in very little time. When we fsync in an explicit * way instead the I/O pressure is more distributed across time. */ void rioSetAutoSync(rio *r, off_t bytes) { - serverAssert(r->read == rioFileIO.read); + if(r->write != rioFileIO.write) return; r->io.file.autosync = bytes; } @@ -73,6 +73,14 @@ struct _rio { off_t buffered; /* Bytes written since last fsync. */ off_t autosync; /* fsync after 'autosync' bytes written. */ } file; + /* file descriptor */ + struct { + int fd; /* File descriptor. */ + off_t pos; /* pos in buf that was returned */ + sds buf; /* buffered data */ + size_t read_limit; /* don't allow to buffer/read more than that */ + size_t read_so_far; /* amount of data read from the rio (not buffered) */ + } fd; /* Multiple FDs target (used to write to N sockets). */ struct { int *fds; /* File descriptors. */ @@ -126,9 +134,11 @@ static inline int rioFlush(rio *r) { void rioInitWithFile(rio *r, FILE *fp); void rioInitWithBuffer(rio *r, sds s); +void rioInitWithFd(rio *r, int fd, size_t read_limit); void rioInitWithFdset(rio *r, int *fds, int numfds); void rioFreeFdset(rio *r); +void rioFreeFd(rio *r, sds* out_remainingBufferedData); size_t rioWriteBulkCount(rio *r, char prefix, long count); size_t rioWriteBulkString(rio *r, const char *buf, size_t len); diff --git a/src/server.c b/src/server.c index e1d48e596..4337b8f01 100644 --- a/src/server.c +++ b/src/server.c @@ -2265,6 +2265,7 @@ void initServerConfig(void) { server.aof_flush_postponed_start = 0; server.aof_rewrite_incremental_fsync = CONFIG_DEFAULT_AOF_REWRITE_INCREMENTAL_FSYNC; server.rdb_save_incremental_fsync = CONFIG_DEFAULT_RDB_SAVE_INCREMENTAL_FSYNC; + server.rdb_key_save_delay = CONFIG_DEFAULT_RDB_KEY_SAVE_DELAY; server.aof_load_truncated = CONFIG_DEFAULT_AOF_LOAD_TRUNCATED; server.aof_use_rdb_preamble = CONFIG_DEFAULT_AOF_USE_RDB_PREAMBLE; server.pidfile = NULL; @@ -2334,6 +2335,9 @@ void initServerConfig(void) { server.cached_master = NULL; server.master_initial_offset = -1; server.repl_state = REPL_STATE_NONE; + server.repl_transfer_tmpfile = NULL; + server.repl_transfer_fd = -1; + server.repl_transfer_s = -1; server.repl_syncio_timeout = CONFIG_REPL_SYNCIO_TIMEOUT; server.repl_serve_stale_data = CONFIG_DEFAULT_SLAVE_SERVE_STALE_DATA; server.repl_slave_ro = CONFIG_DEFAULT_SLAVE_READ_ONLY; @@ -2342,6 +2346,7 @@ void initServerConfig(void) { server.repl_down_since = 0; /* Never connected, repl is down since EVER. */ server.repl_disable_tcp_nodelay = CONFIG_DEFAULT_REPL_DISABLE_TCP_NODELAY; server.repl_diskless_sync = CONFIG_DEFAULT_REPL_DISKLESS_SYNC; + server.repl_diskless_load = CONFIG_DEFAULT_REPL_DISKLESS_LOAD; server.repl_diskless_sync_delay = CONFIG_DEFAULT_REPL_DISKLESS_SYNC_DELAY; server.repl_ping_slave_period = CONFIG_DEFAULT_REPL_PING_SLAVE_PERIOD; server.repl_timeout = CONFIG_DEFAULT_REPL_TIMEOUT; @@ -3194,6 +3199,7 @@ void call(client *c, int flags) { latencyAddSampleIfNeeded(latency_event,duration/1000); slowlogPushEntryIfNeeded(c,c->argv,c->argc,duration); } + if (flags & CMD_CALL_STATS) { /* use the real command that was executed (cmd and lastamc) may be * different, in case of MULTI-EXEC or re-written commands such as @@ -3261,6 +3267,16 @@ void call(client *c, int flags) { redisOpArrayFree(&server.also_propagate); } server.also_propagate = prev_also_propagate; + + /* If the client has keys tracking enabled for client side caching, + * make sure to remember the keys it fetched via this command. */ + if (c->cmd->flags & CMD_READONLY) { + client *caller = (c->flags & CLIENT_LUA && server.lua_caller) ? + server.lua_caller : c; + if (caller->flags & CLIENT_TRACKING) + trackingRememberKeys(caller); + } + server.stat_numcommands++; } @@ -3879,10 +3895,12 @@ sds genRedisInfoString(char *section) { "connected_clients:%lu\r\n" "client_recent_max_input_buffer:%zu\r\n" "client_recent_max_output_buffer:%zu\r\n" - "blocked_clients:%d\r\n", + "blocked_clients:%d\r\n" + "tracking_clients:%d\r\n", listLength(server.clients)-listLength(server.slaves), maxin, maxout, - server.blocked_clients); + server.blocked_clients, + server.tracking_clients); } /* Memory */ @@ -4042,7 +4060,7 @@ sds genRedisInfoString(char *section) { (server.aof_last_write_status == C_OK) ? "ok" : "err", server.stat_aof_cow_bytes); - if (server.aof_state != AOF_OFF) { + if (server.aof_enabled) { info = sdscatprintf(info, "aof_current_size:%lld\r\n" "aof_base_size:%lld\r\n" diff --git a/src/server.h b/src/server.h index 0813f8bd1..f81b1010e 100644 --- a/src/server.h +++ b/src/server.h @@ -132,6 +132,7 @@ typedef long long mstime_t; /* millisecond time type. */ #define CONFIG_DEFAULT_RDB_FILENAME "dump.rdb" #define CONFIG_DEFAULT_REPL_DISKLESS_SYNC 0 #define CONFIG_DEFAULT_REPL_DISKLESS_SYNC_DELAY 5 +#define CONFIG_DEFAULT_RDB_KEY_SAVE_DELAY 0 #define CONFIG_DEFAULT_SLAVE_SERVE_STALE_DATA 1 #define CONFIG_DEFAULT_SLAVE_READ_ONLY 1 #define CONFIG_DEFAULT_SLAVE_IGNORE_MAXMEMORY 1 @@ -254,8 +255,8 @@ typedef long long mstime_t; /* millisecond time type. */ #define AOF_WAIT_REWRITE 2 /* AOF waits rewrite to start appending */ /* Client flags */ -#define CLIENT_SLAVE (1<<0) /* This client is a slave server */ -#define CLIENT_MASTER (1<<1) /* This client is a master server */ +#define CLIENT_SLAVE (1<<0) /* This client is a repliaca */ +#define CLIENT_MASTER (1<<1) /* This client is a master */ #define CLIENT_MONITOR (1<<2) /* This client is a slave monitor, see MONITOR */ #define CLIENT_MULTI (1<<3) /* This client is in a MULTI context */ #define CLIENT_BLOCKED (1<<4) /* The client is waiting in a blocking operation */ @@ -289,7 +290,13 @@ typedef long long mstime_t; /* millisecond time type. */ #define CLIENT_PENDING_READ (1<<29) /* The client has pending reads and was put in the list of clients we can read from. */ -#define CLIENT_PENDING_COMMAND (1<<30) /* */ +#define CLIENT_PENDING_COMMAND (1<<30) /* Used in threaded I/O to signal after + we return single threaded that the + client has already pending commands + to be executed. */ +#define CLIENT_TRACKING (1<<31) /* Client enabled keys tracking in order to + perform client side caching. */ +#define CLIENT_TRACKING_BROKEN_REDIR (1ULL<<32) /* Target client is invalid. */ /* Client block type (btype field in client structure) * if CLIENT_BLOCKED flag is set. */ @@ -388,6 +395,12 @@ typedef long long mstime_t; /* millisecond time type. */ #define AOF_FSYNC_EVERYSEC 2 #define CONFIG_DEFAULT_AOF_FSYNC AOF_FSYNC_EVERYSEC +/* Replication diskless load defines */ +#define REPL_DISKLESS_LOAD_DISABLED 0 +#define REPL_DISKLESS_LOAD_WHEN_DB_EMPTY 1 +#define REPL_DISKLESS_LOAD_SWAPDB 2 +#define CONFIG_DEFAULT_REPL_DISKLESS_LOAD REPL_DISKLESS_LOAD_DISABLED + /* Zipped structures related defaults */ #define OBJ_HASH_MAX_ZIPLIST_ENTRIES 512 #define OBJ_HASH_MAX_ZIPLIST_VALUE 64 @@ -646,6 +659,11 @@ typedef struct redisObject { void *ptr; } robj; +/* The a string name for an object's type as listed above + * Native types are checked against the OBJ_STRING, OBJ_LIST, OBJ_* defines, + * and Module types have their registered name returned. */ +char *getObjectTypeName(robj*); + /* Macro used to initialize a Redis object allocated on the stack. * Note that this macro is taken near the structure definition to make sure * we'll update it when the structure is changed, to avoid bugs like @@ -816,7 +834,7 @@ typedef struct client { time_t ctime; /* Client creation time. */ time_t lastinteraction; /* Time of the last interaction, used for timeout */ time_t obuf_soft_limit_reached_time; - int flags; /* Client flags: CLIENT_* macros. */ + uint64_t flags; /* Client flags: CLIENT_* macros. */ int authenticated; /* Needed when the default user requires auth. */ int replstate; /* Replication state if this is a slave. */ int repl_put_online_on_ack; /* Install slave write handler on ACK. */ @@ -845,6 +863,11 @@ typedef struct client { sds peerid; /* Cached peer ID. */ listNode *client_list_node; /* list node in client list */ + /* If this client is in tracking mode and this field is non zero, + * invalidation messages for keys fetched by this client will be send to + * the specified client ID. */ + uint64_t client_tracking_redirection; + /* Response buffer */ int bufpos; char buf[PROTO_REPLY_CHUNK_BYTES]; @@ -1142,6 +1165,7 @@ struct redisServer { int daemonize; /* True if running as a daemon */ clientBufferLimitsConfig client_obuf_limits[CLIENT_TYPE_OBUF_COUNT]; /* AOF persistence */ + int aof_enabled; /* AOF configuration */ int aof_state; /* AOF_(ON|OFF|WAIT_REWRITE) */ int aof_fsync; /* Kind of fsync() policy */ char *aof_filename; /* Name of the AOF file */ @@ -1198,6 +1222,8 @@ struct redisServer { int stop_writes_on_bgsave_err; /* Don't allow writes if can't BGSAVE */ int rdb_pipe_write_result_to_parent; /* RDB pipes used to return the state */ int rdb_pipe_read_result_from_child; /* of each slave in diskless SYNC. */ + int rdb_key_save_delay; /* Delay in microseconds between keys while + * writing the RDB. (for testings) */ /* Pipe and data structures for child -> parent info sharing. */ int child_info_pipe[2]; /* Pipe used to write the child_info_data. */ struct { @@ -1233,7 +1259,9 @@ struct redisServer { int repl_min_slaves_to_write; /* Min number of slaves to write. */ int repl_min_slaves_max_lag; /* Max lag of <count> slaves to write. */ int repl_good_slaves_count; /* Number of slaves with lag <= max_lag. */ - int repl_diskless_sync; /* Send RDB to slaves sockets directly. */ + int repl_diskless_sync; /* Master send RDB to slaves sockets directly. */ + int repl_diskless_load; /* Slave parse RDB directly from the socket. + * see REPL_DISKLESS_LOAD_* enum */ int repl_diskless_sync_delay; /* Delay to start a diskless repl BGSAVE. */ /* Replication (slave) */ char *masteruser; /* AUTH with this user and masterauth with master */ @@ -1286,6 +1314,8 @@ struct redisServer { unsigned int blocked_clients_by_type[BLOCKED_NUM]; list *unblocked_clients; /* list of clients to unblock before next loop */ list *ready_keys; /* List of readyList structures for BLPOP & co */ + /* Client side caching. */ + unsigned int tracking_clients; /* # of clients with tracking enabled.*/ /* Sort parameters - qsort_r() is only available under BSD so we * have to take this state global, in order to pass it to sortCompare() */ int sort_desc; @@ -1591,6 +1621,7 @@ void linkClient(client *c); void protectClient(client *c); void unprotectClient(client *c); void initThreadedIO(void); +client *lookupClientByID(uint64_t id); #ifdef __GNUC__ void addReplyErrorFormat(client *c, const char *fmt, ...) @@ -1602,6 +1633,12 @@ void addReplyErrorFormat(client *c, const char *fmt, ...); void addReplyStatusFormat(client *c, const char *fmt, ...); #endif +/* Client side caching (tracking mode) */ +void enableTracking(client *c, uint64_t redirect_to); +void disableTracking(client *c); +void trackingRememberKeys(client *c); +void trackingInvalidateKey(robj *keyobj); + /* List data type */ void listTypeTryConversion(robj *subject, robj *value); void listTypePush(robj *subject, robj *value, int where); @@ -1714,7 +1751,8 @@ void replicationCacheMasterUsingMyself(void); void feedReplicationBacklog(void *ptr, size_t len); /* Generic persistence functions */ -void startLoading(FILE *fp); +void startLoadingFile(FILE* fp, char* filename); +void startLoading(size_t size); void loadingProgress(off_t pos); void stopLoading(void); @@ -1926,6 +1964,7 @@ int pubsubUnsubscribeAllPatterns(client *c, int notify); void freePubsubPattern(void *p); int listMatchPubsubPattern(void *a, void *b); int pubsubPublishMessage(robj *channel, robj *message); +void addReplyPubsubMessage(client *c, robj *channel, robj *msg); /* Keyspace events notification */ void notifyKeyspaceEvent(int type, char *event, robj *key, int dbid); @@ -1970,6 +2009,8 @@ robj *dbUnshareStringValue(redisDb *db, robj *key, robj *o); #define EMPTYDB_NO_FLAGS 0 /* No flags. */ #define EMPTYDB_ASYNC (1<<0) /* Reclaim memory in another thread. */ long long emptyDb(int dbnum, int flags, void(callback)(void*)); +long long emptyDbGeneric(redisDb *dbarray, int dbnum, int flags, void(callback)(void*)); +long long dbTotalServerKeyCount(); int selectDb(client *c, int id); void signalModifiedKey(redisDb *db, robj *key); diff --git a/src/tracking.c b/src/tracking.c new file mode 100644 index 000000000..bbfc66a72 --- /dev/null +++ b/src/tracking.c @@ -0,0 +1,175 @@ +/* tracking.c - Client side caching: keys tracking and invalidation + * + * Copyright (c) 2019, Salvatore Sanfilippo <antirez at gmail dot com> + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * * Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * * Neither the name of Redis nor the names of its contributors may be used + * to endorse or promote products derived from this software without + * specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +#include "server.h" + +/* The tracking table is constituted by 2^24 radix trees (each tree, and the + * table itself, are allocated in a lazy way only when needed) tracking + * clients that may have certain keys in their local, client side, cache. + * + * Keys are grouped into 2^24 slots, in a way similar to Redis Cluster hash + * slots, however here the function we use is crc64, taking the least + * significant 24 bits of the output. + * + * When a client enables tracking with "CLIENT TRACKING on", each key served to + * the client is hashed to one of such slots, and Redis will remember what + * client may have keys about such slot. Later, when a key in a given slot is + * modified, all the clients that may have local copies of keys in that slot + * will receive an invalidation message. There is no distinction of database + * number: a single table is used. + * + * Clients will normally take frequently requested objects in memory, removing + * them when invalidation messages are received. A strategy clients may use is + * to just cache objects in a dictionary, associating to each cached object + * some incremental epoch, or just a timestamp. When invalidation messages are + * received clients may store, in a different table, the timestamp (or epoch) + * of the invalidation of such given slot: later when accessing objects, the + * eviction of stale objects may be performed in a lazy way by checking if the + * cached object timestamp is older than the invalidation timestamp for such + * objects. + * + * The output of the 24 bit hash function is very large (more than 16 million + * possible slots), so clients that may want to use less resources may only + * use the most significant bits instead of the full 24 bits. */ +#define TRACKING_TABLE_SIZE (1<<24) +rax **TrackingTable = NULL; +robj *TrackingChannelName; + +/* Remove the tracking state from the client 'c'. Note that there is not much + * to do for us here, if not to decrement the counter of the clients in + * tracking mode, because we just store the ID of the client in the tracking + * table, so we'll remove the ID reference in a lazy way. Otherwise when a + * client with many entries in the table is removed, it would cost a lot of + * time to do the cleanup. */ +void disableTracking(client *c) { + if (c->flags & CLIENT_TRACKING) { + server.tracking_clients--; + c->flags &= ~(CLIENT_TRACKING|CLIENT_TRACKING_BROKEN_REDIR); + } +} + +/* Enable the tracking state for the client 'c', and as a side effect allocates + * the tracking table if needed. If the 'redirect_to' argument is non zero, the + * invalidation messages for this client will be sent to the client ID + * specified by the 'redirect_to' argument. Note that if such client will + * eventually get freed, we'll send a message to the original client to + * inform it of the condition. Multiple clients can redirect the invalidation + * messages to the same client ID. */ +void enableTracking(client *c, uint64_t redirect_to) { + if (c->flags & CLIENT_TRACKING) return; + c->flags |= CLIENT_TRACKING; + c->flags &= ~CLIENT_TRACKING_BROKEN_REDIR; + c->client_tracking_redirection = redirect_to; + server.tracking_clients++; + if (TrackingTable == NULL) { + TrackingTable = zcalloc(sizeof(rax*) * TRACKING_TABLE_SIZE); + TrackingChannelName = createStringObject("__redis__:invalidate",20); + } +} + +/* This function is called after the excution of a readonly command in the + * case the client 'c' has keys tracking enabled. It will populate the + * tracking ivalidation table according to the keys the user fetched, so that + * Redis will know what are the clients that should receive an invalidation + * message with certain groups of keys are modified. */ +void trackingRememberKeys(client *c) { + int numkeys; + int *keys = getKeysFromCommand(c->cmd,c->argv,c->argc,&numkeys); + if (keys == NULL) return; + + for(int j = 0; j < numkeys; j++) { + int idx = keys[j]; + sds sdskey = c->argv[idx]->ptr; + uint64_t hash = crc64(0, + (unsigned char*)sdskey,sdslen(sdskey))&(TRACKING_TABLE_SIZE-1); + if (TrackingTable[hash] == NULL) + TrackingTable[hash] = raxNew(); + raxTryInsert(TrackingTable[hash], + (unsigned char*)&c->id,sizeof(c->id),NULL,NULL); + } + getKeysFreeResult(keys); +} + +/* This function is called from signalModifiedKey() or other places in Redis + * when a key changes value. In the context of keys tracking, our task here is + * to send a notification to every client that may have keys about such . */ +void trackingInvalidateKey(robj *keyobj) { + sds sdskey = keyobj->ptr; + uint64_t hash = crc64(0, + (unsigned char*)sdskey,sdslen(sdskey))&(TRACKING_TABLE_SIZE-1); + if (TrackingTable == NULL || TrackingTable[hash] == NULL) return; + + raxIterator ri; + raxStart(&ri,TrackingTable[hash]); + raxSeek(&ri,"^",NULL,0); + while(raxNext(&ri)) { + uint64_t id; + memcpy(&id,ri.key,ri.key_len); + client *c = lookupClientByID(id); + if (c == NULL) continue; + int using_redirection = 0; + if (c->client_tracking_redirection) { + client *redir = lookupClientByID(c->client_tracking_redirection); + if (!redir) { + /* We need to signal to the original connection that we + * are unable to send invalidation messages to the redirected + * connection, because the client no longer exist. */ + if (c->resp > 2) { + addReplyPushLen(c,3); + addReplyBulkCBuffer(c,"tracking-redir-broken",21); + addReplyLongLong(c,c->client_tracking_redirection); + } + continue; + } + c = redir; + using_redirection = 1; + } + + /* Only send such info for clients in RESP version 3 or more. However + * if redirection is active, and the connection we redirect to is + * in Pub/Sub mode, we can support the feature with RESP 2 as well, + * by sending Pub/Sub messages in the __redis__:invalidate channel. */ + if (c->resp > 2) { + addReplyPushLen(c,2); + addReplyBulkCBuffer(c,"invalidate",10); + addReplyLongLong(c,hash); + } else if (using_redirection && c->flags & CLIENT_PUBSUB) { + robj *msg = createStringObjectFromLongLong(hash); + addReplyPubsubMessage(c,TrackingChannelName,msg); + decrRefCount(msg); + } + } + raxStop(&ri); + + /* Free the tracking table: we'll create the radix tree and populate it + * again if more keys will be modified in this hash slot. */ + raxFree(TrackingTable[hash]); + TrackingTable[hash] = NULL; +} diff --git a/tests/integration/replication-4.tcl b/tests/integration/replication-4.tcl index 3c6df52a8..54891151b 100644 --- a/tests/integration/replication-4.tcl +++ b/tests/integration/replication-4.tcl @@ -1,12 +1,3 @@ -proc start_bg_complex_data {host port db ops} { - set tclsh [info nameofexecutable] - exec $tclsh tests/helpers/bg_complex_data.tcl $host $port $db $ops & -} - -proc stop_bg_complex_data {handle} { - catch {exec /bin/kill -9 $handle} -} - start_server {tags {"repl"}} { start_server {} { diff --git a/tests/integration/replication-psync.tcl b/tests/integration/replication-psync.tcl index bf8682446..3c98723af 100644 --- a/tests/integration/replication-psync.tcl +++ b/tests/integration/replication-psync.tcl @@ -1,12 +1,3 @@ -proc start_bg_complex_data {host port db ops} { - set tclsh [info nameofexecutable] - exec $tclsh tests/helpers/bg_complex_data.tcl $host $port $db $ops & -} - -proc stop_bg_complex_data {handle} { - catch {exec /bin/kill -9 $handle} -} - # Creates a master-slave pair and breaks the link continuously to force # partial resyncs attempts, all this while flooding the master with # write queries. @@ -17,7 +8,7 @@ proc stop_bg_complex_data {handle} { # If reconnect is > 0, the test actually try to break the connection and # reconnect with the master, otherwise just the initial synchronization is # checked for consistency. -proc test_psync {descr duration backlog_size backlog_ttl delay cond diskless reconnect} { +proc test_psync {descr duration backlog_size backlog_ttl delay cond mdl sdl reconnect} { start_server {tags {"repl"}} { start_server {} { @@ -28,8 +19,9 @@ proc test_psync {descr duration backlog_size backlog_ttl delay cond diskless rec $master config set repl-backlog-size $backlog_size $master config set repl-backlog-ttl $backlog_ttl - $master config set repl-diskless-sync $diskless + $master config set repl-diskless-sync $mdl $master config set repl-diskless-sync-delay 1 + $slave config set repl-diskless-load $sdl set load_handle0 [start_bg_complex_data $master_host $master_port 9 100000] set load_handle1 [start_bg_complex_data $master_host $master_port 11 100000] @@ -54,7 +46,7 @@ proc test_psync {descr duration backlog_size backlog_ttl delay cond diskless rec } } - test "Test replication partial resync: $descr (diskless: $diskless, reconnect: $reconnect)" { + test "Test replication partial resync: $descr (diskless: $mdl, $sdl, reconnect: $reconnect)" { # Now while the clients are writing data, break the maste-slave # link multiple times. if ($reconnect) { @@ -132,23 +124,25 @@ proc test_psync {descr duration backlog_size backlog_ttl delay cond diskless rec } } -foreach diskless {no yes} { - test_psync {no reconnection, just sync} 6 1000000 3600 0 { - } $diskless 0 +foreach mdl {no yes} { + foreach sdl {disabled swapdb} { + test_psync {no reconnection, just sync} 6 1000000 3600 0 { + } $mdl $sdl 0 - test_psync {ok psync} 6 100000000 3600 0 { + test_psync {ok psync} 6 100000000 3600 0 { assert {[s -1 sync_partial_ok] > 0} - } $diskless 1 + } $mdl $sdl 1 - test_psync {no backlog} 6 100 3600 0.5 { + test_psync {no backlog} 6 100 3600 0.5 { assert {[s -1 sync_partial_err] > 0} - } $diskless 1 + } $mdl $sdl 1 - test_psync {ok after delay} 3 100000000 3600 3 { + test_psync {ok after delay} 3 100000000 3600 3 { assert {[s -1 sync_partial_ok] > 0} - } $diskless 1 + } $mdl $sdl 1 - test_psync {backlog expired} 3 100000000 1 3 { + test_psync {backlog expired} 3 100000000 1 3 { assert {[s -1 sync_partial_err] > 0} - } $diskless 1 + } $mdl $sdl 1 + } } diff --git a/tests/integration/replication.tcl b/tests/integration/replication.tcl index 0e50c20a9..d69a1761a 100644 --- a/tests/integration/replication.tcl +++ b/tests/integration/replication.tcl @@ -183,85 +183,92 @@ start_server {tags {"repl"}} { } } -foreach dl {no yes} { - start_server {tags {"repl"}} { - set master [srv 0 client] - $master config set repl-diskless-sync $dl - set master_host [srv 0 host] - set master_port [srv 0 port] - set slaves {} - set load_handle0 [start_write_load $master_host $master_port 3] - set load_handle1 [start_write_load $master_host $master_port 5] - set load_handle2 [start_write_load $master_host $master_port 20] - set load_handle3 [start_write_load $master_host $master_port 8] - set load_handle4 [start_write_load $master_host $master_port 4] - start_server {} { - lappend slaves [srv 0 client] +foreach mdl {no yes} { + foreach sdl {disabled swapdb} { + start_server {tags {"repl"}} { + set master [srv 0 client] + $master config set repl-diskless-sync $mdl + $master config set repl-diskless-sync-delay 1 + set master_host [srv 0 host] + set master_port [srv 0 port] + set slaves {} + set load_handle0 [start_bg_complex_data $master_host $master_port 9 100000000] + set load_handle1 [start_bg_complex_data $master_host $master_port 11 100000000] + set load_handle2 [start_bg_complex_data $master_host $master_port 12 100000000] + set load_handle3 [start_write_load $master_host $master_port 8] + set load_handle4 [start_write_load $master_host $master_port 4] + after 5000 ;# wait for some data to accumulate so that we have RDB part for the fork start_server {} { lappend slaves [srv 0 client] start_server {} { lappend slaves [srv 0 client] - test "Connect multiple replicas at the same time (issue #141), diskless=$dl" { - # Send SLAVEOF commands to slaves - [lindex $slaves 0] slaveof $master_host $master_port - [lindex $slaves 1] slaveof $master_host $master_port - [lindex $slaves 2] slaveof $master_host $master_port - - # Wait for all the three slaves to reach the "online" - # state from the POV of the master. - set retry 500 - while {$retry} { - set info [r -3 info] - if {[string match {*slave0:*state=online*slave1:*state=online*slave2:*state=online*} $info]} { - break + start_server {} { + lappend slaves [srv 0 client] + test "Connect multiple replicas at the same time (issue #141), master diskless=$mdl, replica diskless=$sdl" { + # Send SLAVEOF commands to slaves + [lindex $slaves 0] config set repl-diskless-load $sdl + [lindex $slaves 1] config set repl-diskless-load $sdl + [lindex $slaves 2] config set repl-diskless-load $sdl + [lindex $slaves 0] slaveof $master_host $master_port + [lindex $slaves 1] slaveof $master_host $master_port + [lindex $slaves 2] slaveof $master_host $master_port + + # Wait for all the three slaves to reach the "online" + # state from the POV of the master. + set retry 500 + while {$retry} { + set info [r -3 info] + if {[string match {*slave0:*state=online*slave1:*state=online*slave2:*state=online*} $info]} { + break + } else { + incr retry -1 + after 100 + } + } + if {$retry == 0} { + error "assertion:Slaves not correctly synchronized" + } + + # Wait that slaves acknowledge they are online so + # we are sure that DBSIZE and DEBUG DIGEST will not + # fail because of timing issues. + wait_for_condition 500 100 { + [lindex [[lindex $slaves 0] role] 3] eq {connected} && + [lindex [[lindex $slaves 1] role] 3] eq {connected} && + [lindex [[lindex $slaves 2] role] 3] eq {connected} } else { - incr retry -1 - after 100 + fail "Slaves still not connected after some time" } - } - if {$retry == 0} { - error "assertion:Replicas not correctly synchronized" - } - # Wait that slaves acknowledge they are online so - # we are sure that DBSIZE and DEBUG DIGEST will not - # fail because of timing issues. - wait_for_condition 500 100 { - [lindex [[lindex $slaves 0] role] 3] eq {connected} && - [lindex [[lindex $slaves 1] role] 3] eq {connected} && - [lindex [[lindex $slaves 2] role] 3] eq {connected} - } else { - fail "Replicas still not connected after some time" - } + # Stop the write load + stop_bg_complex_data $load_handle0 + stop_bg_complex_data $load_handle1 + stop_bg_complex_data $load_handle2 + stop_write_load $load_handle3 + stop_write_load $load_handle4 + + # Make sure that slaves and master have same + # number of keys + wait_for_condition 500 100 { + [$master dbsize] == [[lindex $slaves 0] dbsize] && + [$master dbsize] == [[lindex $slaves 1] dbsize] && + [$master dbsize] == [[lindex $slaves 2] dbsize] + } else { + fail "Different number of keys between master and replica after too long time." + } - # Stop the write load - stop_write_load $load_handle0 - stop_write_load $load_handle1 - stop_write_load $load_handle2 - stop_write_load $load_handle3 - stop_write_load $load_handle4 - - # Make sure that slaves and master have same - # number of keys - wait_for_condition 500 100 { - [$master dbsize] == [[lindex $slaves 0] dbsize] && - [$master dbsize] == [[lindex $slaves 1] dbsize] && - [$master dbsize] == [[lindex $slaves 2] dbsize] - } else { - fail "Different number of keys between masted and replica after too long time." + # Check digests + set digest [$master debug digest] + set digest0 [[lindex $slaves 0] debug digest] + set digest1 [[lindex $slaves 1] debug digest] + set digest2 [[lindex $slaves 2] debug digest] + assert {$digest ne 0000000000000000000000000000000000000000} + assert {$digest eq $digest0} + assert {$digest eq $digest1} + assert {$digest eq $digest2} } - - # Check digests - set digest [$master debug digest] - set digest0 [[lindex $slaves 0] debug digest] - set digest1 [[lindex $slaves 1] debug digest] - set digest2 [[lindex $slaves 2] debug digest] - assert {$digest ne 0000000000000000000000000000000000000000} - assert {$digest eq $digest0} - assert {$digest eq $digest1} - assert {$digest eq $digest2} - } - } + } + } } } } @@ -309,3 +316,70 @@ start_server {tags {"repl"}} { } } } + +test {slave fails full sync and diskless load swapdb recoveres it} { + start_server {tags {"repl"}} { + set slave [srv 0 client] + set slave_host [srv 0 host] + set slave_port [srv 0 port] + set slave_log [srv 0 stdout] + start_server {} { + set master [srv 0 client] + set master_host [srv 0 host] + set master_port [srv 0 port] + + # Put different data sets on the master and slave + # we need to put large keys on the master since the slave replies to info only once in 2mb + $slave debug populate 2000 slave 10 + $master debug populate 200 master 100000 + $master config set rdbcompression no + + # Set master and slave to use diskless replication + $master config set repl-diskless-sync yes + $master config set repl-diskless-sync-delay 0 + $slave config set repl-diskless-load swapdb + + # Set master with a slow rdb generation, so that we can easily disconnect it mid sync + # 10ms per key, with 200 keys is 2 seconds + $master config set rdb-key-save-delay 10000 + + # Start the replication process... + $slave slaveof $master_host $master_port + + # wait for the slave to start reading the rdb + wait_for_condition 50 100 { + [s -1 loading] eq 1 + } else { + fail "Replica didn't get into loading mode" + } + + # make sure that next sync will not start immediately so that we can catch the slave in betweeen syncs + $master config set repl-diskless-sync-delay 5 + # for faster server shutdown, make rdb saving fast again (the fork is already uses the slow one) + $master config set rdb-key-save-delay 0 + + # waiting slave to do flushdb (key count drop) + wait_for_condition 50 100 { + 2000 != [scan [regexp -inline {keys\=([\d]*)} [$slave info keyspace]] keys=%d] + } else { + fail "Replica didn't flush" + } + + # make sure we're still loading + assert_equal [s -1 loading] 1 + + # kill the slave connection on the master + set killed [$master client kill type slave] + + # wait for loading to stop (fail) + wait_for_condition 50 100 { + [s -1 loading] eq 0 + } else { + fail "Replica didn't disconnect" + } + + # make sure the original keys were restored + assert_equal [$slave dbsize] 2000 + } + } +} diff --git a/tests/support/util.tcl b/tests/support/util.tcl index 74f491e48..41cc5612a 100644 --- a/tests/support/util.tcl +++ b/tests/support/util.tcl @@ -399,3 +399,15 @@ proc lshuffle {list} { } return $slist } + +# Execute a background process writing complex data for the specified number +# of ops to the specified Redis instance. +proc start_bg_complex_data {host port db ops} { + set tclsh [info nameofexecutable] + exec $tclsh tests/helpers/bg_complex_data.tcl $host $port $db $ops & +} + +# Stop a process generating write load executed with start_bg_complex_data. +proc stop_bg_complex_data {handle} { + catch {exec /bin/kill -9 $handle} +} diff --git a/tests/unit/geo.tcl b/tests/unit/geo.tcl index 604697be4..49e421ee9 100644 --- a/tests/unit/geo.tcl +++ b/tests/unit/geo.tcl @@ -61,6 +61,7 @@ set regression_vectors { {939895 151 59.149620271823181 65.204186651485145} {1412 156 149.29737817929004 15.95807862745508} {564862 149 84.062063109158544 -65.685403922426232} + {1546032440391 16751 -1.8175081637769495 20.665668878082954} } set rv_idx 0 @@ -274,8 +275,19 @@ start_server {tags {"geo"}} { foreach place $diff { set mydist [geo_distance $lon $lat $search_lon $search_lat] set mydist [expr $mydist/1000] - if {($mydist / $radius_km) > 0.999} {incr rounding_errors} + if {($mydist / $radius_km) > 0.999} { + incr rounding_errors + continue + } + if {$mydist < $radius_m} { + # This is a false positive for redis since given the + # same points the higher precision calculation provided + # by TCL shows the point within range + incr rounding_errors + continue + } } + # Make sure this is a real error and not a rounidng issue. if {[llength $diff] == $rounding_errors} { set res $res2; # Error silenced diff --git a/tests/unit/scan.tcl b/tests/unit/scan.tcl index c0f4349d2..9f9ff4df2 100644 --- a/tests/unit/scan.tcl +++ b/tests/unit/scan.tcl @@ -53,6 +53,51 @@ start_server {tags {"scan"}} { assert_equal 100 [llength $keys] } + test "SCAN TYPE" { + r flushdb + # populate only creates strings + r debug populate 1000 + + # Check non-strings are excluded + set cur 0 + set keys {} + while 1 { + set res [r scan $cur type "list"] + set cur [lindex $res 0] + set k [lindex $res 1] + lappend keys {*}$k + if {$cur == 0} break + } + + assert_equal 0 [llength $keys] + + # Check strings are included + set cur 0 + set keys {} + while 1 { + set res [r scan $cur type "string"] + set cur [lindex $res 0] + set k [lindex $res 1] + lappend keys {*}$k + if {$cur == 0} break + } + + assert_equal 1000 [llength $keys] + + # Check all three args work together + set cur 0 + set keys {} + while 1 { + set res [r scan $cur type "string" match "key:*" count 10] + set cur [lindex $res 0] + set k [lindex $res 1] + lappend keys {*}$k + if {$cur == 0} break + } + + assert_equal 1000 [llength $keys] + } + foreach enc {intset hashtable} { test "SSCAN with encoding $enc" { # Create the Set |