author    Oran Agra <oran@redislabs.com>    2019-07-30 15:14:08 +0300
committer Oran Agra <oran@redislabs.com>    2019-07-30 15:14:08 +0300
commit    e5187ad2aeb9b2d438af0b728b9de4e8e2cc8f33 (patch)
tree      20b51660de8def2533911139ca63ca46c48e317e
parent    4339706e07e15fe5a228d175756c4e4be83e2867 (diff)
parent    f42846e8c7127c8e84b6938a6bd73e2363b4a90c (diff)
download  redis-e5187ad2aeb9b2d438af0b728b9de4e8e2cc8f33.tar.gz
Merge remote-tracking branch 'oss/unstable' into module_rdb_load_errors
-rw-r--r--  README.md                          |   2
-rw-r--r--  redis.conf                         | 121
-rwxr-xr-x  runtest-moduleapi                  |   2
-rw-r--r--  src/aof.c                          |   4
-rw-r--r--  src/cluster.c                      |  13
-rw-r--r--  src/config.c                       |  24
-rw-r--r--  src/db.c                           |  10
-rw-r--r--  src/expire.c                       |   2
-rw-r--r--  src/hyperloglog.c                  |   1
-rw-r--r--  src/module.c                       |  51
-rw-r--r--  src/rdb.c                          | 254
-rw-r--r--  src/rdb.h                          |   1
-rw-r--r--  src/redis-benchmark.c              |  15
-rw-r--r--  src/redis-check-rdb.c              |   6
-rw-r--r--  src/redismodule.h                  |  11
-rw-r--r--  src/rio.c                          |   4
-rw-r--r--  src/rio.h                          |  35
-rw-r--r--  src/server.c                       |  13
-rw-r--r--  src/server.h                       |  15
-rw-r--r--  src/stream.h                       |   1
-rw-r--r--  src/t_zset.c                       |   5
-rw-r--r--  src/tracking.c                     | 207
-rw-r--r--  tests/integration/replication.tcl  |  97
-rw-r--r--  tests/modules/Makefile             |   6
-rw-r--r--  tests/modules/testrdb.c            | 229
-rw-r--r--  tests/support/util.tcl             |  19
-rw-r--r--  tests/unit/geo.tcl                 |  14
-rw-r--r--  tests/unit/moduleapi/testrdb.tcl   |  62
28 files changed, 1034 insertions, 190 deletions
diff --git a/README.md b/README.md
index 6c9435b53..3442659e6 100644
--- a/README.md
+++ b/README.md
@@ -406,7 +406,7 @@ replicas, or to continue the replication after a disconnection.
Other C files
---
-* `t_hash.c`, `t_list.c`, `t_set.c`, `t_string.c` and `t_zset.c` contains the implementation of the Redis data types. They implement both an API to access a given data type, and the client commands implementations for these data types.
+* `t_hash.c`, `t_list.c`, `t_set.c`, `t_string.c`, `t_zset.c` and `t_stream.c` contain the implementation of the Redis data types. They implement both an API to access a given data type and the client command implementations for these data types.
* `ae.c` implements the Redis event loop, it's a self contained library which is simple to read and understand.
* `sds.c` is the Redis string library, check http://github.com/antirez/sds for more information.
* `anet.c` is a library to use POSIX networking in a simpler way compared to the raw interface exposed by the kernel.
diff --git a/redis.conf b/redis.conf
index 74b6c018f..50ba823ac 100644
--- a/redis.conf
+++ b/redis.conf
@@ -336,13 +336,11 @@ replica-read-only yes
# Replication SYNC strategy: disk or socket.
#
-# -------------------------------------------------------
-# WARNING: DISKLESS REPLICATION IS EXPERIMENTAL CURRENTLY
-# -------------------------------------------------------
+# New replicas and reconnecting replicas that are not able to continue the
+# replication process just receiving differences, need to do what is called a
+# "full synchronization". An RDB file is transmitted from the master to the
+# replicas.
#
-# New replicas and reconnecting replicas that are not able to continue the replication
-# process just receiving differences, need to do what is called a "full
-# synchronization". An RDB file is transmitted from the master to the replicas.
# The transmission can happen in two different ways:
#
# 1) Disk-backed: The Redis master creates a new process that writes the RDB
@@ -352,14 +350,14 @@ replica-read-only yes
# RDB file to replica sockets, without touching the disk at all.
#
# With disk-backed replication, while the RDB file is generated, more replicas
-# can be queued and served with the RDB file as soon as the current child producing
-# the RDB file finishes its work. With diskless replication instead once
-# the transfer starts, new replicas arriving will be queued and a new transfer
-# will start when the current one terminates.
+# can be queued and served with the RDB file as soon as the current child
+# producing the RDB file finishes its work. With diskless replication instead
+# once the transfer starts, new replicas arriving will be queued and a new
+# transfer will start when the current one terminates.
#
# When diskless replication is used, the master waits a configurable amount of
-# time (in seconds) before starting the transfer in the hope that multiple replicas
-# will arrive and the transfer can be parallelized.
+# time (in seconds) before starting the transfer in the hope that multiple
+# replicas will arrive and the transfer can be parallelized.
#
# With slow disks and fast (large bandwidth) networks, diskless replication
# works better.
@@ -370,22 +368,32 @@ repl-diskless-sync no
# to the replicas.
#
# This is important since once the transfer starts, it is not possible to serve
-# new replicas arriving, that will be queued for the next RDB transfer, so the server
-# waits a delay in order to let more replicas arrive.
+# new replicas arriving, that will be queued for the next RDB transfer, so the
+# server waits a delay in order to let more replicas arrive.
#
# The delay is specified in seconds, and by default is 5 seconds. To disable
# it entirely just set it to 0 seconds and the transfer will start ASAP.
repl-diskless-sync-delay 5
-# Replica can load the rdb it reads from the replication link directly from the
-# socket, or store the rdb to a file and read that file after it was completely
+# -----------------------------------------------------------------------------
+# WARNING: RDB diskless load is experimental. Since in this setup the replica
+# does not immediately store an RDB on disk, it may cause data loss during
+# failovers. RDB diskless load + Redis modules not handling I/O reads may also
+# cause Redis to abort in case of I/O errors during the initial synchronization
+# stage with the master. Use only if you know what you are doing.
+# -----------------------------------------------------------------------------
+#
+# Replica can load the RDB it reads from the replication link directly from the
+# socket, or store the RDB to a file and read that file after it was completely
# received from the master.
+#
# In many cases the disk is slower than the network, and storing and loading
-# the rdb file may increase replication time (and even increase the master's
+# the RDB file may increase replication time (and even increase the master's
# Copy on Write memory and replica buffers).
-# However, parsing the rdb file directly from the socket may mean that we have
-# to flush the contents of the current database before the full rdb was received.
-# for this reason we have the following options:
+# However, parsing the RDB file directly from the socket may mean that we have
+# to flush the contents of the current database before the full RDB was
+# received. For this reason we have the following options:
+#
# "disabled" - Don't use diskless load (store the rdb file to the disk first)
# "on-empty-db" - Use diskless load only when it is completely safe.
# "swapdb" - Keep a copy of the current db contents in RAM while parsing
@@ -393,9 +401,9 @@ repl-diskless-sync-delay 5
# sufficient memory, if you don't have it, you risk an OOM kill.
repl-diskless-load disabled
-# Replicas send PINGs to server in a predefined interval. It's possible to change
-# this interval with the repl_ping_replica_period option. The default value is 10
-# seconds.
+# Replicas send PINGs to server in a predefined interval. It's possible to
+# change this interval with the repl_ping_replica_period option. The default
+# value is 10 seconds.
#
# repl-ping-replica-period 10
@@ -427,10 +435,10 @@ repl-diskless-load disabled
repl-disable-tcp-nodelay no
# Set the replication backlog size. The backlog is a buffer that accumulates
-# replica data when replicas are disconnected for some time, so that when a replica
-# wants to reconnect again, often a full resync is not needed, but a partial
-# resync is enough, just passing the portion of data the replica missed while
-# disconnected.
+# replica data when replicas are disconnected for some time, so that when a
+# replica wants to reconnect again, often a full resync is not needed, but a
+# partial resync is enough, just passing the portion of data the replica
+# missed while disconnected.
#
# The bigger the replication backlog, the longer the time the replica can be
# disconnected and later be able to perform a partial resynchronization.
@@ -452,13 +460,13 @@ repl-disable-tcp-nodelay no
#
# repl-backlog-ttl 3600
-# The replica priority is an integer number published by Redis in the INFO output.
-# It is used by Redis Sentinel in order to select a replica to promote into a
-# master if the master is no longer working correctly.
+# The replica priority is an integer number published by Redis in the INFO
+# output. It is used by Redis Sentinel in order to select a replica to promote
+# into a master if the master is no longer working correctly.
#
# A replica with a low priority number is considered better for promotion, so
-# for instance if there are three replicas with priority 10, 100, 25 Sentinel will
-# pick the one with priority 10, that is the lowest.
+# for instance if there are three replicas with priority 10, 100, 25 Sentinel
+# will pick the one with priority 10, that is the lowest.
#
# However a special priority of 0 marks the replica as not able to perform the
# role of master, so a replica with priority of 0 will never be selected by
@@ -518,6 +526,39 @@ replica-priority 100
# replica-announce-ip 5.5.5.5
# replica-announce-port 1234
+############################### KEYS TRACKING #################################
+
+# Redis implements server assisted support for client side caching of values.
+# This is implemented using an invalidation table that remembers, using
+# 16 million slots, what clients may have cached certain subsets of keys. In
+# turn this is used in order to send invalidation messages to clients. To
+# understand more about the feature please check this page:
+#
+# https://redis.io/topics/client-side-caching
+#
+# When tracking is enabled for a client, all the read only queries are assumed
+# to be cached: this will force Redis to store information in the invalidation
+# table. When keys are modified, such information is flushed away, and
+# invalidation messages are sent to the clients. However if the workload is
+# heavily dominated by reads, Redis could use more and more memory in order
+# to track the keys fetched by many clients.
+#
+# For this reason it is possible to configure a maximum fill value for the
+# invalidation table. By default it is set to 10%, and once this limit is
+# reached, Redis will start to evict caching slots in the invalidation table
+# even if keys are not modified, just to reclaim memory: this will in turn
+# force the clients to invalidate the cached values. Basically the table
+# maximum fill rate is a trade off between the memory you want to spend server
+# side to track information about who cached what, and the ability of clients
+# to retain cached objects in memory.
+#
+# If you set the value to 0, it means there are no limits, and all the 16
+# million caching slots can be used at the same time. In the "stats"
+# INFO section, you can find information about the amount of caching slots
+# used at every given moment.
+#
+# tracking-table-max-fill 10
+
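+# As a purely illustrative example (the value 5 is arbitrary), the limit can
+# also be inspected and changed at runtime via CONFIG GET/SET, which this
+# patch wires up in config.c below; the current occupancy is exported as
+# tracking_used_slots in the INFO "stats" section:
+#
+#    redis-cli CONFIG SET tracking-table-max-fill 5
+#    redis-cli CONFIG GET tracking-table-max-fill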
################################## SECURITY ###################################
# Warning: since Redis is pretty fast an outside user can try up to
@@ -747,17 +788,17 @@ replica-priority 100
# DEL commands to the replica as keys evict in the master side.
#
# This behavior ensures that masters and replicas stay consistent, and is usually
-# what you want, however if your replica is writable, or you want the replica to have
-# a different memory setting, and you are sure all the writes performed to the
-# replica are idempotent, then you may change this default (but be sure to understand
-# what you are doing).
+# what you want, however if your replica is writable, or you want the replica
+# to have a different memory setting, and you are sure all the writes performed
+# to the replica are idempotent, then you may change this default (but be sure
+# to understand what you are doing).
#
# Note that since the replica by default does not evict, it may end using more
# memory than the one set via maxmemory (there are certain buffers that may
-# be larger on the replica, or data structures may sometimes take more memory and so
-# forth). So make sure you monitor your replicas and make sure they have enough
-# memory to never hit a real out-of-memory condition before the master hits
-# the configured maxmemory setting.
+# be larger on the replica, or data structures may sometimes take more memory
+# and so forth). So make sure you monitor your replicas and make sure they
+# have enough memory to never hit a real out-of-memory condition before the
+# master hits the configured maxmemory setting.
#
# replica-ignore-maxmemory yes
diff --git a/runtest-moduleapi b/runtest-moduleapi
index 84cdb9bb8..8e1c0cb23 100755
--- a/runtest-moduleapi
+++ b/runtest-moduleapi
@@ -13,4 +13,4 @@ then
fi
make -C tests/modules && \
-$TCLSH tests/test_helper.tcl --single unit/moduleapi/commandfilter "${@}"
+$TCLSH tests/test_helper.tcl --single unit/moduleapi/commandfilter --single unit/moduleapi/testrdb "${@}"
diff --git a/src/aof.c b/src/aof.c
index 565ee8073..ae9c4bb68 100644
--- a/src/aof.c
+++ b/src/aof.c
@@ -303,9 +303,7 @@ ssize_t aofWrite(int fd, const char *buf, size_t len) {
nwritten = write(fd, buf, len);
if (nwritten < 0) {
- if (errno == EINTR) {
- continue;
- }
+ if (errno == EINTR) continue;
return totwritten ? totwritten : -1;
}
diff --git a/src/cluster.c b/src/cluster.c
index c85e3791d..e22222700 100644
--- a/src/cluster.c
+++ b/src/cluster.c
@@ -4251,12 +4251,7 @@ NULL
}
} else if (!strcasecmp(c->argv[1]->ptr,"nodes") && c->argc == 2) {
/* CLUSTER NODES */
- robj *o;
- sds ci = clusterGenNodesDescription(0);
-
- o = createObject(OBJ_STRING,ci);
- addReplyBulk(c,o);
- decrRefCount(o);
+ addReplyBulkSds(c,clusterGenNodesDescription(0));
} else if (!strcasecmp(c->argv[1]->ptr,"myid") && c->argc == 2) {
/* CLUSTER MYID */
addReplyBulkCBuffer(c,myself->name, CLUSTER_NAMELEN);
@@ -4832,7 +4827,7 @@ int verifyDumpPayload(unsigned char *p, size_t len) {
* DUMP is actually not used by Redis Cluster but it is the obvious
* complement of RESTORE and can be useful for different applications. */
void dumpCommand(client *c) {
- robj *o, *dumpobj;
+ robj *o;
rio payload;
/* Check if the key is here. */
@@ -4845,9 +4840,7 @@ void dumpCommand(client *c) {
createDumpPayload(&payload,o,c->argv[1]);
/* Transfer to the client */
- dumpobj = createObject(OBJ_STRING,payload.io.buffer.ptr);
- addReplyBulk(c,dumpobj);
- decrRefCount(dumpobj);
+ addReplyBulkSds(c,payload.io.buffer.ptr);
return;
}
diff --git a/src/config.c b/src/config.c
index fde00ddf5..a72df2e78 100644
--- a/src/config.c
+++ b/src/config.c
@@ -686,6 +686,17 @@ void loadServerConfigFromString(char *config) {
}
} else if (!strcasecmp(argv[0],"slowlog-max-len") && argc == 2) {
server.slowlog_max_len = strtoll(argv[1],NULL,10);
+ } else if (!strcasecmp(argv[0],"tracking-table-max-fill") &&
+ argc == 2)
+ {
+ server.tracking_table_max_fill = strtoll(argv[1],NULL,10);
+ if (server.tracking_table_max_fill > 100 ||
+ server.tracking_table_max_fill < 0)
+ {
+ err = "The tracking table fill percentage must be an "
+ "integer between 0 and 100";
+ goto loaderr;
+ }
} else if (!strcasecmp(argv[0],"client-output-buffer-limit") &&
argc == 5)
{
@@ -1134,6 +1145,8 @@ void configSetCommand(client *c) {
/* Cast to unsigned. */
server.slowlog_max_len = (unsigned long)ll;
} config_set_numerical_field(
+ "tracking-table-max-fill",server.tracking_table_max_fill,0,100) {
+ } config_set_numerical_field(
"latency-monitor-threshold",server.latency_monitor_threshold,0,LLONG_MAX){
} config_set_numerical_field(
"repl-ping-slave-period",server.repl_ping_slave_period,1,INT_MAX) {
@@ -1338,8 +1351,8 @@ void configGetCommand(client *c) {
server.slowlog_log_slower_than);
config_get_numerical_field("latency-monitor-threshold",
server.latency_monitor_threshold);
- config_get_numerical_field("slowlog-max-len",
- server.slowlog_max_len);
+ config_get_numerical_field("slowlog-max-len", server.slowlog_max_len);
+ config_get_numerical_field("tracking-table-max-fill", server.tracking_table_max_fill);
config_get_numerical_field("port",server.port);
config_get_numerical_field("cluster-announce-port",server.cluster_announce_port);
config_get_numerical_field("cluster-announce-bus-port",server.cluster_announce_bus_port);
@@ -1470,12 +1483,10 @@ void configGetCommand(client *c) {
matches++;
}
if (stringmatch(pattern,"notify-keyspace-events",1)) {
- robj *flagsobj = createObject(OBJ_STRING,
- keyspaceEventsFlagsToString(server.notify_keyspace_events));
+ sds flags = keyspaceEventsFlagsToString(server.notify_keyspace_events);
addReplyBulkCString(c,"notify-keyspace-events");
- addReplyBulk(c,flagsobj);
- decrRefCount(flagsobj);
+ addReplyBulkSds(c,flags);
matches++;
}
if (stringmatch(pattern,"bind",1)) {
@@ -2167,6 +2178,7 @@ int rewriteConfig(char *path) {
rewriteConfigNumericalOption(state,"slowlog-log-slower-than",server.slowlog_log_slower_than,CONFIG_DEFAULT_SLOWLOG_LOG_SLOWER_THAN);
rewriteConfigNumericalOption(state,"latency-monitor-threshold",server.latency_monitor_threshold,CONFIG_DEFAULT_LATENCY_MONITOR_THRESHOLD);
rewriteConfigNumericalOption(state,"slowlog-max-len",server.slowlog_max_len,CONFIG_DEFAULT_SLOWLOG_MAX_LEN);
+ rewriteConfigNumericalOption(state,"tracking-table-max-fill",server.tracking_table_max_fill,CONFIG_DEFAULT_TRACKING_TABLE_MAX_FILL);
rewriteConfigNotifykeyspaceeventsOption(state);
rewriteConfigNumericalOption(state,"hash-max-ziplist-entries",server.hash_max_ziplist_entries,OBJ_HASH_MAX_ZIPLIST_ENTRIES);
rewriteConfigNumericalOption(state,"hash-max-ziplist-value",server.hash_max_ziplist_value,OBJ_HASH_MAX_ZIPLIST_VALUE);
diff --git a/src/db.c b/src/db.c
index 51f5a12b4..95eaf04e9 100644
--- a/src/db.c
+++ b/src/db.c
@@ -353,6 +353,11 @@ long long emptyDbGeneric(redisDb *dbarray, int dbnum, int flags, void(callback)(
return -1;
}
+ /* Make sure the WATCHed keys are affected by the FLUSH* commands.
+ * Note that we need to call the function while the keys are still
+ * there. */
+ signalFlushedDb(dbnum);
+
int startdb, enddb;
if (dbnum == -1) {
startdb = 0;
@@ -412,11 +417,12 @@ long long dbTotalServerKeyCount() {
void signalModifiedKey(redisDb *db, robj *key) {
touchWatchedKey(db,key);
- if (server.tracking_clients) trackingInvalidateKey(key);
+ trackingInvalidateKey(key);
}
void signalFlushedDb(int dbid) {
touchWatchedKeysOnFlush(dbid);
+ trackingInvalidateKeysOnFlush(dbid);
}
/*-----------------------------------------------------------------------------
@@ -452,7 +458,6 @@ void flushdbCommand(client *c) {
int flags;
if (getFlushCommandFlags(c,&flags) == C_ERR) return;
- signalFlushedDb(c->db->id);
server.dirty += emptyDb(c->db->id,flags,NULL);
addReply(c,shared.ok);
}
@@ -464,7 +469,6 @@ void flushallCommand(client *c) {
int flags;
if (getFlushCommandFlags(c,&flags) == C_ERR) return;
- signalFlushedDb(-1);
server.dirty += emptyDb(-1,flags,NULL);
addReply(c,shared.ok);
if (server.rdb_child_pid != -1) killRDBChild();
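The effect of moving signalFlushedDb() into emptyDbGeneric() is easiest to see from a client's perspective: watched keys must be touched while they still exist, so a transaction WATCHing an existing key reliably aborts when another client flushes the database. A hypothetical session (key names invented):

    client A> SET foo bar
    client A> WATCH foo
    client A> MULTI
    client B> FLUSHALL
    client A> EXEC        -> (nil), the transaction aborts because foo was flushed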
diff --git a/src/expire.c b/src/expire.c
index b23117a3c..598b27f96 100644
--- a/src/expire.c
+++ b/src/expire.c
@@ -64,7 +64,7 @@ int activeExpireCycleTryExpire(redisDb *db, dictEntry *de, long long now) {
dbSyncDelete(db,keyobj);
notifyKeyspaceEvent(NOTIFY_EXPIRED,
"expired",keyobj,db->id);
- if (server.tracking_clients) trackingInvalidateKey(keyobj);
+ trackingInvalidateKey(keyobj);
decrRefCount(keyobj);
server.stat_expiredkeys++;
return 1;
diff --git a/src/hyperloglog.c b/src/hyperloglog.c
index e01ea6042..e0557f985 100644
--- a/src/hyperloglog.c
+++ b/src/hyperloglog.c
@@ -701,6 +701,7 @@ int hllSparseSet(robj *o, long index, uint8_t count) {
first += span;
}
if (span == 0) return -1; /* Invalid format. */
+ if (span >= end) return -1; /* Invalid format. */
next = HLL_SPARSE_IS_XZERO(p) ? p+2 : p+1;
if (next >= end) next = NULL;
diff --git a/src/module.c b/src/module.c
index 36384c018..e5a1ea1ed 100644
--- a/src/module.c
+++ b/src/module.c
@@ -29,6 +29,7 @@
#include "server.h"
#include "cluster.h"
+#include "rdb.h"
#include <dlfcn.h>
#define REDISMODULE_CORE 1
@@ -3092,6 +3093,11 @@ moduleType *RM_CreateDataType(RedisModuleCtx *ctx, const char *name, int encver,
moduleTypeMemUsageFunc mem_usage;
moduleTypeDigestFunc digest;
moduleTypeFreeFunc free;
+ struct {
+ moduleTypeAuxLoadFunc aux_load;
+ moduleTypeAuxSaveFunc aux_save;
+ int aux_save_triggers;
+ } v2;
} *tms = (struct typemethods*) typemethods_ptr;
moduleType *mt = zcalloc(sizeof(*mt));
@@ -3103,6 +3109,11 @@ moduleType *RM_CreateDataType(RedisModuleCtx *ctx, const char *name, int encver,
mt->mem_usage = tms->mem_usage;
mt->digest = tms->digest;
mt->free = tms->free;
+ if (tms->version >= 2) {
+ mt->aux_load = tms->v2.aux_load;
+ mt->aux_save = tms->v2.aux_save;
+ mt->aux_save_triggers = tms->v2.aux_save_triggers;
+ }
memcpy(mt->name,name,sizeof(mt->name));
listAddNodeTail(ctx->module->types,mt);
return mt;
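To make the version-2 type methods concrete, here is a minimal hypothetical sketch of a module registering aux callbacks (the names AuxSave/AuxLoad, MyTypeLoad/MyTypeSave and the global counter are invented for illustration; only the aux fields are new in this patch):

    static long long counter; /* hypothetical module-global state */

    static void AuxSave(RedisModuleIO *rdb, int when) {
        /* Invoked only for the triggers set in aux_save_triggers. */
        if (when != REDISMODULE_AUX_AFTER_RDB) return;
        RedisModule_SaveSigned(rdb, counter);
    }

    static int AuxLoad(RedisModuleIO *rdb, int encver, int when) {
        REDISMODULE_NOT_USED(encver);
        if (when != REDISMODULE_AUX_AFTER_RDB) return REDISMODULE_OK;
        counter = RedisModule_LoadSigned(rdb);
        return REDISMODULE_OK;
    }

    RedisModuleTypeMethods tm = {
        .version = REDISMODULE_TYPE_METHOD_VERSION, /* now 2 */
        .rdb_load = MyTypeLoad,  /* per-key callbacks as before */
        .rdb_save = MyTypeSave,
        .aux_save = AuxSave,
        .aux_load = AuxLoad,
        .aux_save_triggers = REDISMODULE_AUX_AFTER_RDB,
    };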
@@ -3405,6 +3416,36 @@ loaderr:
return 0;
}
+/* Iterate over modules, and trigger RDB aux saving for the module types
+ * that asked for it. */
+ssize_t rdbSaveModulesAux(rio *rdb, int when) {
+ size_t total_written = 0;
+ dictIterator *di = dictGetIterator(modules);
+ dictEntry *de;
+
+ while ((de = dictNext(di)) != NULL) {
+ struct RedisModule *module = dictGetVal(de);
+ listIter li;
+ listNode *ln;
+
+ listRewind(module->types,&li);
+ while((ln = listNext(&li))) {
+ moduleType *mt = ln->value;
+ if (!mt->aux_save || !(mt->aux_save_triggers & when))
+ continue;
+ ssize_t ret = rdbSaveSingleModuleAux(rdb, when, mt);
+ if (ret==-1) {
+ dictReleaseIterator(di);
+ return -1;
+ }
+ total_written += ret;
+ }
+ }
+
+ dictReleaseIterator(di);
+ return total_written;
+}
+
/* --------------------------------------------------------------------------
* Key digest API (DEBUG DIGEST interface for modules types)
* -------------------------------------------------------------------------- */
@@ -3565,7 +3606,7 @@ void RM_LogRaw(RedisModule *module, const char *levelstr, const char *fmt, va_li
if (level < server.verbosity) return;
- name_len = snprintf(msg, sizeof(msg),"<%s> ", module->name);
+ name_len = snprintf(msg, sizeof(msg),"<%s> ", module? module->name: "module");
vsnprintf(msg + name_len, sizeof(msg) - name_len, fmt, ap);
serverLogRaw(level,msg);
}
@@ -3583,13 +3624,15 @@ void RM_LogRaw(RedisModule *module, const char *levelstr, const char *fmt, va_li
* There is a fixed limit to the length of the log line this function is able
* to emit, this limit is not specified but is guaranteed to be more than
* a few lines of text.
+ *
+ * The ctx argument may be NULL if it cannot be provided in the context of
+ * the caller, for instance in threads or callbacks, in which case a generic
+ * "module" string will be used instead of the module name.
*/
void RM_Log(RedisModuleCtx *ctx, const char *levelstr, const char *fmt, ...) {
- if (!ctx->module) return; /* Can only log if module is initialized */
-
va_list ap;
va_start(ap, fmt);
- RM_LogRaw(ctx->module,levelstr,fmt,ap);
+ RM_LogRaw(ctx? ctx->module: NULL,levelstr,fmt,ap);
va_end(ap);
}
diff --git a/src/rdb.c b/src/rdb.c
index c566378fb..4e00fad67 100644
--- a/src/rdb.c
+++ b/src/rdb.c
@@ -42,31 +42,35 @@
#include <sys/stat.h>
#include <sys/param.h>
-#define rdbExitReportCorruptRDB(...) rdbCheckThenExit(__LINE__,__VA_ARGS__)
+/* This macro is called when the internal RDB structure is corrupt. */
+#define rdbExitReportCorruptRDB(...) rdbReportError(1, __LINE__,__VA_ARGS__)
+/* This macro is called when RDB read failed (possibly a short read) */
+#define rdbReportReadError(...) rdbReportError(0, __LINE__,__VA_ARGS__)
char* rdbFileBeingLoaded = NULL; /* used for rdb checking on read error */
extern int rdbCheckMode;
void rdbCheckError(const char *fmt, ...);
void rdbCheckSetError(const char *fmt, ...);
-void rdbCheckThenExit(int linenum, char *reason, ...) {
+void rdbReportError(int corruption_error, int linenum, char *reason, ...) {
va_list ap;
char msg[1024];
int len;
len = snprintf(msg,sizeof(msg),
- "Internal error in RDB reading function at rdb.c:%d -> ", linenum);
+ "Internal error in RDB reading offset %llu, function at rdb.c:%d -> ",
+ (unsigned long long)server.loading_loaded_bytes, linenum);
va_start(ap,reason);
vsnprintf(msg+len,sizeof(msg)-len,reason,ap);
va_end(ap);
if (!rdbCheckMode) {
- serverLog(LL_WARNING, "%s", msg);
- if (rdbFileBeingLoaded) {
+ if (rdbFileBeingLoaded || corruption_error) {
+ serverLog(LL_WARNING, "%s", msg);
char *argv[2] = {"",rdbFileBeingLoaded};
redis_check_rdb_main(2,argv,NULL);
} else {
- serverLog(LL_WARNING, "Failure loading rdb format from socket, assuming connection error, resuming operation.");
+ serverLog(LL_WARNING, "%s. Failure loading rdb format from socket, assuming connection error, resuming operation.", msg);
return;
}
} else {
@@ -82,18 +86,6 @@ static int rdbWriteRaw(rio *rdb, void *p, size_t len) {
return len;
}
-/* This is just a wrapper for the low level function rioRead() that will
- * automatically abort if it is not possible to read the specified amount
- * of bytes. */
-void rdbLoadRaw(rio *rdb, void *buf, uint64_t len) {
- if (rioRead(rdb,buf,len) == 0) {
- rdbExitReportCorruptRDB(
- "Impossible to read %llu bytes in rdbLoadRaw()",
- (unsigned long long) len);
- return; /* Not reached. */
- }
-}
-
int rdbSaveType(rio *rdb, unsigned char type) {
return rdbWriteRaw(rdb,&type,1);
}
@@ -109,10 +101,12 @@ int rdbLoadType(rio *rdb) {
/* This is only used to load old databases stored with the RDB_OPCODE_EXPIRETIME
* opcode. New versions of Redis store using the RDB_OPCODE_EXPIRETIME_MS
- * opcode. */
+ * opcode. On error -1 is returned, however this could be a valid time, so
+ * to check for loading errors the caller should call rioGetReadError() after
+ * calling this function. */
time_t rdbLoadTime(rio *rdb) {
int32_t t32;
- rdbLoadRaw(rdb,&t32,4);
+ if (rioRead(rdb,&t32,4) == 0) return -1;
return (time_t)t32;
}
@@ -132,10 +126,14 @@ int rdbSaveMillisecondTime(rio *rdb, long long t) {
* after upgrading to Redis version 5 they will no longer be able to load their
* own old RDB files. Because of that, we instead fix the function only for new
* RDB versions, and load older RDB versions as we used to do in the past,
- * allowing big endian systems to load their own old RDB files. */
+ * allowing big endian systems to load their own old RDB files.
+ *
+ * On I/O error the function returns LLONG_MAX, however if this is also a
+ * valid stored value, the caller should use rioGetReadError() to check for
+ * errors after calling this function. */
long long rdbLoadMillisecondTime(rio *rdb, int rdbver) {
int64_t t64;
- rdbLoadRaw(rdb,&t64,8);
+ if (rioRead(rdb,&t64,8) == 0) return LLONG_MAX;
if (rdbver >= 9) /* Check the top comment of this function. */
memrev64ifbe(&t64); /* Convert in big endian if the system is BE. */
return (long long)t64;
@@ -284,8 +282,8 @@ void *rdbLoadIntegerObject(rio *rdb, int enctype, int flags, size_t *lenptr) {
v = enc[0]|(enc[1]<<8)|(enc[2]<<16)|(enc[3]<<24);
val = (int32_t)v;
} else {
- val = 0; /* anti-warning */
rdbExitReportCorruptRDB("Unknown RDB integer encoding type %d",enctype);
+ return NULL; /* Never reached. */
}
if (plain || sds) {
char buf[LONG_STR_SIZE], *p;
@@ -388,8 +386,7 @@ void *rdbLoadLzfStringObject(rio *rdb, int flags, size_t *lenptr) {
/* Load the compressed representation and uncompress it to target. */
if (rioRead(rdb,c,clen) == 0) goto err;
if (lzf_decompress(c,clen,val,len) == 0) {
- if (rdbCheckMode) rdbCheckSetError("Invalid LZF compressed string");
- goto err;
+ rdbExitReportCorruptRDB("Invalid LZF compressed string");
}
zfree(c);
@@ -503,6 +500,7 @@ void *rdbGenericLoadStringObject(rio *rdb, int flags, size_t *lenptr) {
return rdbLoadLzfStringObject(rdb,flags,lenptr);
default:
rdbExitReportCorruptRDB("Unknown RDB string encoding type %d",len);
+ return NULL; /* Never reached. */
}
}
@@ -973,7 +971,6 @@ ssize_t rdbSaveObject(rio *rdb, robj *o, robj *key) {
RedisModuleIO io;
moduleValue *mv = o->ptr;
moduleType *mt = mv->type;
- moduleInitIOContext(io,mt,rdb,key);
/* Write the "module" identifier as prefix, so that we'll be able
* to call the right module during loading. */
@@ -982,10 +979,13 @@ ssize_t rdbSaveObject(rio *rdb, robj *o, robj *key) {
io.bytes += retval;
/* Then write the module-specific representation + EOF marker. */
+ moduleInitIOContext(io,mt,rdb,key);
mt->rdb_save(&io,mv->value);
retval = rdbSaveLen(rdb,RDB_MODULE_OPCODE_EOF);
- if (retval == -1) return -1;
- io.bytes += retval;
+ if (retval == -1)
+ io.error = 1;
+ else
+ io.bytes += retval;
if (io.ctx) {
moduleFreeContext(io.ctx);
@@ -1103,6 +1103,40 @@ int rdbSaveInfoAuxFields(rio *rdb, int flags, rdbSaveInfo *rsi) {
return 1;
}
+ssize_t rdbSaveSingleModuleAux(rio *rdb, int when, moduleType *mt) {
+ /* Save a module-specific aux value. */
+ RedisModuleIO io;
+ int retval = rdbSaveType(rdb, RDB_OPCODE_MODULE_AUX);
+
+ /* Write the "module" identifier as prefix, so that we'll be able
+ * to call the right module during loading. */
+ retval = rdbSaveLen(rdb,mt->id);
+ if (retval == -1) return -1;
+ io.bytes += retval;
+
+ /* write the 'when' so that we can provide it on loading */
+ retval = rdbSaveLen(rdb,when);
+ if (retval == -1) return -1;
+ io.bytes += retval;
+
+ /* Then write the module-specific representation + EOF marker. */
+ moduleInitIOContext(io,mt,rdb,NULL);
+ mt->aux_save(&io,when);
+ retval = rdbSaveLen(rdb,RDB_MODULE_OPCODE_EOF);
+ if (retval == -1)
+ io.error = 1;
+ else
+ io.bytes += retval;
+
+ if (io.ctx) {
+ moduleFreeContext(io.ctx);
+ zfree(io.ctx);
+ }
+ if (io.error)
+ return -1;
+ return io.bytes;
+}
+
/* Produces a dump of the database in RDB format sending it to the specified
* Redis I/O channel. On success C_OK is returned, otherwise C_ERR
* is returned and part of the output, or all the output, can be
@@ -1124,6 +1158,7 @@ int rdbSaveRio(rio *rdb, int *error, int flags, rdbSaveInfo *rsi) {
snprintf(magic,sizeof(magic),"REDIS%04d",RDB_VERSION);
if (rdbWriteRaw(rdb,magic,9) == -1) goto werr;
if (rdbSaveInfoAuxFields(rdb,flags,rsi) == -1) goto werr;
+ if (rdbSaveModulesAux(rdb, REDISMODULE_AUX_BEFORE_RDB) == -1) goto werr;
for (j = 0; j < server.dbnum; j++) {
redisDb *db = server.db+j;
@@ -1185,6 +1220,8 @@ int rdbSaveRio(rio *rdb, int *error, int flags, rdbSaveInfo *rsi) {
di = NULL; /* So that we don't release it again on error. */
}
+ if (rdbSaveModulesAux(rdb, REDISMODULE_AUX_AFTER_RDB) == -1) goto werr;
+
/* EOF opcode */
if (rdbSaveType(rdb,RDB_OPCODE_EOF) == -1) goto werr;
@@ -1644,6 +1681,7 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, robj *key) {
hashTypeConvert(o, OBJ_ENCODING_HT);
break;
default:
+ /* totally unreachable */
rdbExitReportCorruptRDB("Unknown RDB encoding type %d",rdbtype);
break;
}
@@ -1651,6 +1689,11 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, robj *key) {
o = createStreamObject();
stream *s = o->ptr;
uint64_t listpacks = rdbLoadLen(rdb,NULL);
+ if (listpacks == RDB_LENERR) {
+ rdbReportReadError("Stream listpacks len loading failed.");
+ decrRefCount(o);
+ return NULL;
+ }
while(listpacks--) {
/* Get the master ID, the one we'll use as key of the radix tree
@@ -1658,7 +1701,9 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, robj *key) {
* relatively to this ID. */
sds nodekey = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL);
if (nodekey == NULL) {
- rdbExitReportCorruptRDB("Stream master ID loading failed: invalid encoding or I/O error.");
+ rdbReportReadError("Stream master ID loading failed: invalid encoding or I/O error.");
+ decrRefCount(o);
+ return NULL;
}
if (sdslen(nodekey) != sizeof(streamID)) {
rdbExitReportCorruptRDB("Stream node key entry is not the "
@@ -1668,7 +1713,12 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, robj *key) {
/* Load the listpack. */
unsigned char *lp =
rdbGenericLoadStringObject(rdb,RDB_LOAD_PLAIN,NULL);
- if (lp == NULL) return NULL;
+ if (lp == NULL) {
+ rdbReportReadError("Stream listpacks loading failed.");
+ sdsfree(nodekey);
+ decrRefCount(o);
+ return NULL;
+ }
unsigned char *first = lpFirst(lp);
if (first == NULL) {
/* Serialized listpacks should never be empty, since on
@@ -1686,12 +1736,24 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, robj *key) {
}
/* Load total number of items inside the stream. */
s->length = rdbLoadLen(rdb,NULL);
+
/* Load the last entry ID. */
s->last_id.ms = rdbLoadLen(rdb,NULL);
s->last_id.seq = rdbLoadLen(rdb,NULL);
+ if (rioGetReadError(rdb)) {
+ rdbReportReadError("Stream object metadata loading failed.");
+ decrRefCount(o);
+ return NULL;
+ }
+
/* Consumer groups loading */
- size_t cgroups_count = rdbLoadLen(rdb,NULL);
+ uint64_t cgroups_count = rdbLoadLen(rdb,NULL);
+ if (cgroups_count == RDB_LENERR) {
+ rdbReportReadError("Stream cgroup count loading failed.");
+ decrRefCount(o);
+ return NULL;
+ }
while(cgroups_count--) {
/* Get the consumer group name and ID. We can then create the
* consumer group ASAP and populate its structure as
@@ -1699,11 +1761,21 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, robj *key) {
streamID cg_id;
sds cgname = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL);
if (cgname == NULL) {
- rdbExitReportCorruptRDB(
+ rdbReportReadError(
"Error reading the consumer group name from Stream");
+ decrRefCount(o);
+ return NULL;
}
+
cg_id.ms = rdbLoadLen(rdb,NULL);
cg_id.seq = rdbLoadLen(rdb,NULL);
+ if (rioGetReadError(rdb)) {
+ rdbReportReadError("Stream cgroup ID loading failed.");
+ sdsfree(cgname);
+ decrRefCount(o);
+ return NULL;
+ }
+
streamCG *cgroup = streamCreateCG(s,cgname,sdslen(cgname),&cg_id);
if (cgroup == NULL)
rdbExitReportCorruptRDB("Duplicated consumer group name %s",
@@ -1715,13 +1787,28 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, robj *key) {
* owner, since consumers for this group and their messages will
* be read as a next step. So for now leave them not resolved
* and later populate it. */
- size_t pel_size = rdbLoadLen(rdb,NULL);
+ uint64_t pel_size = rdbLoadLen(rdb,NULL);
+ if (pel_size == RDB_LENERR) {
+ rdbReportReadError("Stream PEL size loading failed.");
+ decrRefCount(o);
+ return NULL;
+ }
while(pel_size--) {
unsigned char rawid[sizeof(streamID)];
- rdbLoadRaw(rdb,rawid,sizeof(rawid));
+ if (rioRead(rdb,rawid,sizeof(rawid)) == 0) {
+ rdbReportReadError("Stream PEL ID loading failed.");
+ decrRefCount(o);
+ return NULL;
+ }
streamNACK *nack = streamCreateNACK(NULL);
nack->delivery_time = rdbLoadMillisecondTime(rdb,RDB_VERSION);
nack->delivery_count = rdbLoadLen(rdb,NULL);
+ if (rioGetReadError(rdb)) {
+ rdbReportReadError("Stream PEL NACK loading failed.");
+ decrRefCount(o);
+ streamFreeNACK(nack);
+ return NULL;
+ }
if (!raxInsert(cgroup->pel,rawid,sizeof(rawid),nack,NULL))
rdbExitReportCorruptRDB("Duplicated gobal PEL entry "
"loading stream consumer group");
@@ -1729,24 +1816,47 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, robj *key) {
/* Now that we loaded our global PEL, we need to load the
* consumers and their local PELs. */
- size_t consumers_num = rdbLoadLen(rdb,NULL);
+ uint64_t consumers_num = rdbLoadLen(rdb,NULL);
+ if (consumers_num == RDB_LENERR) {
+ rdbReportReadError("Stream consumers num loading failed.");
+ decrRefCount(o);
+ return NULL;
+ }
while(consumers_num--) {
sds cname = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL);
if (cname == NULL) {
- rdbExitReportCorruptRDB(
- "Error reading the consumer name from Stream group");
+ rdbReportReadError(
+ "Error reading the consumer name from Stream group.");
+ decrRefCount(o);
+ return NULL;
}
streamConsumer *consumer = streamLookupConsumer(cgroup,cname,
1);
sdsfree(cname);
consumer->seen_time = rdbLoadMillisecondTime(rdb,RDB_VERSION);
+ if (rioGetReadError(rdb)) {
+ rdbReportReadError("Stream short read reading seen time.");
+ decrRefCount(o);
+ return NULL;
+ }
/* Load the PEL about entries owned by this specific
* consumer. */
pel_size = rdbLoadLen(rdb,NULL);
+ if (pel_size == RDB_LENERR) {
+ rdbReportReadError(
+ "Stream consumer PEL num loading failed.");
+ decrRefCount(o);
+ return NULL;
+ }
while(pel_size--) {
unsigned char rawid[sizeof(streamID)];
- rdbLoadRaw(rdb,rawid,sizeof(rawid));
+ if (rioRead(rdb,rawid,sizeof(rawid)) == 0) {
+ rdbReportReadError(
+ "Stream short read reading PEL streamID.");
+ decrRefCount(o);
+ return NULL;
+ }
streamNACK *nack = raxFind(cgroup->pel,rawid,sizeof(rawid));
if (nack == raxNotFound)
rdbExitReportCorruptRDB("Consumer entry not found in "
@@ -1765,6 +1875,7 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, robj *key) {
}
} else if (rdbtype == RDB_TYPE_MODULE || rdbtype == RDB_TYPE_MODULE_2) {
uint64_t moduleid = rdbLoadLen(rdb,NULL);
+ if (rioGetReadError(rdb)) return NULL;
moduleType *mt = moduleTypeLookupModuleByID(moduleid);
char name[10];
@@ -1792,6 +1903,11 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, robj *key) {
/* Module v2 serialization has an EOF mark at the end. */
if (io.ver == 2) {
uint64_t eof = rdbLoadLen(rdb,NULL);
+ if (eof == RDB_LENERR) {
+ o = createModuleObject(mt,ptr); /* creating just in order to easily destroy */
+ decrRefCount(o);
+ return NULL;
+ }
if (eof != RDB_MODULE_OPCODE_EOF) {
serverLog(LL_WARNING,"The RDB file contains module data for the module '%s' that is not terminated by the proper module value EOF marker", name);
exit(1);
@@ -1805,7 +1921,8 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, robj *key) {
}
o = createModuleObject(mt,ptr);
} else {
- rdbExitReportCorruptRDB("Unknown RDB encoding type %d",rdbtype);
+ rdbReportReadError("Unknown RDB encoding type %d",rdbtype);
+ return NULL;
}
return o;
}
@@ -1904,11 +2021,13 @@ int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi, int loading_aof) {
* load the actual type, and continue. */
expiretime = rdbLoadTime(rdb);
expiretime *= 1000;
+ if (rioGetReadError(rdb)) goto eoferr;
continue; /* Read next opcode. */
} else if (type == RDB_OPCODE_EXPIRETIME_MS) {
/* EXPIRETIME_MS: milliseconds precision expire times introduced
* with RDB v3. Like EXPIRETIME but with more precision. */
expiretime = rdbLoadMillisecondTime(rdb,rdbver);
+ if (rioGetReadError(rdb)) goto eoferr;
continue; /* Read next opcode. */
} else if (type == RDB_OPCODE_FREQ) {
/* FREQ: LFU frequency. */
@@ -2009,15 +2128,12 @@ int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi, int loading_aof) {
decrRefCount(auxval);
continue; /* Read type again. */
} else if (type == RDB_OPCODE_MODULE_AUX) {
- /* This is just for compatibility with the future: we have plans
- * to add the ability for modules to store anything in the RDB
- * file, like data that is not related to the Redis key space.
- * Such data will potentially be stored both before and after the
- * RDB keys-values section. For this reason since RDB version 9,
- * we have the ability to read a MODULE_AUX opcode followed by an
- * identifier of the module, and a serialized value in "MODULE V2"
- * format. */
+ /* Load module data that is not related to the Redis key space.
+ * Such data can potentially be stored both before and after the
+ * RDB keys-values section. */
uint64_t moduleid = rdbLoadLen(rdb,NULL);
+ int when = rdbLoadLen(rdb,NULL);
+ if (rioGetReadError(rdb)) goto eoferr;
moduleType *mt = moduleTypeLookupModuleByID(moduleid);
char name[10];
moduleTypeNameByID(name,moduleid);
@@ -2027,14 +2143,37 @@ int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi, int loading_aof) {
serverLog(LL_WARNING,"The RDB file contains AUX module data I can't load: no matching module '%s'", name);
exit(1);
} else if (!rdbCheckMode && mt != NULL) {
- /* This version of Redis actually does not know what to do
- * with modules AUX data... */
- serverLog(LL_WARNING,"The RDB file contains AUX module data I can't load for the module '%s'. Probably you want to use a newer version of Redis which implements aux data callbacks", name);
- exit(1);
+ if (!mt->aux_load) {
+ /* Module doesn't support AUX. */
+ serverLog(LL_WARNING,"The RDB file contains module AUX data, but the module '%s' doesn't seem to support it.", name);
+ exit(1);
+ }
+
+ RedisModuleIO io;
+ moduleInitIOContext(io,mt,rdb,NULL);
+ io.ver = 2;
+ /* Call the rdb_load method of the module providing the 10 bit
+ * encoding version in the lower 10 bits of the module ID. */
+ if (mt->aux_load(&io,moduleid&1023, when) || io.error) {
+ moduleTypeNameByID(name,moduleid);
+ serverLog(LL_WARNING,"The RDB file contains module AUX data for the module type '%s', that the responsible module is not able to load. Check for modules log above for additional clues.", name);
+ exit(1);
+ }
+ if (io.ctx) {
+ moduleFreeContext(io.ctx);
+ zfree(io.ctx);
+ }
+ uint64_t eof = rdbLoadLen(rdb,NULL);
+ if (eof != RDB_MODULE_OPCODE_EOF) {
+ serverLog(LL_WARNING,"The RDB file contains module AUX data for the module '%s' that is not terminated by the proper module value EOF marker", name);
+ exit(1);
+ }
+ continue;
} else {
/* RDB check mode. */
robj *aux = rdbLoadCheckModuleValue(rdb,name);
decrRefCount(aux);
+ continue; /* Read next opcode. */
}
}
@@ -2088,10 +2227,15 @@ int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi, int loading_aof) {
}
return C_OK;
-eoferr: /* unexpected end of file is handled here with a fatal exit */
- serverLog(LL_WARNING,"Short read or OOM loading DB. Unrecoverable error, aborting now.");
- rdbExitReportCorruptRDB("Unexpected EOF reading RDB file");
- return C_ERR; /* Just to avoid warning */
+ /* Unexpected end of file is handled here calling rdbReportReadError():
+ * this will in turn either abort Redis in most cases, or if we are loading
+ * the RDB file from a socket during initial SYNC (diskless replica mode),
+ * we'll report the error to the caller, so that we can retry. */
+eoferr:
+ serverLog(LL_WARNING,
+ "Short read or OOM loading DB. Unrecoverable error, aborting now.");
+ rdbReportReadError("Unexpected EOF reading RDB file");
+ return C_ERR;
}
/* Like rdbLoadRio() but takes a filename instead of a rio stream. The
diff --git a/src/rdb.h b/src/rdb.h
index 0acddf9ab..40a50f7ba 100644
--- a/src/rdb.h
+++ b/src/rdb.h
@@ -145,6 +145,7 @@ size_t rdbSavedObjectLen(robj *o);
robj *rdbLoadObject(int type, rio *rdb, robj *key);
void backgroundSaveDoneHandler(int exitcode, int bysignal);
int rdbSaveKeyValuePair(rio *rdb, robj *key, robj *val, long long expiretime);
+ssize_t rdbSaveSingleModuleAux(rio *rdb, int when, moduleType *mt);
robj *rdbLoadStringObject(rio *rdb);
ssize_t rdbSaveStringObject(rio *rdb, robj *obj);
ssize_t rdbSaveRawString(rio *rdb, unsigned char *s, size_t len);
diff --git a/src/redis-benchmark.c b/src/redis-benchmark.c
index 1d16fa4ee..2df41580b 100644
--- a/src/redis-benchmark.c
+++ b/src/redis-benchmark.c
@@ -104,6 +104,7 @@ static struct config {
int is_fetching_slots;
int is_updating_slots;
int slots_last_update;
+ int enable_tracking;
/* Thread mutexes to be used as fallbacks by atomicvar.h */
pthread_mutex_t requests_issued_mutex;
pthread_mutex_t requests_finished_mutex;
@@ -255,7 +256,7 @@ static redisConfig *getRedisConfig(const char *ip, int port,
goto fail;
}
- if(config.auth){
+ if(config.auth) {
void *authReply = NULL;
redisAppendCommand(c, "AUTH %s", config.auth);
if (REDIS_OK != redisGetReply(c, &authReply)) goto fail;
@@ -633,6 +634,14 @@ static client createClient(char *cmd, size_t len, client from, int thread_id) {
c->prefix_pending++;
}
+ if (config.enable_tracking) {
+ char *buf = NULL;
+ int len = redisFormatCommand(&buf, "CLIENT TRACKING on");
+ c->obuf = sdscatlen(c->obuf, buf, len);
+ free(buf);
+ c->prefix_pending++;
+ }
+
/* If a DB number different than zero is selected, prefix our request
* buffer with the SELECT command, that will be discarded the first
* time the replies are received, so if the client is reused the
@@ -1350,6 +1359,8 @@ int parseOptions(int argc, const char **argv) {
} else if (config.num_threads < 0) config.num_threads = 0;
} else if (!strcmp(argv[i],"--cluster")) {
config.cluster_mode = 1;
+ } else if (!strcmp(argv[i],"--enable-tracking")) {
+ config.enable_tracking = 1;
} else if (!strcmp(argv[i],"--help")) {
exit_status = 0;
goto usage;
@@ -1380,6 +1391,7 @@ usage:
" --dbnum <db> SELECT the specified db number (default 0)\n"
" --threads <num> Enable multi-thread mode.\n"
" --cluster Enable cluster mode.\n"
+" --enable-tracking Send CLIENT TRACKING on before starting benchmark.\n"
" -k <boolean> 1=keep alive 0=reconnect (default 1)\n"
" -r <keyspacelen> Use random keys for SET/GET/INCR, random values for SADD\n"
" Using this option the benchmark will expand the string __rand_int__\n"
@@ -1504,6 +1516,7 @@ int main(int argc, const char **argv) {
config.is_fetching_slots = 0;
config.is_updating_slots = 0;
config.slots_last_update = 0;
+ config.enable_tracking = 0;
i = parseOptions(argc,argv);
argc -= i;
diff --git a/src/redis-check-rdb.c b/src/redis-check-rdb.c
index e2d71b5a5..5e7415046 100644
--- a/src/redis-check-rdb.c
+++ b/src/redis-check-rdb.c
@@ -216,14 +216,16 @@ int redis_check_rdb(char *rdbfilename, FILE *fp) {
/* EXPIRETIME: load an expire associated with the next key
* to load. Note that after loading an expire we need to
* load the actual type, and continue. */
- if ((expiretime = rdbLoadTime(&rdb)) == -1) goto eoferr;
+ expiretime = rdbLoadTime(&rdb);
expiretime *= 1000;
+ if (rioGetReadError(&rdb)) goto eoferr;
continue; /* Read next opcode. */
} else if (type == RDB_OPCODE_EXPIRETIME_MS) {
/* EXPIRETIME_MS: milliseconds precision expire times introduced
* with RDB v3. Like EXPIRETIME but with more precision. */
rdbstate.doing = RDB_CHECK_DOING_READ_EXPIRE;
- if ((expiretime = rdbLoadMillisecondTime(&rdb, rdbver)) == -1) goto eoferr;
+ expiretime = rdbLoadMillisecondTime(&rdb, rdbver);
+ if (rioGetReadError(&rdb)) goto eoferr;
continue; /* Read next opcode. */
} else if (type == RDB_OPCODE_FREQ) {
/* FREQ: LFU frequency. */
diff --git a/src/redismodule.h b/src/redismodule.h
index f0f27c067..e08aa16d4 100644
--- a/src/redismodule.h
+++ b/src/redismodule.h
@@ -129,6 +129,10 @@
#define REDISMODULE_NOT_USED(V) ((void) V)
+/* Bit flags for aux_save_triggers and the aux_load and aux_save callbacks */
+#define REDISMODULE_AUX_BEFORE_RDB (1<<0)
+#define REDISMODULE_AUX_AFTER_RDB (1<<1)
+
/* This type represents a timer handle, and is returned when a timer is
* registered and used in order to invalidate a timer. It's just a 64 bit
* number, because this is how each timer is represented inside the radix tree
@@ -169,6 +173,8 @@ typedef void (*RedisModuleDisconnectFunc)(RedisModuleCtx *ctx, RedisModuleBlocke
typedef int (*RedisModuleNotificationFunc)(RedisModuleCtx *ctx, int type, const char *event, RedisModuleString *key);
typedef void *(*RedisModuleTypeLoadFunc)(RedisModuleIO *rdb, int encver);
typedef void (*RedisModuleTypeSaveFunc)(RedisModuleIO *rdb, void *value);
+typedef int (*RedisModuleTypeAuxLoadFunc)(RedisModuleIO *rdb, int encver, int when);
+typedef void (*RedisModuleTypeAuxSaveFunc)(RedisModuleIO *rdb, int when);
typedef void (*RedisModuleTypeRewriteFunc)(RedisModuleIO *aof, RedisModuleString *key, void *value);
typedef size_t (*RedisModuleTypeMemUsageFunc)(const void *value);
typedef void (*RedisModuleTypeDigestFunc)(RedisModuleDigest *digest, void *value);
@@ -177,7 +183,7 @@ typedef void (*RedisModuleClusterMessageReceiver)(RedisModuleCtx *ctx, const cha
typedef void (*RedisModuleTimerProc)(RedisModuleCtx *ctx, void *data);
typedef void (*RedisModuleCommandFilterFunc) (RedisModuleCommandFilterCtx *filter);
-#define REDISMODULE_TYPE_METHOD_VERSION 1
+#define REDISMODULE_TYPE_METHOD_VERSION 2
typedef struct RedisModuleTypeMethods {
uint64_t version;
RedisModuleTypeLoadFunc rdb_load;
@@ -186,6 +192,9 @@ typedef struct RedisModuleTypeMethods {
RedisModuleTypeMemUsageFunc mem_usage;
RedisModuleTypeDigestFunc digest;
RedisModuleTypeFreeFunc free;
+ RedisModuleTypeAuxLoadFunc aux_load;
+ RedisModuleTypeAuxSaveFunc aux_save;
+ int aux_save_triggers;
} RedisModuleTypeMethods;
#define REDISMODULE_GET_API(name) \
diff --git a/src/rio.c b/src/rio.c
index 5359bc3d6..bdbc5d0e9 100644
--- a/src/rio.c
+++ b/src/rio.c
@@ -92,6 +92,7 @@ static const rio rioBufferIO = {
rioBufferFlush,
NULL, /* update_checksum */
0, /* current checksum */
+ 0, /* flags */
0, /* bytes read or written */
0, /* read/write chunk size */
{ { NULL, 0 } } /* union for io-specific vars */
@@ -145,6 +146,7 @@ static const rio rioFileIO = {
rioFileFlush,
NULL, /* update_checksum */
0, /* current checksum */
+ 0, /* flags */
0, /* bytes read or written */
0, /* read/write chunk size */
{ { NULL, 0 } } /* union for io-specific vars */
@@ -239,6 +241,7 @@ static const rio rioFdIO = {
rioFdFlush,
NULL, /* update_checksum */
0, /* current checksum */
+ 0, /* flags */
0, /* bytes read or written */
0, /* read/write chunk size */
{ { NULL, 0 } } /* union for io-specific vars */
@@ -374,6 +377,7 @@ static const rio rioFdsetIO = {
rioFdsetFlush,
NULL, /* update_checksum */
0, /* current checksum */
+ 0, /* flags */
0, /* bytes read or written */
0, /* read/write chunk size */
{ { NULL, 0 } } /* union for io-specific vars */
diff --git a/src/rio.h b/src/rio.h
index beea06888..6199bb039 100644
--- a/src/rio.h
+++ b/src/rio.h
@@ -1,6 +1,6 @@
/*
* Copyright (c) 2009-2012, Pieter Noordhuis <pcnoordhuis at gmail dot com>
- * Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com>
+ * Copyright (c) 2009-2019, Salvatore Sanfilippo <antirez at gmail dot com>
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -36,6 +36,9 @@
#include <stdint.h>
#include "sds.h"
+#define RIO_FLAG_READ_ERROR (1<<0)
+#define RIO_FLAG_WRITE_ERROR (1<<1)
+
struct _rio {
/* Backend functions.
* Since these functions do not tolerate short writes or reads, the return
@@ -51,8 +54,8 @@ struct _rio {
* computation. */
void (*update_cksum)(struct _rio *, const void *buf, size_t len);
- /* The current checksum */
- uint64_t cksum;
+ /* The current checksum and flags (see RIO_FLAG_*) */
+ uint64_t cksum, flags;
/* number of bytes read or written */
size_t processed_bytes;
@@ -99,11 +102,14 @@ typedef struct _rio rio;
* if needed. */
static inline size_t rioWrite(rio *r, const void *buf, size_t len) {
+ if (r->flags & RIO_FLAG_WRITE_ERROR) return 0;
while (len) {
size_t bytes_to_write = (r->max_processing_chunk && r->max_processing_chunk < len) ? r->max_processing_chunk : len;
if (r->update_cksum) r->update_cksum(r,buf,bytes_to_write);
- if (r->write(r,buf,bytes_to_write) == 0)
+ if (r->write(r,buf,bytes_to_write) == 0) {
+ r->flags |= RIO_FLAG_WRITE_ERROR;
return 0;
+ }
buf = (char*)buf + bytes_to_write;
len -= bytes_to_write;
r->processed_bytes += bytes_to_write;
@@ -112,10 +118,13 @@ static inline size_t rioWrite(rio *r, const void *buf, size_t len) {
}
static inline size_t rioRead(rio *r, void *buf, size_t len) {
+ if (r->flags & RIO_FLAG_READ_ERROR) return 0;
while (len) {
size_t bytes_to_read = (r->max_processing_chunk && r->max_processing_chunk < len) ? r->max_processing_chunk : len;
- if (r->read(r,buf,bytes_to_read) == 0)
+ if (r->read(r,buf,bytes_to_read) == 0) {
+ r->flags |= RIO_FLAG_READ_ERROR;
return 0;
+ }
if (r->update_cksum) r->update_cksum(r,buf,bytes_to_read);
buf = (char*)buf + bytes_to_read;
len -= bytes_to_read;
@@ -132,6 +141,22 @@ static inline int rioFlush(rio *r) {
return r->flush(r);
}
+/* Return true if there was a read error in any past operation, since the
+ * rio stream was created or since the last call to rioClearErrors(). */
+static inline int rioGetReadError(rio *r) {
+ return (r->flags & RIO_FLAG_READ_ERROR) != 0;
+}
+
+/* Like rioGetReadError() but for write errors. */
+static inline int rioGetWriteError(rio *r) {
return (r->flags & RIO_FLAG_WRITE_ERROR) != 0;
+}
+
+static inline void rioClearErrors(rio *r) {
+ r->flags &= ~(RIO_FLAG_READ_ERROR|RIO_FLAG_WRITE_ERROR);
+}
+
void rioInitWithFile(rio *r, FILE *fp);
void rioInitWithBuffer(rio *r, sds s);
void rioInitWithFd(rio *r, int fd, size_t read_limit);
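To illustrate the sticky-flag idiom the new rdb.c code relies on, here is a minimal hypothetical sketch (the short buffer is constructed on purpose so that the 8-byte read fails):

    rio r;
    sds payload = sdsnewlen("\x01\x02\x03\x04", 4); /* only 4 bytes */
    rioInitWithBuffer(&r, payload);

    int64_t t64;
    if (rioRead(&r, &t64, 8) == 0 || rioGetReadError(&r)) {
        /* Short read: the READ_ERROR flag stays set until
         * rioClearErrors(), so loaders can return a sentinel value
         * and let the top-level caller report or retry instead of
         * exiting the server. */
    }
    sdsfree(payload);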
diff --git a/src/server.c b/src/server.c
index 4337b8f01..eb5cef386 100644
--- a/src/server.c
+++ b/src/server.c
@@ -2403,6 +2403,9 @@ void initServerConfig(void) {
/* Latency monitor */
server.latency_monitor_threshold = CONFIG_DEFAULT_LATENCY_MONITOR_THRESHOLD;
+ /* Tracking. */
+ server.tracking_table_max_fill = CONFIG_DEFAULT_TRACKING_TABLE_MAX_FILL;
+
/* Debugging */
server.assert_failed = "<no assertion failed>";
server.assert_file = "<no file>";
@@ -3401,6 +3404,10 @@ int processCommand(client *c) {
}
}
+ /* Make sure to use a reasonable amount of memory for client side
+ * caching metadata. */
+ if (server.tracking_clients) trackingLimitUsedSlots();
+
/* Don't accept write commands if there are problems persisting on disk
* and if this is a master instance. */
int deny_write_type = writeCommandsDeniedByDiskError();
@@ -4140,7 +4147,8 @@ sds genRedisInfoString(char *section) {
"active_defrag_hits:%lld\r\n"
"active_defrag_misses:%lld\r\n"
"active_defrag_key_hits:%lld\r\n"
- "active_defrag_key_misses:%lld\r\n",
+ "active_defrag_key_misses:%lld\r\n"
+ "tracking_used_slots:%lld\r\n",
server.stat_numconnections,
server.stat_numcommands,
getInstantaneousMetric(STATS_METRIC_COMMAND),
@@ -4166,7 +4174,8 @@ sds genRedisInfoString(char *section) {
server.stat_active_defrag_hits,
server.stat_active_defrag_misses,
server.stat_active_defrag_key_hits,
- server.stat_active_defrag_key_misses);
+ server.stat_active_defrag_key_misses,
+ trackingGetUsedSlots());
}
/* Replication */
diff --git a/src/server.h b/src/server.h
index 5991cfa6c..a7f38a7a9 100644
--- a/src/server.h
+++ b/src/server.h
@@ -171,6 +171,7 @@ typedef long long mstime_t; /* millisecond time type. */
#define CONFIG_DEFAULT_DEFRAG_CYCLE_MAX 75 /* 75% CPU max (at upper threshold) */
#define CONFIG_DEFAULT_DEFRAG_MAX_SCAN_FIELDS 1000 /* keys with more than 1000 fields will be processed separately */
#define CONFIG_DEFAULT_PROTO_MAX_BULK_LEN (512ll*1024*1024) /* Bulk request max size */
+#define CONFIG_DEFAULT_TRACKING_TABLE_MAX_FILL 10 /* 10% tracking table max fill. */
#define ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP 20 /* Lookups per loop. */
#define ACTIVE_EXPIRE_CYCLE_FAST_DURATION 1000 /* Microseconds */
@@ -536,6 +537,10 @@ typedef long long mstime_t; /* millisecond time type. */
#define REDISMODULE_TYPE_ENCVER(id) (id & REDISMODULE_TYPE_ENCVER_MASK)
#define REDISMODULE_TYPE_SIGN(id) ((id & ~((uint64_t)REDISMODULE_TYPE_ENCVER_MASK)) >>REDISMODULE_TYPE_ENCVER_BITS)
+/* Bit flags for moduleTypeAuxSaveFunc */
+#define REDISMODULE_AUX_BEFORE_RDB (1<<0)
+#define REDISMODULE_AUX_AFTER_RDB (1<<1)
+
struct RedisModule;
struct RedisModuleIO;
struct RedisModuleDigest;
@@ -548,6 +553,8 @@ struct redisObject;
* is deleted. */
typedef void *(*moduleTypeLoadFunc)(struct RedisModuleIO *io, int encver);
typedef void (*moduleTypeSaveFunc)(struct RedisModuleIO *io, void *value);
+typedef int (*moduleTypeAuxLoadFunc)(struct RedisModuleIO *rdb, int encver, int when);
+typedef void (*moduleTypeAuxSaveFunc)(struct RedisModuleIO *rdb, int when);
typedef void (*moduleTypeRewriteFunc)(struct RedisModuleIO *io, struct redisObject *key, void *value);
typedef void (*moduleTypeDigestFunc)(struct RedisModuleDigest *digest, void *value);
typedef size_t (*moduleTypeMemUsageFunc)(const void *value);
@@ -564,6 +571,9 @@ typedef struct RedisModuleType {
moduleTypeMemUsageFunc mem_usage;
moduleTypeDigestFunc digest;
moduleTypeFreeFunc free;
+ moduleTypeAuxLoadFunc aux_load;
+ moduleTypeAuxSaveFunc aux_save;
+ int aux_save_triggers;
char name[10]; /* 9 bytes name + null term. Charset: A-Z a-z 0-9 _- */
} moduleType;
@@ -1316,6 +1326,7 @@ struct redisServer {
list *ready_keys; /* List of readyList structures for BLPOP & co */
/* Client side caching. */
unsigned int tracking_clients; /* # of clients with tracking enabled.*/
+ int tracking_table_max_fill; /* Max fill percentage. */
/* Sort parameters - qsort_r() is only available under BSD so we
* have to take this state global, in order to pass it to sortCompare() */
int sort_desc;
@@ -1528,6 +1539,7 @@ void moduleAcquireGIL(void);
void moduleReleaseGIL(void);
void moduleNotifyKeyspaceEvent(int type, const char *event, robj *key, int dbid);
void moduleCallCommandFilters(client *c);
+ssize_t rdbSaveModulesAux(rio *rdb, int when);
int moduleAllDatatypesHandleErrors();
/* Utils */
@@ -1639,6 +1651,9 @@ void enableTracking(client *c, uint64_t redirect_to);
void disableTracking(client *c);
void trackingRememberKeys(client *c);
void trackingInvalidateKey(robj *keyobj);
+void trackingInvalidateKeysOnFlush(int dbid);
+void trackingLimitUsedSlots(void);
+unsigned long long trackingGetUsedSlots(void);
/* List data type */
void listTypeTryConversion(robj *subject, robj *value);
diff --git a/src/stream.h b/src/stream.h
index ef08753b5..8ae90ce77 100644
--- a/src/stream.h
+++ b/src/stream.h
@@ -109,5 +109,6 @@ streamCG *streamCreateCG(stream *s, char *name, size_t namelen, streamID *id);
streamNACK *streamCreateNACK(streamConsumer *consumer);
void streamDecodeID(void *buf, streamID *id);
int streamCompareID(streamID *a, streamID *b);
+void streamFreeNACK(streamNACK *na);
#endif
diff --git a/src/t_zset.c b/src/t_zset.c
index fb7078abd..2680e76a9 100644
--- a/src/t_zset.c
+++ b/src/t_zset.c
@@ -1357,9 +1357,8 @@ int zsetAdd(robj *zobj, double score, sds ele, int *flags, double *newscore) {
/* Optimize: check if the element is too large or the list
* becomes too long *before* executing zzlInsert. */
zobj->ptr = zzlInsert(zobj->ptr,ele,score);
- if (zzlLength(zobj->ptr) > server.zset_max_ziplist_entries)
- zsetConvert(zobj,OBJ_ENCODING_SKIPLIST);
- if (sdslen(ele) > server.zset_max_ziplist_value)
+ if (zzlLength(zobj->ptr) > server.zset_max_ziplist_entries ||
+ sdslen(ele) > server.zset_max_ziplist_value)
zsetConvert(zobj,OBJ_ENCODING_SKIPLIST);
if (newscore) *newscore = score;
*flags |= ZADD_ADDED;
diff --git a/src/tracking.c b/src/tracking.c
index bbfc66a72..f7f0fc755 100644
--- a/src/tracking.c
+++ b/src/tracking.c
@@ -60,6 +60,7 @@
* use the most significant bits instead of the full 24 bits. */
#define TRACKING_TABLE_SIZE (1<<24)
rax **TrackingTable = NULL;
+unsigned long TrackingTableUsedSlots = 0;
robj *TrackingChannelName;
/* Remove the tracking state from the client 'c'. Note that there is not much
@@ -109,67 +110,187 @@ void trackingRememberKeys(client *c) {
sds sdskey = c->argv[idx]->ptr;
uint64_t hash = crc64(0,
(unsigned char*)sdskey,sdslen(sdskey))&(TRACKING_TABLE_SIZE-1);
- if (TrackingTable[hash] == NULL)
+ if (TrackingTable[hash] == NULL) {
TrackingTable[hash] = raxNew();
+ TrackingTableUsedSlots++;
+ }
raxTryInsert(TrackingTable[hash],
(unsigned char*)&c->id,sizeof(c->id),NULL,NULL);
}
getKeysFreeResult(keys);
}
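
For reference, a key name maps to one of the 16 million caching slots exactly as above: CRC64 of the key name, masked to the lowest 24 bits. A standalone sketch (keyToSlot() is a hypothetical helper; crc64() is the one from Redis's crc64.h):

    #include <stdint.h>
    #include <string.h>

    #define TRACKING_TABLE_SIZE (1<<24)

    uint64_t crc64(uint64_t crc, const unsigned char *s, uint64_t l);

    /* Hash a key name into its caching slot, the same way
     * trackingRememberKeys() and trackingInvalidateKey() do. */
    static uint64_t keyToSlot(const char *key) {
        return crc64(0,(unsigned char*)key,strlen(key)) &
               (TRACKING_TABLE_SIZE-1);
    }
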
-/* This function is called from signalModifiedKey() or other places in Redis
- * when a key changes value. In the context of keys tracking, our task here is
- * to send a notification to every client that may have keys about such . */
-void trackingInvalidateKey(robj *keyobj) {
- sds sdskey = keyobj->ptr;
- uint64_t hash = crc64(0,
- (unsigned char*)sdskey,sdslen(sdskey))&(TRACKING_TABLE_SIZE-1);
- if (TrackingTable == NULL || TrackingTable[hash] == NULL) return;
+void sendTrackingMessage(client *c, long long hash) {
+ int using_redirection = 0;
+ if (c->client_tracking_redirection) {
+ client *redir = lookupClientByID(c->client_tracking_redirection);
+ if (!redir) {
+ /* We need to signal to the original connection that we
+ * are unable to send invalidation messages to the redirected
+             * connection, because that client no longer exists. */
+ if (c->resp > 2) {
+ addReplyPushLen(c,3);
+ addReplyBulkCBuffer(c,"tracking-redir-broken",21);
+ addReplyLongLong(c,c->client_tracking_redirection);
+ }
+ return;
+ }
+ c = redir;
+ using_redirection = 1;
+ }
+
+ /* Only send such info for clients in RESP version 3 or more. However
+ * if redirection is active, and the connection we redirect to is
+ * in Pub/Sub mode, we can support the feature with RESP 2 as well,
+ * by sending Pub/Sub messages in the __redis__:invalidate channel. */
+ if (c->resp > 2) {
+ addReplyPushLen(c,2);
+ addReplyBulkCBuffer(c,"invalidate",10);
+ addReplyLongLong(c,hash);
+ } else if (using_redirection && c->flags & CLIENT_PUBSUB) {
+ robj *msg = createStringObjectFromLongLong(hash);
+ addReplyPubsubMessage(c,TrackingChannelName,msg);
+ decrRefCount(msg);
+ }
+}
+
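
On a RESP3 connection the reply built above is a two element push frame; for caching slot 12345 the client would read something like this on the wire (a sketch, CRLF terminators implied):

    >2
    $10
    invalidate
    :12345

When redirecting to a RESP2 connection in Pub/Sub mode, the same slot number is instead delivered as a regular message on the __redis__:invalidate channel.
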
+/* Invalidate a caching slot: this is the low level implementation behind
+ * trackingInvalidateKey(), the API that the rest of Redis calls. */
+void trackingInvalidateSlot(uint64_t slot) {
+ if (TrackingTable == NULL || TrackingTable[slot] == NULL) return;
raxIterator ri;
- raxStart(&ri,TrackingTable[hash]);
+ raxStart(&ri,TrackingTable[slot]);
raxSeek(&ri,"^",NULL,0);
while(raxNext(&ri)) {
uint64_t id;
memcpy(&id,ri.key,ri.key_len);
client *c = lookupClientByID(id);
- if (c == NULL) continue;
- int using_redirection = 0;
- if (c->client_tracking_redirection) {
- client *redir = lookupClientByID(c->client_tracking_redirection);
- if (!redir) {
- /* We need to signal to the original connection that we
- * are unable to send invalidation messages to the redirected
- * connection, because the client no longer exist. */
- if (c->resp > 2) {
- addReplyPushLen(c,3);
- addReplyBulkCBuffer(c,"tracking-redir-broken",21);
- addReplyLongLong(c,c->client_tracking_redirection);
- }
- continue;
+ if (c == NULL || !(c->flags & CLIENT_TRACKING)) continue;
+ sendTrackingMessage(c,slot);
+ }
+ raxStop(&ri);
+
+    /* Free the tracking table: we'll create the radix tree and populate it
+     * again if more keys in this caching slot are modified. */
+ raxFree(TrackingTable[slot]);
+ TrackingTable[slot] = NULL;
+ TrackingTableUsedSlots--;
+}
+
+/* This function is called from signalModifiedKey() or other places in Redis
+ * when a key changes value. In the context of key tracking, our task here is
+ * to send a notification to every client that may have keys in the
+ * corresponding caching slot. */
+void trackingInvalidateKey(robj *keyobj) {
+ if (TrackingTable == NULL || TrackingTableUsedSlots == 0) return;
+
+ sds sdskey = keyobj->ptr;
+ uint64_t hash = crc64(0,
+ (unsigned char*)sdskey,sdslen(sdskey))&(TRACKING_TABLE_SIZE-1);
+ trackingInvalidateSlot(hash);
+}
+
+/* This function is called when one or all the Redis databases are flushed
+ * (dbid == -1 in case of FLUSHALL). Caching slots are not specific to
+ * each DB but are global: what we currently do is send a special
+ * notification to clients with tracking enabled, invalidating the caching
+ * slot "-1", which means "all the keys", in order to avoid flooding clients
+ * with one invalidation message per key they may hold.
+ *
+ * However flushing the tracking table here would be very costly: we would
+ * need to scan 16 million caching slots in the table to check if they are
+ * used, which introduces a big delay. So we really flush the table only
+ * in the case of FLUSHALL. When FLUSHDB is called instead we just send
+ * the invalidation message to all the clients, but don't flush the
+ * table: it will slowly get garbage collected as more keys are modified
+ * in the used caching slots. */
+void trackingInvalidateKeysOnFlush(int dbid) {
+ if (server.tracking_clients) {
+ listNode *ln;
+ listIter li;
+ listRewind(server.clients,&li);
+ while ((ln = listNext(&li)) != NULL) {
+ client *c = listNodeValue(ln);
+ if (c->flags & CLIENT_TRACKING) {
+ sendTrackingMessage(c,-1);
+ }
+ }
+ }
+
+ /* In case of FLUSHALL, reclaim all the memory used by tracking. */
+ if (dbid == -1 && TrackingTable) {
+ for (int j = 0; j < TRACKING_TABLE_SIZE && TrackingTableUsedSlots > 0; j++) {
+ if (TrackingTable[j] != NULL) {
+ raxFree(TrackingTable[j]);
+ TrackingTable[j] = NULL;
+ TrackingTableUsedSlots--;
}
- c = redir;
- using_redirection = 1;
}
- /* Only send such info for clients in RESP version 3 or more. However
- * if redirection is active, and the connection we redirect to is
- * in Pub/Sub mode, we can support the feature with RESP 2 as well,
- * by sending Pub/Sub messages in the __redis__:invalidate channel. */
- if (c->resp > 2) {
- addReplyPushLen(c,2);
- addReplyBulkCBuffer(c,"invalidate",10);
- addReplyLongLong(c,hash);
- } else if (using_redirection && c->flags & CLIENT_PUBSUB) {
- robj *msg = createStringObjectFromLongLong(hash);
- addReplyPubsubMessage(c,TrackingChannelName,msg);
- decrRefCount(msg);
+ /* If there are no clients with tracking enabled, we can even
+ * reclaim the memory used by the table itself. The code assumes
+ * the table is allocated only if there is at least one client alive
+ * with tracking enabled. */
+ if (server.tracking_clients == 0) {
+ zfree(TrackingTable);
+ TrackingTable = NULL;
}
}
- raxStop(&ri);
+}
- /* Free the tracking table: we'll create the radix tree and populate it
- * again if more keys will be modified in this hash slot. */
- raxFree(TrackingTable[hash]);
- TrackingTable[hash] = NULL;
+/* Tracking forces Redis to remember information about which clients may have
+ * keys in certain caching slots. In workloads where there are a lot of
+ * reads, but keys are hardly modified, the amount of information we have
+ * to remember server side could be a lot: each of the 16 million caching
+ * slots may end up with a radix tree containing many entries.
+ *
+ * So Redis allows the user to configure a maximum fill rate for the
+ * invalidation table. This function makes sure that we don't go over the
+ * specified fill rate: if we are over, we just evict information about
+ * random caching slots, and send invalidation messages to clients as if
+ * the keys were modified. */
+void trackingLimitUsedSlots(void) {
+ static unsigned int timeout_counter = 0;
+
+ if (server.tracking_table_max_fill == 0) return; /* No limits set. */
+ unsigned int max_slots =
+ (TRACKING_TABLE_SIZE/100) * server.tracking_table_max_fill;
+ if (TrackingTableUsedSlots <= max_slots) {
+ timeout_counter = 0;
+ return; /* Limit not reached. */
+ }
+
+    /* We have to invalidate a few slots to get back under the limit. The
+     * effort we make here is proportional to the number of times we entered
+     * this function and found that we are still over the limit. */
+ int effort = 100 * (timeout_counter+1);
+
+    /* Let's start at a random position, and perform linear probing, in order
+     * to improve cache locality. However once we are able to find a used
+     * slot, jump again randomly, in order to avoid creating big holes in the
+     * table (that would make this function use more resources later). */
+ while(effort > 0) {
+ unsigned int idx = rand() % TRACKING_TABLE_SIZE;
+ do {
+ effort--;
+ idx = (idx+1) % TRACKING_TABLE_SIZE;
+ if (TrackingTable[idx] != NULL) {
+ trackingInvalidateSlot(idx);
+ if (TrackingTableUsedSlots <= max_slots) {
+ timeout_counter = 0;
+ return; /* Return ASAP: we are again under the limit. */
+ } else {
+ break; /* Jump to next random position. */
+ }
+ }
+ } while(effort > 0);
+ }
+ timeout_counter++;
+}
+
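
The cap enforced here is a percentage of the 16 million slots, defaulting to 10 per CONFIG_DEFAULT_TRACKING_TABLE_MAX_FILL in server.h above, with 0 meaning no limit. Assuming the redis.conf directive mirrors the server.h field name (the config.c hunk is not shown here), lowering the cap to 5% would look like:

    tracking-table-max-fill 5
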
+/* This is just used in order to access the number of used slots in the
+ * tracking table. */
+unsigned long long trackingGetUsedSlots(void) {
+ return TrackingTableUsedSlots;
}
diff --git a/tests/integration/replication.tcl b/tests/integration/replication.tcl
index d69a1761a..5d32555b0 100644
--- a/tests/integration/replication.tcl
+++ b/tests/integration/replication.tcl
@@ -192,12 +192,6 @@ foreach mdl {no yes} {
set master_host [srv 0 host]
set master_port [srv 0 port]
set slaves {}
- set load_handle0 [start_bg_complex_data $master_host $master_port 9 100000000]
- set load_handle1 [start_bg_complex_data $master_host $master_port 11 100000000]
- set load_handle2 [start_bg_complex_data $master_host $master_port 12 100000000]
- set load_handle3 [start_write_load $master_host $master_port 8]
- set load_handle4 [start_write_load $master_host $master_port 4]
- after 5000 ;# wait for some data to accumulate so that we have RDB part for the fork
start_server {} {
lappend slaves [srv 0 client]
start_server {} {
@@ -205,6 +199,14 @@ foreach mdl {no yes} {
start_server {} {
lappend slaves [srv 0 client]
test "Connect multiple replicas at the same time (issue #141), master diskless=$mdl, replica diskless=$sdl" {
+ # start load handles only inside the test, so that the test can be skipped
+ set load_handle0 [start_bg_complex_data $master_host $master_port 9 100000000]
+ set load_handle1 [start_bg_complex_data $master_host $master_port 11 100000000]
+ set load_handle2 [start_bg_complex_data $master_host $master_port 12 100000000]
+ set load_handle3 [start_write_load $master_host $master_port 8]
+ set load_handle4 [start_write_load $master_host $master_port 4]
+                    after 5000 ;# wait for some data to accumulate so that we have an RDB part for the fork
+
# Send SLAVEOF commands to slaves
[lindex $slaves 0] config set repl-diskless-load $sdl
[lindex $slaves 1] config set repl-diskless-load $sdl
@@ -278,9 +280,9 @@ start_server {tags {"repl"}} {
set master [srv 0 client]
set master_host [srv 0 host]
set master_port [srv 0 port]
- set load_handle0 [start_write_load $master_host $master_port 3]
start_server {} {
test "Master stream is correctly processed while the replica has a script in -BUSY state" {
+ set load_handle0 [start_write_load $master_host $master_port 3]
set slave [srv 0 client]
$slave config set lua-time-limit 500
$slave slaveof $master_host $master_port
@@ -383,3 +385,84 @@ test {slave fails full sync and diskless load swapdb recoveres it} {
}
}
}
+
+test {diskless loading short read} {
+ start_server {tags {"repl"}} {
+ set replica [srv 0 client]
+ set replica_host [srv 0 host]
+ set replica_port [srv 0 port]
+ start_server {} {
+ set master [srv 0 client]
+ set master_host [srv 0 host]
+ set master_port [srv 0 port]
+
+ # Set master and replica to use diskless replication
+ $master config set repl-diskless-sync yes
+ $master config set rdbcompression no
+ $replica config set repl-diskless-load swapdb
+            # Try to fill the master with all data types / encodings
+ for {set k 0} {$k < 3} {incr k} {
+ for {set i 0} {$i < 10} {incr i} {
+ r set "$k int_$i" [expr {int(rand()*10000)}]
+ r expire "$k int_$i" [expr {int(rand()*10000)}]
+ r set "$k string_$i" [string repeat A [expr {int(rand()*1000000)}]]
+ r hset "$k hash_small" [string repeat A [expr {int(rand()*10)}]] 0[string repeat A [expr {int(rand()*10)}]]
+ r hset "$k hash_large" [string repeat A [expr {int(rand()*10000)}]] [string repeat A [expr {int(rand()*1000000)}]]
+ r sadd "$k set_small" [string repeat A [expr {int(rand()*10)}]]
+ r sadd "$k set_large" [string repeat A [expr {int(rand()*1000000)}]]
+ r zadd "$k zset_small" [expr {rand()}] [string repeat A [expr {int(rand()*10)}]]
+ r zadd "$k zset_large" [expr {rand()}] [string repeat A [expr {int(rand()*1000000)}]]
+ r lpush "$k list_small" [string repeat A [expr {int(rand()*10)}]]
+ r lpush "$k list_large" [string repeat A [expr {int(rand()*1000000)}]]
+ for {set j 0} {$j < 10} {incr j} {
+ r xadd "$k stream" * foo "asdf" bar "1234"
+ }
+ r xgroup create "$k stream" "mygroup_$i" 0
+ r xreadgroup GROUP "mygroup_$i" Alice COUNT 1 STREAMS "$k stream" >
+ }
+ }
+
+ # Start the replication process...
+ $master config set repl-diskless-sync-delay 0
+ $replica replicaof $master_host $master_port
+
+ # kill the replication at various points
+ set attempts 3
+ if {$::accurate} { set attempts 10 }
+ for {set i 0} {$i < $attempts} {incr i} {
+ # wait for the replica to start reading the rdb
+                # using the log file since the replica only responds to INFO once per 2mb read
+ wait_for_log_message -1 "*Loading DB in memory*" 5 2000 1
+
+                # add some additional random sleep so that we kill the connection at a different place each time
+ after [expr {int(rand()*100)}]
+
+ # kill the replica connection on the master
+ set killed [$master client kill type replica]
+
+ if {[catch {
+ set res [wait_for_log_message -1 "*Internal error in RDB*" 5 100 10]
+ if {$::verbose} {
+ puts $res
+ }
+ }]} {
+ puts "failed triggering short read"
+ # force the replica to try another full sync
+ $master client kill type replica
+ $master set asdf asdf
+ # the side effect of resizing the backlog is that it is flushed (16k is the min size)
+ $master config set repl-backlog-size [expr {16384 + $i}]
+ }
+ # wait for loading to stop (fail)
+ wait_for_condition 100 10 {
+ [s -1 loading] eq 0
+ } else {
+ fail "Replica didn't disconnect"
+ }
+ }
+ # enable fast shutdown
+ $master config set rdb-key-save-delay 0
+ }
+ }
+}
+
diff --git a/tests/modules/Makefile b/tests/modules/Makefile
index 014d20afa..6e4574747 100644
--- a/tests/modules/Makefile
+++ b/tests/modules/Makefile
@@ -13,12 +13,16 @@ endif
.SUFFIXES: .c .so .xo .o
-all: commandfilter.so
+all: commandfilter.so testrdb.so
.c.xo:
$(CC) -I../../src $(CFLAGS) $(SHOBJ_CFLAGS) -fPIC -c $< -o $@
commandfilter.xo: ../../src/redismodule.h
+testrdb.xo: ../../src/redismodule.h
commandfilter.so: commandfilter.xo
$(LD) -o $@ $< $(SHOBJ_LDFLAGS) $(LIBS) -lc
+
+testrdb.so: testrdb.xo
+ $(LD) -o $@ $< $(SHOBJ_LDFLAGS) $(LIBS) -lc
diff --git a/tests/modules/testrdb.c b/tests/modules/testrdb.c
new file mode 100644
index 000000000..415497a2f
--- /dev/null
+++ b/tests/modules/testrdb.c
@@ -0,0 +1,229 @@
+#include "redismodule.h"
+
+#include <string.h>
+#include <assert.h>
+
+/* Module configuration, save aux or not? */
+long long conf_aux_count = 0;
+
+/* Registered type */
+RedisModuleType *testrdb_type = NULL;
+
+/* Global values to store and persist to aux */
+RedisModuleString *before_str = NULL;
+RedisModuleString *after_str = NULL;
+
+void *testrdb_type_load(RedisModuleIO *rdb, int encver) {
+ int count = RedisModule_LoadSigned(rdb);
+ assert(count==1);
+ assert(encver==1);
+ RedisModuleString *str = RedisModule_LoadString(rdb);
+ return str;
+}
+
+void testrdb_type_save(RedisModuleIO *rdb, void *value) {
+ RedisModuleString *str = (RedisModuleString*)value;
+ RedisModule_SaveSigned(rdb, 1);
+ RedisModule_SaveString(rdb, str);
+}
+
+void testrdb_aux_save(RedisModuleIO *rdb, int when) {
+ if (conf_aux_count==1) assert(when == REDISMODULE_AUX_AFTER_RDB);
+ if (conf_aux_count==0) assert(0);
+ if (when == REDISMODULE_AUX_BEFORE_RDB) {
+ if (before_str) {
+ RedisModule_SaveSigned(rdb, 1);
+ RedisModule_SaveString(rdb, before_str);
+ } else {
+ RedisModule_SaveSigned(rdb, 0);
+ }
+ } else {
+ if (after_str) {
+ RedisModule_SaveSigned(rdb, 1);
+ RedisModule_SaveString(rdb, after_str);
+ } else {
+ RedisModule_SaveSigned(rdb, 0);
+ }
+ }
+}
+
+int testrdb_aux_load(RedisModuleIO *rdb, int encver, int when) {
+ assert(encver == 1);
+ if (conf_aux_count==1) assert(when == REDISMODULE_AUX_AFTER_RDB);
+ if (conf_aux_count==0) assert(0);
+ RedisModuleCtx *ctx = RedisModule_GetContextFromIO(rdb);
+ if (when == REDISMODULE_AUX_BEFORE_RDB) {
+ if (before_str)
+ RedisModule_FreeString(ctx, before_str);
+ before_str = NULL;
+ int count = RedisModule_LoadSigned(rdb);
+ if (count)
+ before_str = RedisModule_LoadString(rdb);
+ } else {
+ if (after_str)
+ RedisModule_FreeString(ctx, after_str);
+ after_str = NULL;
+ int count = RedisModule_LoadSigned(rdb);
+ if (count)
+ after_str = RedisModule_LoadString(rdb);
+ }
+ return REDISMODULE_OK;
+}
+
+void testrdb_type_free(void *value) {
+ RedisModule_FreeString(NULL, (RedisModuleString*)value);
+}
+
+int testrdb_set_before(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
+{
+ if (argc != 2) {
+ RedisModule_WrongArity(ctx);
+ return REDISMODULE_OK;
+ }
+
+ if (before_str)
+ RedisModule_FreeString(ctx, before_str);
+ before_str = argv[1];
+ RedisModule_RetainString(ctx, argv[1]);
+ RedisModule_ReplyWithLongLong(ctx, 1);
+ return REDISMODULE_OK;
+}
+
+int testrdb_get_before(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
+{
+ REDISMODULE_NOT_USED(argv);
+ if (argc != 1){
+ RedisModule_WrongArity(ctx);
+ return REDISMODULE_OK;
+ }
+ if (before_str)
+ RedisModule_ReplyWithString(ctx, before_str);
+ else
+ RedisModule_ReplyWithStringBuffer(ctx, "", 0);
+ return REDISMODULE_OK;
+}
+
+int testrdb_set_after(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
+{
+ if (argc != 2){
+ RedisModule_WrongArity(ctx);
+ return REDISMODULE_OK;
+ }
+
+ if (after_str)
+ RedisModule_FreeString(ctx, after_str);
+ after_str = argv[1];
+ RedisModule_RetainString(ctx, argv[1]);
+ RedisModule_ReplyWithLongLong(ctx, 1);
+ return REDISMODULE_OK;
+}
+
+int testrdb_get_after(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
+{
+ REDISMODULE_NOT_USED(argv);
+ if (argc != 1){
+ RedisModule_WrongArity(ctx);
+ return REDISMODULE_OK;
+ }
+ if (after_str)
+ RedisModule_ReplyWithString(ctx, after_str);
+ else
+ RedisModule_ReplyWithStringBuffer(ctx, "", 0);
+ return REDISMODULE_OK;
+}
+
+int testrdb_set_key(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
+{
+ if (argc != 3){
+ RedisModule_WrongArity(ctx);
+ return REDISMODULE_OK;
+ }
+
+ RedisModuleKey *key = RedisModule_OpenKey(ctx, argv[1], REDISMODULE_WRITE);
+ RedisModuleString *str = RedisModule_ModuleTypeGetValue(key);
+ if (str)
+ RedisModule_FreeString(ctx, str);
+ RedisModule_ModuleTypeSetValue(key, testrdb_type, argv[2]);
+ RedisModule_RetainString(ctx, argv[2]);
+ RedisModule_CloseKey(key);
+ RedisModule_ReplyWithLongLong(ctx, 1);
+ return REDISMODULE_OK;
+}
+
+int testrdb_get_key(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
+{
+ if (argc != 2){
+ RedisModule_WrongArity(ctx);
+ return REDISMODULE_OK;
+ }
+
+ RedisModuleKey *key = RedisModule_OpenKey(ctx, argv[1], REDISMODULE_WRITE);
+ RedisModuleString *str = RedisModule_ModuleTypeGetValue(key);
+ RedisModule_CloseKey(key);
+ RedisModule_ReplyWithString(ctx, str);
+ return REDISMODULE_OK;
+}
+
+int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
+ REDISMODULE_NOT_USED(argv);
+ REDISMODULE_NOT_USED(argc);
+
+ if (RedisModule_Init(ctx,"testrdb",1,REDISMODULE_APIVER_1) == REDISMODULE_ERR)
+ return REDISMODULE_ERR;
+
+ if (argc > 0)
+ RedisModule_StringToLongLong(argv[0], &conf_aux_count);
+
+ if (conf_aux_count==0) {
+ RedisModuleTypeMethods datatype_methods = {
+ .version = 1,
+ .rdb_load = testrdb_type_load,
+ .rdb_save = testrdb_type_save,
+ .aof_rewrite = NULL,
+ .digest = NULL,
+ .free = testrdb_type_free,
+ };
+
+ testrdb_type = RedisModule_CreateDataType(ctx, "test__rdb", 1, &datatype_methods);
+ if (testrdb_type == NULL)
+ return REDISMODULE_ERR;
+ } else {
+ RedisModuleTypeMethods datatype_methods = {
+ .version = REDISMODULE_TYPE_METHOD_VERSION,
+ .rdb_load = testrdb_type_load,
+ .rdb_save = testrdb_type_save,
+ .aof_rewrite = NULL,
+ .digest = NULL,
+ .free = testrdb_type_free,
+ .aux_load = testrdb_aux_load,
+ .aux_save = testrdb_aux_save,
+ .aux_save_triggers = (conf_aux_count == 1 ?
+ REDISMODULE_AUX_AFTER_RDB :
+ REDISMODULE_AUX_BEFORE_RDB | REDISMODULE_AUX_AFTER_RDB)
+ };
+
+ testrdb_type = RedisModule_CreateDataType(ctx, "test__rdb", 1, &datatype_methods);
+ if (testrdb_type == NULL)
+ return REDISMODULE_ERR;
+ }
+
+ if (RedisModule_CreateCommand(ctx,"testrdb.set.before", testrdb_set_before,"deny-oom",0,0,0) == REDISMODULE_ERR)
+ return REDISMODULE_ERR;
+
+ if (RedisModule_CreateCommand(ctx,"testrdb.get.before", testrdb_get_before,"",0,0,0) == REDISMODULE_ERR)
+ return REDISMODULE_ERR;
+
+ if (RedisModule_CreateCommand(ctx,"testrdb.set.after", testrdb_set_after,"deny-oom",0,0,0) == REDISMODULE_ERR)
+ return REDISMODULE_ERR;
+
+ if (RedisModule_CreateCommand(ctx,"testrdb.get.after", testrdb_get_after,"",0,0,0) == REDISMODULE_ERR)
+ return REDISMODULE_ERR;
+
+ if (RedisModule_CreateCommand(ctx,"testrdb.set.key", testrdb_set_key,"deny-oom",1,1,1) == REDISMODULE_ERR)
+ return REDISMODULE_ERR;
+
+ if (RedisModule_CreateCommand(ctx,"testrdb.get.key", testrdb_get_key,"",1,1,1) == REDISMODULE_ERR)
+ return REDISMODULE_ERR;
+
+ return REDISMODULE_OK;
+}
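
As the Tcl tests added below exercise, the module's optional argument selects the aux behavior: with no argument (conf_aux_count == 0) no aux callbacks are registered, with 1 aux data is saved only after the keys, and with 2 both before and after. For example, in redis.conf:

    loadmodule /path/to/testrdb.so 2
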
diff --git a/tests/support/util.tcl b/tests/support/util.tcl
index 41cc5612a..c2e76afad 100644
--- a/tests/support/util.tcl
+++ b/tests/support/util.tcl
@@ -99,6 +99,25 @@ proc wait_for_ofs_sync {r1 r2} {
}
}
+proc wait_for_log_message {srv_idx pattern last_lines maxtries delay} {
+ set retry $maxtries
+ set stdout [srv $srv_idx stdout]
+ while {$retry} {
+ set result [exec tail -$last_lines < $stdout]
+ set result [split $result "\n"]
+ foreach line $result {
+ if {[string match $pattern $line]} {
+ return $line
+ }
+ }
+ incr retry -1
+ after $delay
+ }
+ if {$retry == 0} {
+ fail "log message of '$pattern' not found"
+ }
+}
+
# Random integer between 0 and max (excluded).
proc randomInt {max} {
expr {int(rand()*$max)}
diff --git a/tests/unit/geo.tcl b/tests/unit/geo.tcl
index 604697be4..49e421ee9 100644
--- a/tests/unit/geo.tcl
+++ b/tests/unit/geo.tcl
@@ -61,6 +61,7 @@ set regression_vectors {
{939895 151 59.149620271823181 65.204186651485145}
{1412 156 149.29737817929004 15.95807862745508}
{564862 149 84.062063109158544 -65.685403922426232}
+ {1546032440391 16751 -1.8175081637769495 20.665668878082954}
}
set rv_idx 0
@@ -274,8 +275,19 @@ start_server {tags {"geo"}} {
foreach place $diff {
set mydist [geo_distance $lon $lat $search_lon $search_lat]
set mydist [expr $mydist/1000]
- if {($mydist / $radius_km) > 0.999} {incr rounding_errors}
+ if {($mydist / $radius_km) > 0.999} {
+ incr rounding_errors
+ continue
+ }
+ if {$mydist < $radius_m} {
+                        # This is a false positive for Redis since, given the
+                        # same points, the higher precision calculation provided
+                        # by Tcl shows the point to be within range
+ incr rounding_errors
+ continue
+ }
}
+
                # Make sure this is a real error and not a rounding issue.
if {[llength $diff] == $rounding_errors} {
set res $res2; # Error silenced
diff --git a/tests/unit/moduleapi/testrdb.tcl b/tests/unit/moduleapi/testrdb.tcl
new file mode 100644
index 000000000..22201a08e
--- /dev/null
+++ b/tests/unit/moduleapi/testrdb.tcl
@@ -0,0 +1,62 @@
+set testmodule [file normalize tests/modules/testrdb.so]
+
+proc restart_and_wait {} {
+ catch {
+ r debug restart
+ }
+
+ # wait for the server to come back up
+ set retry 50
+ while {$retry} {
+ if {[catch { r ping }]} {
+ after 100
+ } else {
+ break
+ }
+ incr retry -1
+ }
+}
+
+tags "modules" {
+ start_server [list overrides [list loadmodule "$testmodule"]] {
+ test {modules are able to persist types} {
+ r testrdb.set.key key1 value1
+ assert_equal "value1" [r testrdb.get.key key1]
+ r debug reload
+ assert_equal "value1" [r testrdb.get.key key1]
+ }
+
+        test {module globals are lost without aux} {
+ r testrdb.set.before global1
+ assert_equal "global1" [r testrdb.get.before]
+ restart_and_wait
+ assert_equal "" [r testrdb.get.before]
+ }
+ }
+
+ start_server [list overrides [list loadmodule "$testmodule 2"]] {
+ test {modules are able to persist globals before and after} {
+ r testrdb.set.before global1
+ r testrdb.set.after global2
+ assert_equal "global1" [r testrdb.get.before]
+ assert_equal "global2" [r testrdb.get.after]
+ restart_and_wait
+ assert_equal "global1" [r testrdb.get.before]
+ assert_equal "global2" [r testrdb.get.after]
+ }
+
+ }
+
+ start_server [list overrides [list loadmodule "$testmodule 1"]] {
+ test {modules are able to persist globals just after} {
+ r testrdb.set.after global2
+ assert_equal "global2" [r testrdb.get.after]
+ restart_and_wait
+ assert_equal "global2" [r testrdb.get.after]
+ }
+ }
+
+
+ # TODO: test short read handling
+
+}