summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSalvatore Sanfilippo <antirez@gmail.com>2019-11-21 10:06:15 +0100
committerGitHub <noreply@github.com>2019-11-21 10:06:15 +0100
commit64c2508ee3e7ab86f1cc03d214061305a1a410fb (patch)
tree82900519c72a729bac321ae732b4866eea45d6fd
parent04233097688ee35d451c6f5cd64c28e57ca81b53 (diff)
parentf1f259de5bc1785f22b663b8af78b887ddea5c6c (diff)
downloadredis-64c2508ee3e7ab86f1cc03d214061305a1a410fb.tar.gz
Merge branch 'unstable' into rm_get_server_info
-rw-r--r--redis.conf39
-rwxr-xr-xruntest-moduleapi3
-rw-r--r--src/adlist.h2
-rw-r--r--src/aof.c16
-rw-r--r--src/blocked.c11
-rw-r--r--src/cluster.c2
-rw-r--r--src/config.c14
-rw-r--r--src/db.c75
-rw-r--r--src/debug.c2
-rw-r--r--src/defrag.c44
-rw-r--r--src/expire.c150
-rwxr-xr-xsrc/mkreleasehdr.sh2
-rw-r--r--src/module.c723
-rw-r--r--src/networking.c7
-rw-r--r--src/object.c11
-rw-r--r--src/rax.c3
-rw-r--r--src/rdb.c84
-rw-r--r--src/rdb.h11
-rw-r--r--src/redis-check-rdb.c6
-rw-r--r--src/redismodule.h140
-rw-r--r--src/replication.c55
-rw-r--r--src/sentinel.c13
-rw-r--r--src/server.c59
-rw-r--r--src/server.h37
-rw-r--r--src/stream.h1
-rw-r--r--src/t_hash.c2
-rw-r--r--src/t_stream.c37
-rw-r--r--src/util.c60
-rw-r--r--src/util.h9
-rw-r--r--tests/modules/Makefile5
-rw-r--r--tests/modules/blockonkeys.c261
-rw-r--r--tests/modules/datatype.c161
-rw-r--r--tests/modules/hooks.c256
-rw-r--r--tests/modules/misc.c158
-rw-r--r--tests/modules/scan.c109
-rw-r--r--tests/modules/testrdb.c14
-rw-r--r--tests/support/test.tcl39
-rw-r--r--tests/unit/moduleapi/blockonkeys.tcl85
-rw-r--r--tests/unit/moduleapi/datatype.tcl27
-rw-r--r--tests/unit/moduleapi/hooks.tcl134
-rw-r--r--tests/unit/moduleapi/misc.tcl51
-rw-r--r--tests/unit/moduleapi/scan.tcl47
-rw-r--r--tests/unit/scripting.tcl2
-rw-r--r--tests/unit/type/stream.tcl12
44 files changed, 2692 insertions, 287 deletions
diff --git a/redis.conf b/redis.conf
index 0ec3321a5..c4bf60222 100644
--- a/redis.conf
+++ b/redis.conf
@@ -813,11 +813,11 @@ replica-priority 100
# MAXMEMORY POLICY: how Redis will select what to remove when maxmemory
# is reached. You can select among five behaviors:
#
-# volatile-lru -> Evict using approximated LRU among the keys with an expire set.
+# volatile-lru -> Evict using approximated LRU, only keys with an expire set.
# allkeys-lru -> Evict any key using approximated LRU.
-# volatile-lfu -> Evict using approximated LFU among the keys with an expire set.
+# volatile-lfu -> Evict using approximated LFU, only keys with an expire set.
# allkeys-lfu -> Evict any key using approximated LFU.
-# volatile-random -> Remove a random key among the ones with an expire set.
+# volatile-random -> Remove a random key having an expire set.
# allkeys-random -> Remove a random key, any key.
# volatile-ttl -> Remove the key with the nearest expire time (minor TTL)
# noeviction -> Don't evict anything, just return an error on write operations.
@@ -872,6 +872,23 @@ replica-priority 100
#
# replica-ignore-maxmemory yes
+# Redis reclaims expired keys in two ways: upon access when those keys are
+# found to be expired, and also in background, in what is called the
+# "active expire key". The key space is slowly and interactively scanned
+# looking for expired keys to reclaim, so that it is possible to free memory
+# of keys that are expired and will never be accessed again in a short time.
+#
+# The default effort of the expire cycle will try to avoid having more than
+# ten percent of expired keys still in memory, and will try to avoid consuming
+# more than 25% of total memory and to add latency to the system. However
+# it is possible to increase the expire "effort" that is normally set to
+# "1", to a greater value, up to the value "10". At its maximum value the
+# system will use more CPU, longer cycles (and technically may introduce
+# more latency), and will tollerate less already expired keys still present
+# in the system. It's a tradeoff betweeen memory, CPU and latecy.
+#
+# active-expire-effort 1
+
############################# LAZY FREEING ####################################
# Redis has two primitives to delete keys. One is called DEL and is a blocking
@@ -1606,10 +1623,6 @@ rdb-save-incremental-fsync yes
########################### ACTIVE DEFRAGMENTATION #######################
#
-# WARNING THIS FEATURE IS EXPERIMENTAL. However it was stress tested
-# even in production and manually tested by multiple engineers for some
-# time.
-#
# What is active defragmentation?
# -------------------------------
#
@@ -1649,7 +1662,7 @@ rdb-save-incremental-fsync yes
# a good idea to leave the defaults untouched.
# Enabled active defragmentation
-# activedefrag yes
+# activedefrag no
# Minimum amount of fragmentation waste to start active defrag
# active-defrag-ignore-bytes 100mb
@@ -1660,11 +1673,13 @@ rdb-save-incremental-fsync yes
# Maximum percentage of fragmentation at which we use maximum effort
# active-defrag-threshold-upper 100
-# Minimal effort for defrag in CPU percentage
-# active-defrag-cycle-min 5
+# Minimal effort for defrag in CPU percentage, to be used when the lower
+# threshold is reached
+# active-defrag-cycle-min 1
-# Maximal effort for defrag in CPU percentage
-# active-defrag-cycle-max 75
+# Maximal effort for defrag in CPU percentage, to be used when the upper
+# threshold is reached
+# active-defrag-cycle-max 25
# Maximum number of set/hash/zset/list fields that will be processed from
# the main dictionary scan
diff --git a/runtest-moduleapi b/runtest-moduleapi
index 9301002c9..32d5c2b8d 100755
--- a/runtest-moduleapi
+++ b/runtest-moduleapi
@@ -21,4 +21,7 @@ $TCLSH tests/test_helper.tcl \
--single unit/moduleapi/propagate \
--single unit/moduleapi/hooks \
--single unit/moduleapi/misc \
+--single unit/moduleapi/blockonkeys \
+--single unit/moduleapi/scan \
+--single unit/moduleapi/datatype \
"${@}"
diff --git a/src/adlist.h b/src/adlist.h
index c954fac87..28b9016ce 100644
--- a/src/adlist.h
+++ b/src/adlist.h
@@ -66,7 +66,7 @@ typedef struct list {
#define listSetMatchMethod(l,m) ((l)->match = (m))
#define listGetDupMethod(l) ((l)->dup)
-#define listGetFree(l) ((l)->free)
+#define listGetFreeMethod(l) ((l)->free)
#define listGetMatchMethod(l) ((l)->match)
/* Prototypes */
diff --git a/src/aof.c b/src/aof.c
index 0e3648ff0..0ef59cfb6 100644
--- a/src/aof.c
+++ b/src/aof.c
@@ -731,7 +731,7 @@ int loadAppendOnlyFile(char *filename) {
server.aof_state = AOF_OFF;
fakeClient = createAOFClient();
- startLoadingFile(fp, filename);
+ startLoadingFile(fp, filename, RDBFLAGS_AOF_PREAMBLE);
/* Check if this AOF file has an RDB preamble. In that case we need to
* load the RDB file and later continue loading the AOF tail. */
@@ -746,7 +746,7 @@ int loadAppendOnlyFile(char *filename) {
serverLog(LL_NOTICE,"Reading RDB preamble from AOF file...");
if (fseek(fp,0,SEEK_SET) == -1) goto readerr;
rioInitWithFile(&rdb,fp);
- if (rdbLoadRio(&rdb,NULL,1) != C_OK) {
+ if (rdbLoadRio(&rdb,RDBFLAGS_AOF_PREAMBLE,NULL) != C_OK) {
serverLog(LL_WARNING,"Error reading the RDB preamble of the AOF file, AOF loading aborted");
goto readerr;
} else {
@@ -767,6 +767,7 @@ int loadAppendOnlyFile(char *filename) {
if (!(loops++ % 1000)) {
loadingProgress(ftello(fp));
processEventsWhileBlocked();
+ processModuleLoadingProgressEvent(1);
}
if (fgets(buf,sizeof(buf),fp) == NULL) {
@@ -859,7 +860,7 @@ loaded_ok: /* DB loaded, cleanup and return C_OK to the caller. */
fclose(fp);
freeFakeClient(fakeClient);
server.aof_state = old_aof_state;
- stopLoading();
+ stopLoading(1);
aofUpdateCurrentSize();
server.aof_rewrite_base_size = server.aof_current_size;
server.aof_fsync_offset = server.aof_current_size;
@@ -1400,9 +1401,11 @@ int rewriteAppendOnlyFile(char *filename) {
if (server.aof_rewrite_incremental_fsync)
rioSetAutoSync(&aof,REDIS_AUTOSYNC_BYTES);
+ startSaving(RDBFLAGS_AOF_PREAMBLE);
+
if (server.aof_use_rdb_preamble) {
int error;
- if (rdbSaveRio(&aof,&error,RDB_SAVE_AOF_PREAMBLE,NULL) == C_ERR) {
+ if (rdbSaveRio(&aof,&error,RDBFLAGS_AOF_PREAMBLE,NULL) == C_ERR) {
errno = error;
goto werr;
}
@@ -1465,15 +1468,18 @@ int rewriteAppendOnlyFile(char *filename) {
if (rename(tmpfile,filename) == -1) {
serverLog(LL_WARNING,"Error moving temp append only file on the final destination: %s", strerror(errno));
unlink(tmpfile);
+ stopSaving(0);
return C_ERR;
}
serverLog(LL_NOTICE,"SYNC append only file rewrite performed");
+ stopSaving(1);
return C_OK;
werr:
serverLog(LL_WARNING,"Write error writing append only file on disk: %s", strerror(errno));
fclose(fp);
unlink(tmpfile);
+ stopSaving(0);
return C_ERR;
}
@@ -1760,7 +1766,7 @@ void backgroundRewriteDoneHandler(int exitcode, int bysignal) {
server.aof_selected_db = -1; /* Make sure SELECT is re-issued */
aofUpdateCurrentSize();
server.aof_rewrite_base_size = server.aof_current_size;
- server.aof_current_size = server.aof_current_size;
+ server.aof_fsync_offset = server.aof_current_size;
/* Clear regular AOF buffer since its contents was just written to
* the new AOF from the background rewrite buffer. */
diff --git a/src/blocked.c b/src/blocked.c
index 14c2ff830..20c0e760a 100644
--- a/src/blocked.c
+++ b/src/blocked.c
@@ -514,6 +514,16 @@ void handleClientsBlockedOnKeys(void) {
* we can safely call signalKeyAsReady() against this key. */
dictDelete(rl->db->ready_keys,rl->key);
+ /* Even if we are not inside call(), increment the call depth
+ * in order to make sure that keys are expired against a fixed
+ * reference time, and not against the wallclock time. This
+ * way we can lookup an object multiple times (BRPOPLPUSH does
+ * that) without the risk of it being freed in the second
+ * lookup, invalidating the first one.
+ * See https://github.com/antirez/redis/pull/6554. */
+ server.fixed_time_expire++;
+ updateCachedTime(0);
+
/* Serve clients blocked on list key. */
robj *o = lookupKeyWrite(rl->db,rl->key);
@@ -529,6 +539,7 @@ void handleClientsBlockedOnKeys(void) {
* module is trying to accomplish right now. */
serveClientsBlockedOnKeyByModule(rl);
}
+ server.fixed_time_expire--;
/* Free this item. */
decrRefCount(rl->key);
diff --git a/src/cluster.c b/src/cluster.c
index a7d8a02c3..9e6ddb2c4 100644
--- a/src/cluster.c
+++ b/src/cluster.c
@@ -4966,7 +4966,7 @@ void restoreCommand(client *c) {
if (!absttl) ttl+=mstime();
setExpire(c,c->db,c->argv[1],ttl);
}
- objectSetLRUOrLFU(obj,lfu_freq,lru_idle,lru_clock);
+ objectSetLRUOrLFU(obj,lfu_freq,lru_idle,lru_clock,1000);
signalModifiedKey(c->db,c->argv[1]);
addReply(c,shared.ok);
server.dirty++;
diff --git a/src/config.c b/src/config.c
index 505dabc9c..de8e8d8cf 100644
--- a/src/config.c
+++ b/src/config.c
@@ -256,7 +256,7 @@ void loadServerConfigFromString(char *config) {
for (configYesNo *config = configs_yesno; config->name != NULL; config++) {
if ((!strcasecmp(argv[0],config->name) ||
(config->alias && !strcasecmp(argv[0],config->alias))) &&
- (argc == 2))
+ (argc == 2))
{
if ((*(config->config) = yesnotoi(argv[1])) == -1) {
err = "argument must be 'yes' or 'no'"; goto loaderr;
@@ -580,6 +580,14 @@ void loadServerConfigFromString(char *config) {
err = "active-defrag-max-scan-fields must be positive";
goto loaderr;
}
+ } else if (!strcasecmp(argv[0],"active-expire-effort") && argc == 2) {
+ server.active_expire_effort = atoi(argv[1]);
+ if (server.active_expire_effort < 1 ||
+ server.active_expire_effort > 10)
+ {
+ err = "active-expire-effort must be between 1 and 10";
+ goto loaderr;
+ }
} else if (!strcasecmp(argv[0],"hash-max-ziplist-entries") && argc == 2) {
server.hash_max_ziplist_entries = memtoll(argv[1], NULL);
} else if (!strcasecmp(argv[0],"hash-max-ziplist-value") && argc == 2) {
@@ -1166,6 +1174,8 @@ void configSetCommand(client *c) {
} config_set_numerical_field(
"active-defrag-max-scan-fields",server.active_defrag_max_scan_fields,1,LONG_MAX) {
} config_set_numerical_field(
+ "active-expire-effort",server.active_expire_effort,1,10) {
+ } config_set_numerical_field(
"auto-aof-rewrite-percentage",server.aof_rewrite_perc,0,INT_MAX){
} config_set_numerical_field(
"hash-max-ziplist-entries",server.hash_max_ziplist_entries,0,LONG_MAX) {
@@ -1478,6 +1488,7 @@ void configGetCommand(client *c) {
config_get_numerical_field("active-defrag-cycle-min",server.active_defrag_cycle_min);
config_get_numerical_field("active-defrag-cycle-max",server.active_defrag_cycle_max);
config_get_numerical_field("active-defrag-max-scan-fields",server.active_defrag_max_scan_fields);
+ config_get_numerical_field("active-expire-effort",server.active_expire_effort);
config_get_numerical_field("auto-aof-rewrite-percentage",
server.aof_rewrite_perc);
config_get_numerical_field("auto-aof-rewrite-min-size",
@@ -2327,6 +2338,7 @@ int rewriteConfig(char *path) {
rewriteConfigNumericalOption(state,"active-defrag-cycle-min",server.active_defrag_cycle_min,CONFIG_DEFAULT_DEFRAG_CYCLE_MIN);
rewriteConfigNumericalOption(state,"active-defrag-cycle-max",server.active_defrag_cycle_max,CONFIG_DEFAULT_DEFRAG_CYCLE_MAX);
rewriteConfigNumericalOption(state,"active-defrag-max-scan-fields",server.active_defrag_max_scan_fields,CONFIG_DEFAULT_DEFRAG_MAX_SCAN_FIELDS);
+ rewriteConfigNumericalOption(state,"active-expire-effort",server.active_expire_effort,CONFIG_DEFAULT_ACTIVE_EXPIRE_EFFORT);
rewriteConfigYesNoOption(state,"appendonly",server.aof_enabled,0);
rewriteConfigStringOption(state,"appendfilename",server.aof_filename,CONFIG_DEFAULT_AOF_FILENAME);
rewriteConfigEnumOption(state,"appendfsync",server.aof_fsync,aof_fsync_enum,CONFIG_DEFAULT_AOF_FSYNC);
diff --git a/src/db.c b/src/db.c
index 2c0a0cdd3..aac049eb7 100644
--- a/src/db.c
+++ b/src/db.c
@@ -151,9 +151,13 @@ robj *lookupKeyRead(redisDb *db, robj *key) {
*
* Returns the linked value object if the key exists or NULL if the key
* does not exist in the specified DB. */
-robj *lookupKeyWrite(redisDb *db, robj *key) {
+robj *lookupKeyWriteWithFlags(redisDb *db, robj *key, int flags) {
expireIfNeeded(db,key);
- return lookupKey(db,key,LOOKUP_NONE);
+ return lookupKey(db,key,flags);
+}
+
+robj *lookupKeyWrite(redisDb *db, robj *key) {
+ return lookupKeyWriteWithFlags(db, key, LOOKUP_NONE);
}
robj *lookupKeyReadOrReply(client *c, robj *key, robj *reply) {
@@ -461,6 +465,29 @@ int getFlushCommandFlags(client *c, int *flags) {
return C_OK;
}
+/* Flushes the whole server data set. */
+void flushAllDataAndResetRDB(int flags) {
+ server.dirty += emptyDb(-1,flags,NULL);
+ if (server.rdb_child_pid != -1) killRDBChild();
+ if (server.saveparamslen > 0) {
+ /* Normally rdbSave() will reset dirty, but we don't want this here
+ * as otherwise FLUSHALL will not be replicated nor put into the AOF. */
+ int saved_dirty = server.dirty;
+ rdbSaveInfo rsi, *rsiptr;
+ rsiptr = rdbPopulateSaveInfo(&rsi);
+ rdbSave(server.rdb_filename,rsiptr);
+ server.dirty = saved_dirty;
+ }
+ server.dirty++;
+#if defined(USE_JEMALLOC)
+ /* jemalloc 5 doesn't release pages back to the OS when there's no traffic.
+ * for large databases, flushdb blocks for long anyway, so a bit more won't
+ * harm and this way the flush and purge will be synchroneus. */
+ if (!(flags & EMPTYDB_ASYNC))
+ jemalloc_purge();
+#endif
+}
+
/* FLUSHDB [ASYNC]
*
* Flushes the currently SELECTed Redis DB. */
@@ -484,28 +511,9 @@ void flushdbCommand(client *c) {
* Flushes the whole server data set. */
void flushallCommand(client *c) {
int flags;
-
if (getFlushCommandFlags(c,&flags) == C_ERR) return;
- server.dirty += emptyDb(-1,flags,NULL);
+ flushAllDataAndResetRDB(flags);
addReply(c,shared.ok);
- if (server.rdb_child_pid != -1) killRDBChild();
- if (server.saveparamslen > 0) {
- /* Normally rdbSave() will reset dirty, but we don't want this here
- * as otherwise FLUSHALL will not be replicated nor put into the AOF. */
- int saved_dirty = server.dirty;
- rdbSaveInfo rsi, *rsiptr;
- rsiptr = rdbPopulateSaveInfo(&rsi);
- rdbSave(server.rdb_filename,rsiptr);
- server.dirty = saved_dirty;
- }
- server.dirty++;
-#if defined(USE_JEMALLOC)
- /* jemalloc 5 doesn't release pages back to the OS when there's no traffic.
- * for large databases, flushdb blocks for long anyway, so a bit more won't
- * harm and this way the flush and purge will be synchroneus. */
- if (!(flags & EMPTYDB_ASYNC))
- jemalloc_purge();
-#endif
}
/* This command implements DEL and LAZYDEL. */
@@ -1069,10 +1077,12 @@ int dbSwapDatabases(long id1, long id2) {
db1->dict = db2->dict;
db1->expires = db2->expires;
db1->avg_ttl = db2->avg_ttl;
+ db1->expires_cursor = db2->expires_cursor;
db2->dict = aux.dict;
db2->expires = aux.expires;
db2->avg_ttl = aux.avg_ttl;
+ db2->expires_cursor = aux.expires_cursor;
/* Now we need to handle clients blocked on lists: as an effect
* of swapping the two DBs, a client that was waiting for list
@@ -1188,6 +1198,7 @@ void propagateExpire(redisDb *db, robj *key, int lazy) {
/* Check if the key is expired. */
int keyIsExpired(redisDb *db, robj *key) {
mstime_t when = getExpire(db,key);
+ mstime_t now;
if (when < 0) return 0; /* No expire for this key */
@@ -1199,8 +1210,26 @@ int keyIsExpired(redisDb *db, robj *key) {
* only the first time it is accessed and not in the middle of the
* script execution, making propagation to slaves / AOF consistent.
* See issue #1525 on Github for more information. */
- mstime_t now = server.lua_caller ? server.lua_time_start : mstime();
+ if (server.lua_caller) {
+ now = server.lua_time_start;
+ }
+ /* If we are in the middle of a command execution, we still want to use
+ * a reference time that does not change: in that case we just use the
+ * cached time, that we update before each call in the call() function.
+ * This way we avoid that commands such as RPOPLPUSH or similar, that
+ * may re-open the same key multiple times, can invalidate an already
+ * open object in a next call, if the next call will see the key expired,
+ * while the first did not. */
+ else if (server.fixed_time_expire > 0) {
+ now = server.mstime;
+ }
+ /* For the other cases, we want to use the most fresh time we have. */
+ else {
+ now = mstime();
+ }
+ /* The key expired if the current (virtual or real) time is greater
+ * than the expire time of the key. */
return now > when;
}
diff --git a/src/debug.c b/src/debug.c
index 179f6d2c9..a2d37337d 100644
--- a/src/debug.c
+++ b/src/debug.c
@@ -417,7 +417,7 @@ NULL
}
emptyDb(-1,EMPTYDB_NO_FLAGS,NULL);
protectClient(c);
- int ret = rdbLoad(server.rdb_filename,NULL);
+ int ret = rdbLoad(server.rdb_filename,NULL,RDBFLAGS_NONE);
unprotectClient(c);
if (ret != C_OK) {
addReplyError(c,"Error trying to load the RDB dump");
diff --git a/src/defrag.c b/src/defrag.c
index e794c8e41..04e57955b 100644
--- a/src/defrag.c
+++ b/src/defrag.c
@@ -919,10 +919,12 @@ int defragLaterItem(dictEntry *de, unsigned long *cursor, long long endtime) {
return 0;
}
+/* static variables serving defragLaterStep to continue scanning a key from were we stopped last time. */
+static sds defrag_later_current_key = NULL;
+static unsigned long defrag_later_cursor = 0;
+
/* returns 0 if no more work needs to be been done, and 1 if time is up and more work is needed. */
int defragLaterStep(redisDb *db, long long endtime) {
- static sds current_key = NULL;
- static unsigned long cursor = 0;
unsigned int iterations = 0;
unsigned long long prev_defragged = server.stat_active_defrag_hits;
unsigned long long prev_scanned = server.stat_active_defrag_scanned;
@@ -930,16 +932,15 @@ int defragLaterStep(redisDb *db, long long endtime) {
do {
/* if we're not continuing a scan from the last call or loop, start a new one */
- if (!cursor) {
+ if (!defrag_later_cursor) {
listNode *head = listFirst(db->defrag_later);
/* Move on to next key */
- if (current_key) {
- serverAssert(current_key == head->value);
- sdsfree(head->value);
+ if (defrag_later_current_key) {
+ serverAssert(defrag_later_current_key == head->value);
listDelNode(db->defrag_later, head);
- cursor = 0;
- current_key = NULL;
+ defrag_later_cursor = 0;
+ defrag_later_current_key = NULL;
}
/* stop if we reached the last one. */
@@ -948,21 +949,21 @@ int defragLaterStep(redisDb *db, long long endtime) {
return 0;
/* start a new key */
- current_key = head->value;
- cursor = 0;
+ defrag_later_current_key = head->value;
+ defrag_later_cursor = 0;
}
/* each time we enter this function we need to fetch the key from the dict again (if it still exists) */
- dictEntry *de = dictFind(db->dict, current_key);
+ dictEntry *de = dictFind(db->dict, defrag_later_current_key);
key_defragged = server.stat_active_defrag_hits;
do {
int quit = 0;
- if (defragLaterItem(de, &cursor, endtime))
+ if (defragLaterItem(de, &defrag_later_cursor, endtime))
quit = 1; /* time is up, we didn't finish all the work */
/* Don't start a new BIG key in this loop, this is because the
* next key can be a list, and scanLaterList must be done in once cycle */
- if (!cursor)
+ if (!defrag_later_cursor)
quit = 1;
/* Once in 16 scan iterations, 512 pointer reallocations, or 64 fields
@@ -982,7 +983,7 @@ int defragLaterStep(redisDb *db, long long endtime) {
prev_defragged = server.stat_active_defrag_hits;
prev_scanned = server.stat_active_defrag_scanned;
}
- } while(cursor);
+ } while(defrag_later_cursor);
if(key_defragged != server.stat_active_defrag_hits)
server.stat_active_defrag_key_hits++;
else
@@ -1039,6 +1040,21 @@ void activeDefragCycle(void) {
mstime_t latency;
int quit = 0;
+ if (!server.active_defrag_enabled) {
+ if (server.active_defrag_running) {
+ /* if active defrag was disabled mid-run, start from fresh next time. */
+ server.active_defrag_running = 0;
+ if (db)
+ listEmpty(db->defrag_later);
+ defrag_later_current_key = NULL;
+ defrag_later_cursor = 0;
+ current_db = -1;
+ cursor = 0;
+ db = NULL;
+ }
+ return;
+ }
+
if (hasActiveChildProcess())
return; /* Defragging memory while there's a fork will just do damage. */
diff --git a/src/expire.c b/src/expire.c
index 598b27f96..b4ab9ab18 100644
--- a/src/expire.c
+++ b/src/expire.c
@@ -78,24 +78,63 @@ int activeExpireCycleTryExpire(redisDb *db, dictEntry *de, long long now) {
* it will get more aggressive to avoid that too much memory is used by
* keys that can be removed from the keyspace.
*
- * No more than CRON_DBS_PER_CALL databases are tested at every
- * iteration.
+ * Every expire cycle tests multiple databases: the next call will start
+ * again from the next db, with the exception of exists for time limit: in that
+ * case we restart again from the last database we were processing. Anyway
+ * no more than CRON_DBS_PER_CALL databases are tested at every iteration.
*
- * This kind of call is used when Redis detects that timelimit_exit is
- * true, so there is more work to do, and we do it more incrementally from
- * the beforeSleep() function of the event loop.
+ * The function can perform more or less work, depending on the "type"
+ * argument. It can execute a "fast cycle" or a "slow cycle". The slow
+ * cycle is the main way we collect expired cycles: this happens with
+ * the "server.hz" frequency (usually 10 hertz).
*
- * Expire cycle type:
+ * However the slow cycle can exit for timeout, since it used too much time.
+ * For this reason the function is also invoked to perform a fast cycle
+ * at every event loop cycle, in the beforeSleep() function. The fast cycle
+ * will try to perform less work, but will do it much more often.
+ *
+ * The following are the details of the two expire cycles and their stop
+ * conditions:
*
* If type is ACTIVE_EXPIRE_CYCLE_FAST the function will try to run a
* "fast" expire cycle that takes no longer than EXPIRE_FAST_CYCLE_DURATION
* microseconds, and is not repeated again before the same amount of time.
+ * The cycle will also refuse to run at all if the latest slow cycle did not
+ * terminate because of a time limit condition.
*
* If type is ACTIVE_EXPIRE_CYCLE_SLOW, that normal expire cycle is
* executed, where the time limit is a percentage of the REDIS_HZ period
- * as specified by the ACTIVE_EXPIRE_CYCLE_SLOW_TIME_PERC define. */
+ * as specified by the ACTIVE_EXPIRE_CYCLE_SLOW_TIME_PERC define. In the
+ * fast cycle, the check of every database is interrupted once the number
+ * of already expired keys in the database is estimated to be lower than
+ * a given percentage, in order to avoid doing too much work to gain too
+ * little memory.
+ *
+ * The configured expire "effort" will modify the baseline parameters in
+ * order to do more work in both the fast and slow expire cycles.
+ */
+
+#define ACTIVE_EXPIRE_CYCLE_KEYS_PER_LOOP 20 /* Keys for each DB loop. */
+#define ACTIVE_EXPIRE_CYCLE_FAST_DURATION 1000 /* Microseconds. */
+#define ACTIVE_EXPIRE_CYCLE_SLOW_TIME_PERC 25 /* Max % of CPU to use. */
+#define ACTIVE_EXPIRE_CYCLE_ACCEPTABLE_STALE 10 /* % of stale keys after which
+ we do extra efforts. */
void activeExpireCycle(int type) {
+ /* Adjust the running parameters according to the configured expire
+ * effort. The default effort is 1, and the maximum configurable effort
+ * is 10. */
+ unsigned long
+ effort = server.active_expire_effort-1, /* Rescale from 0 to 9. */
+ config_keys_per_loop = ACTIVE_EXPIRE_CYCLE_KEYS_PER_LOOP +
+ ACTIVE_EXPIRE_CYCLE_KEYS_PER_LOOP/4*effort,
+ config_cycle_fast_duration = ACTIVE_EXPIRE_CYCLE_FAST_DURATION +
+ ACTIVE_EXPIRE_CYCLE_FAST_DURATION/4*effort,
+ config_cycle_slow_time_perc = ACTIVE_EXPIRE_CYCLE_SLOW_TIME_PERC +
+ 2*effort,
+ config_cycle_acceptable_stale = ACTIVE_EXPIRE_CYCLE_ACCEPTABLE_STALE-
+ effort;
+
/* This function has some global state in order to continue the work
* incrementally across calls. */
static unsigned int current_db = 0; /* Last DB tested. */
@@ -113,10 +152,16 @@ void activeExpireCycle(int type) {
if (type == ACTIVE_EXPIRE_CYCLE_FAST) {
/* Don't start a fast cycle if the previous cycle did not exit
- * for time limit. Also don't repeat a fast cycle for the same period
+ * for time limit, unless the percentage of estimated stale keys is
+ * too high. Also never repeat a fast cycle for the same period
* as the fast cycle total duration itself. */
- if (!timelimit_exit) return;
- if (start < last_fast_cycle + ACTIVE_EXPIRE_CYCLE_FAST_DURATION*2) return;
+ if (!timelimit_exit &&
+ server.stat_expired_stale_perc < config_cycle_acceptable_stale)
+ return;
+
+ if (start < last_fast_cycle + (long long)config_cycle_fast_duration*2)
+ return;
+
last_fast_cycle = start;
}
@@ -130,16 +175,16 @@ void activeExpireCycle(int type) {
if (dbs_per_call > server.dbnum || timelimit_exit)
dbs_per_call = server.dbnum;
- /* We can use at max ACTIVE_EXPIRE_CYCLE_SLOW_TIME_PERC percentage of CPU time
- * per iteration. Since this function gets called with a frequency of
+ /* We can use at max 'config_cycle_slow_time_perc' percentage of CPU
+ * time per iteration. Since this function gets called with a frequency of
* server.hz times per second, the following is the max amount of
* microseconds we can spend in this function. */
- timelimit = 1000000*ACTIVE_EXPIRE_CYCLE_SLOW_TIME_PERC/server.hz/100;
+ timelimit = config_cycle_slow_time_perc*1000000/server.hz/100;
timelimit_exit = 0;
if (timelimit <= 0) timelimit = 1;
if (type == ACTIVE_EXPIRE_CYCLE_FAST)
- timelimit = ACTIVE_EXPIRE_CYCLE_FAST_DURATION; /* in microseconds. */
+ timelimit = config_cycle_fast_duration; /* in microseconds. */
/* Accumulate some global stats as we expire keys, to have some idea
* about the number of keys that are already logically expired, but still
@@ -148,7 +193,9 @@ void activeExpireCycle(int type) {
long total_expired = 0;
for (j = 0; j < dbs_per_call && timelimit_exit == 0; j++) {
- int expired;
+ /* Expired and checked in a single loop. */
+ unsigned long expired, sampled;
+
redisDb *db = server.db+(current_db % server.dbnum);
/* Increment the DB now so we are sure if we run out of time
@@ -172,8 +219,8 @@ void activeExpireCycle(int type) {
slots = dictSlots(db->expires);
now = mstime();
- /* When there are less than 1% filled slots getting random
- * keys is expensive, so stop here waiting for better times...
+ /* When there are less than 1% filled slots, sampling the key
+ * space is expensive, so stop here waiting for better times...
* The dictionary will be resized asap. */
if (num && slots > DICT_HT_INITIAL_SIZE &&
(num*100/slots < 1)) break;
@@ -181,27 +228,58 @@ void activeExpireCycle(int type) {
/* The main collection cycle. Sample random keys among keys
* with an expire set, checking for expired ones. */
expired = 0;
+ sampled = 0;
ttl_sum = 0;
ttl_samples = 0;
- if (num > ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP)
- num = ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP;
-
- while (num--) {
- dictEntry *de;
- long long ttl;
-
- if ((de = dictGetRandomKey(db->expires)) == NULL) break;
- ttl = dictGetSignedIntegerVal(de)-now;
- if (activeExpireCycleTryExpire(db,de,now)) expired++;
- if (ttl > 0) {
- /* We want the average TTL of keys yet not expired. */
- ttl_sum += ttl;
- ttl_samples++;
+ if (num > config_keys_per_loop)
+ num = config_keys_per_loop;
+
+ /* Here we access the low level representation of the hash table
+ * for speed concerns: this makes this code coupled with dict.c,
+ * but it hardly changed in ten years.
+ *
+ * Note that certain places of the hash table may be empty,
+ * so we want also a stop condition about the number of
+ * buckets that we scanned. However scanning for free buckets
+ * is very fast: we are in the cache line scanning a sequential
+ * array of NULL pointers, so we can scan a lot more buckets
+ * than keys in the same time. */
+ long max_buckets = num*20;
+ long checked_buckets = 0;
+
+ while (sampled < num && checked_buckets < max_buckets) {
+ for (int table = 0; table < 2; table++) {
+ if (table == 1 && !dictIsRehashing(db->expires)) break;
+
+ unsigned long idx = db->expires_cursor;
+ idx &= db->expires->ht[table].sizemask;
+ dictEntry *de = db->expires->ht[table].table[idx];
+ long long ttl;
+
+ /* Scan the current bucket of the current table. */
+ checked_buckets++;
+ while(de) {
+ /* Get the next entry now since this entry may get
+ * deleted. */
+ dictEntry *e = de;
+ de = de->next;
+
+ ttl = dictGetSignedIntegerVal(e)-now;
+ if (activeExpireCycleTryExpire(db,e,now)) expired++;
+ if (ttl > 0) {
+ /* We want the average TTL of keys yet
+ * not expired. */
+ ttl_sum += ttl;
+ ttl_samples++;
+ }
+ sampled++;
+ }
}
- total_sampled++;
+ db->expires_cursor++;
}
total_expired += expired;
+ total_sampled += sampled;
/* Update the average TTL stats for this database. */
if (ttl_samples) {
@@ -225,12 +303,14 @@ void activeExpireCycle(int type) {
break;
}
}
- /* We don't repeat the cycle if there are less than 25% of keys
- * found expired in the current DB. */
- } while (expired > ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP/4);
+ /* We don't repeat the cycle for the current database if there are
+ * an acceptable amount of stale keys (logically expired but yet
+ * not reclained). */
+ } while ((expired*100/sampled) > config_cycle_acceptable_stale);
}
elapsed = ustime()-start;
+ server.stat_expire_cycle_time_used += elapsed;
latencyAddSampleIfNeeded("expire-cycle",elapsed/1000);
/* Update our estimate of keys existing but yet to be expired.
diff --git a/src/mkreleasehdr.sh b/src/mkreleasehdr.sh
index e6d558b17..236c26c2b 100755
--- a/src/mkreleasehdr.sh
+++ b/src/mkreleasehdr.sh
@@ -3,7 +3,7 @@ GIT_SHA1=`(git show-ref --head --hash=8 2> /dev/null || echo 00000000) | head -n
GIT_DIRTY=`git diff --no-ext-diff 2> /dev/null | wc -l`
BUILD_ID=`uname -n`"-"`date +%s`
if [ -n "$SOURCE_DATE_EPOCH" ]; then
- BUILD_ID=$(date -u -d "@$SOURCE_DATE_EPOCH" +%s 2>/dev/null || date -u -r "$SOURCE_DATE_EPOCH" +%s 2>/dev/null || date -u %s)
+ BUILD_ID=$(date -u -d "@$SOURCE_DATE_EPOCH" +%s 2>/dev/null || date -u -r "$SOURCE_DATE_EPOCH" +%s 2>/dev/null || date -u +%s)
fi
test -f release.h || touch release.h
(cat release.h | grep SHA1 | grep $GIT_SHA1) && \
diff --git a/src/module.c b/src/module.c
index 2644883c9..be64af368 100644
--- a/src/module.c
+++ b/src/module.c
@@ -524,7 +524,8 @@ int moduleDelKeyIfEmpty(RedisModuleKey *key) {
case OBJ_LIST: isempty = listTypeLength(o) == 0; break;
case OBJ_SET: isempty = setTypeSize(o) == 0; break;
case OBJ_ZSET: isempty = zsetLength(o) == 0; break;
- case OBJ_HASH : isempty = hashTypeLength(o) == 0; break;
+ case OBJ_HASH: isempty = hashTypeLength(o) == 0; break;
+ case OBJ_STREAM: isempty = streamLength(o) == 0; break;
default: isempty = 0;
}
@@ -1019,6 +1020,21 @@ RedisModuleString *RM_CreateStringFromLongLong(RedisModuleCtx *ctx, long long ll
return RM_CreateString(ctx,buf,len);
}
+/* Like RedisModule_CreatString(), but creates a string starting from a long
+ * double.
+ *
+ * The returned string must be released with RedisModule_FreeString() or by
+ * enabling automatic memory management.
+ *
+ * The passed context 'ctx' may be NULL if necessary, see the
+ * RedisModule_CreateString() documentation for more info. */
+RedisModuleString *RM_CreateStringFromLongDouble(RedisModuleCtx *ctx, long double ld, int humanfriendly) {
+ char buf[MAX_LONG_DOUBLE_CHARS];
+ size_t len = ld2string(buf,sizeof(buf),ld,
+ (humanfriendly ? LD_STR_HUMAN : LD_STR_AUTO));
+ return RM_CreateString(ctx,buf,len);
+}
+
/* Like RedisModule_CreatString(), but creates a string starting from another
* RedisModuleString.
*
@@ -1123,6 +1139,14 @@ int RM_StringToDouble(const RedisModuleString *str, double *d) {
return (retval == C_OK) ? REDISMODULE_OK : REDISMODULE_ERR;
}
+/* Convert the string into a long double, storing it at `*ld`.
+ * Returns REDISMODULE_OK on success or REDISMODULE_ERR if the string is
+ * not a valid string representation of a double value. */
+int RM_StringToLongDouble(const RedisModuleString *str, long double *ld) {
+ int retval = string2ld(str->ptr,sdslen(str->ptr),ld);
+ return retval ? REDISMODULE_OK : REDISMODULE_ERR;
+}
+
/* Compare two string objects, returning -1, 0 or 1 respectively if
* a < b, a == b, a > b. Strings are compared byte by byte as two
* binary blobs without any encoding care / collation attempt. */
@@ -1396,7 +1420,7 @@ int RM_ReplyWithString(RedisModuleCtx *ctx, RedisModuleString *str) {
int RM_ReplyWithEmptyString(RedisModuleCtx *ctx) {
client *c = moduleGetReplyClient(ctx);
if (c == NULL) return REDISMODULE_OK;
- addReplyBulkCBuffer(c, "", 0);
+ addReply(c,shared.emptybulk);
return REDISMODULE_OK;
}
@@ -1411,8 +1435,7 @@ int RM_ReplyWithVerbatimString(RedisModuleCtx *ctx, const char *buf, size_t len)
return REDISMODULE_OK;
}
-/* Reply to the client with a NULL. In the RESP protocol a NULL is encoded
- * as the string "$-1\r\n".
+/* Reply to the client with a NULL.
*
* The function always returns REDISMODULE_OK. */
int RM_ReplyWithNull(RedisModuleCtx *ctx) {
@@ -1449,6 +1472,21 @@ int RM_ReplyWithDouble(RedisModuleCtx *ctx, double d) {
return REDISMODULE_OK;
}
+/* Send a string reply obtained converting the long double 'ld' into a bulk
+ * string. This function is basically equivalent to converting a long double
+ * into a string into a C buffer, and then calling the function
+ * RedisModule_ReplyWithStringBuffer() with the buffer and length.
+ * The double string uses human readable formatting (see
+ * `addReplyHumanLongDouble` in networking.c).
+ *
+ * The function always returns REDISMODULE_OK. */
+int RM_ReplyWithLongDouble(RedisModuleCtx *ctx, long double ld) {
+ client *c = moduleGetReplyClient(ctx);
+ if (c == NULL) return REDISMODULE_OK;
+ addReplyHumanLongDouble(c, ld);
+ return REDISMODULE_OK;
+}
+
/* --------------------------------------------------------------------------
* Commands replication API
* -------------------------------------------------------------------------- */
@@ -1632,6 +1670,27 @@ int modulePopulateClientInfoStructure(void *ci, client *client, int structver) {
return REDISMODULE_OK;
}
+/* This is an helper for moduleFireServerEvent() and other functions:
+ * It populates the replication info structure with the appropriate
+ * fields depending on the version provided. If the version is not valid
+ * then REDISMODULE_ERR is returned. Otherwise the function returns
+ * REDISMODULE_OK and the structure pointed by 'ri' gets populated. */
+int modulePopulateReplicationInfoStructure(void *ri, int structver) {
+ if (structver != 1) return REDISMODULE_ERR;
+
+ RedisModuleReplicationInfoV1 *ri1 = ri;
+ memset(ri1,0,sizeof(*ri1));
+ ri1->version = structver;
+ ri1->master = server.masterhost==NULL;
+ ri1->masterhost = server.masterhost? server.masterhost: "";
+ ri1->masterport = server.masterport;
+ ri1->replid1 = server.replid;
+ ri1->replid2 = server.replid2;
+ ri1->repl1_offset = server.master_repl_offset;
+ ri1->repl2_offset = server.second_replid_offset;
+ return REDISMODULE_OK;
+}
+
/* Return information about the client with the specified ID (that was
* previously obtained via the RedisModule_GetClientId() API). If the
* client exists, REDISMODULE_OK is returned, otherwise REDISMODULE_ERR
@@ -1684,6 +1743,15 @@ int RM_GetClientInfoById(void *ci, uint64_t id) {
return modulePopulateClientInfoStructure(ci,client,structver);
}
+/* Publish a message to subscribers (see PUBLISH command). */
+int RM_PublishMessage(RedisModuleCtx *ctx, RedisModuleString *channel, RedisModuleString *message) {
+ UNUSED(ctx);
+ int receivers = pubsubPublishMessage(channel, message);
+ if (server.cluster_enabled)
+ clusterPropagatePublish(channel, message);
+ return receivers;
+}
+
/* Return the currently selected DB. */
int RM_GetSelectedDb(RedisModuleCtx *ctx) {
return ctx->client->db->id;
@@ -1726,6 +1794,8 @@ int RM_GetSelectedDb(RedisModuleCtx *ctx) {
* * REDISMODULE_CTX_FLAGS_OOM_WARNING: Less than 25% of memory remains before
* reaching the maxmemory level.
*
+ * * REDISMODULE_CTX_FLAGS_LOADING: Server is loading RDB/AOF
+ *
* * REDISMODULE_CTX_FLAGS_REPLICA_IS_STALE: No active link with the master.
*
* * REDISMODULE_CTX_FLAGS_REPLICA_IS_CONNECTING: The replica is trying to
@@ -1825,6 +1895,18 @@ int RM_SelectDb(RedisModuleCtx *ctx, int newid) {
return (retval == C_OK) ? REDISMODULE_OK : REDISMODULE_ERR;
}
+/* Initialize a RedisModuleKey struct */
+static void moduleInitKey(RedisModuleKey *kp, RedisModuleCtx *ctx, robj *keyname, robj *value, int mode){
+ kp->ctx = ctx;
+ kp->db = ctx->client->db;
+ kp->key = keyname;
+ incrRefCount(keyname);
+ kp->value = value;
+ kp->iter = NULL;
+ kp->mode = mode;
+ zsetKeyReset(kp);
+}
+
/* Return an handle representing a Redis key, so that it is possible
* to call other APIs with the key handle as argument to perform
* operations on the key.
@@ -1842,11 +1924,12 @@ int RM_SelectDb(RedisModuleCtx *ctx, int newid) {
void *RM_OpenKey(RedisModuleCtx *ctx, robj *keyname, int mode) {
RedisModuleKey *kp;
robj *value;
+ int flags = mode & REDISMODULE_OPEN_KEY_NOTOUCH? LOOKUP_NOTOUCH: 0;
if (mode & REDISMODULE_WRITE) {
- value = lookupKeyWrite(ctx->client->db,keyname);
+ value = lookupKeyWriteWithFlags(ctx->client->db,keyname, flags);
} else {
- value = lookupKeyRead(ctx->client->db,keyname);
+ value = lookupKeyReadWithFlags(ctx->client->db,keyname, flags);
if (value == NULL) {
return NULL;
}
@@ -1854,27 +1937,25 @@ void *RM_OpenKey(RedisModuleCtx *ctx, robj *keyname, int mode) {
/* Setup the key handle. */
kp = zmalloc(sizeof(*kp));
- kp->ctx = ctx;
- kp->db = ctx->client->db;
- kp->key = keyname;
- incrRefCount(keyname);
- kp->value = value;
- kp->iter = NULL;
- kp->mode = mode;
- zsetKeyReset(kp);
+ moduleInitKey(kp, ctx, keyname, value, mode);
autoMemoryAdd(ctx,REDISMODULE_AM_KEY,kp);
return (void*)kp;
}
-/* Close a key handle. */
-void RM_CloseKey(RedisModuleKey *key) {
- if (key == NULL) return;
+/* Destroy a RedisModuleKey struct (freeing is the responsibility of the caller). */
+static void moduleCloseKey(RedisModuleKey *key) {
int signal = SHOULD_SIGNAL_MODIFIED_KEYS(key->ctx);
if ((key->mode & REDISMODULE_WRITE) && signal)
signalModifiedKey(key->db,key->key);
/* TODO: if (key->iter) RM_KeyIteratorStop(kp); */
RM_ZsetRangeStop(key);
decrRefCount(key->key);
+}
+
+/* Close a key handle. */
+void RM_CloseKey(RedisModuleKey *key) {
+ if (key == NULL) return;
+ moduleCloseKey(key);
autoMemoryFreed(key->ctx,REDISMODULE_AM_KEY,key);
zfree(key);
}
@@ -1892,6 +1973,7 @@ int RM_KeyType(RedisModuleKey *key) {
case OBJ_ZSET: return REDISMODULE_KEYTYPE_ZSET;
case OBJ_HASH: return REDISMODULE_KEYTYPE_HASH;
case OBJ_MODULE: return REDISMODULE_KEYTYPE_MODULE;
+ case OBJ_STREAM: return REDISMODULE_KEYTYPE_STREAM;
default: return 0;
}
}
@@ -1909,6 +1991,7 @@ size_t RM_ValueLength(RedisModuleKey *key) {
case OBJ_SET: return setTypeSize(key->value);
case OBJ_ZSET: return zsetLength(key->value);
case OBJ_HASH: return hashTypeLength(key->value);
+ case OBJ_STREAM: return streamLength(key->value);
default: return 0;
}
}
@@ -1926,7 +2009,7 @@ int RM_DeleteKey(RedisModuleKey *key) {
return REDISMODULE_OK;
}
-/* If the key is open for writing, unlink it (that is delete it in a
+/* If the key is open for writing, unlink it (that is delete it in a
* non-blocking way, not reclaiming memory immediately) and setup the key to
* accept new writes as an empty key (that will be created on demand).
* On success REDISMODULE_OK is returned. If the key is not open for
@@ -1971,6 +2054,28 @@ int RM_SetExpire(RedisModuleKey *key, mstime_t expire) {
return REDISMODULE_OK;
}
+/* Performs similar operation to FLUSHALL, and optionally start a new AOF file (if enabled)
+ * If restart_aof is true, you must make sure the command that triggered this call is not
+ * propagated to the AOF file.
+ * When async is set to true, db contents will be freed by a background thread. */
+void RM_ResetDataset(int restart_aof, int async) {
+ if (restart_aof && server.aof_state != AOF_OFF) stopAppendOnly();
+ flushAllDataAndResetRDB(async? EMPTYDB_ASYNC: EMPTYDB_NO_FLAGS);
+ if (server.aof_enabled && restart_aof) restartAOFAfterSYNC();
+}
+
+/* Returns the number of keys in the current db. */
+unsigned long long RM_DbSize(RedisModuleCtx *ctx) {
+ return dictSize(ctx->client->db->dict);
+}
+
+/* Returns a name of a random key, or NULL if current db is empty. */
+RedisModuleString *RM_RandomKey(RedisModuleCtx *ctx) {
+ robj *key = dbRandomKey(ctx->client->db);
+ autoMemoryAdd(ctx,REDISMODULE_AM_STRING,key);
+ return key;
+}
+
/* --------------------------------------------------------------------------
* Key API for String type
* -------------------------------------------------------------------------- */
@@ -3064,7 +3169,9 @@ fmterr:
* On success a RedisModuleCallReply object is returned, otherwise
* NULL is returned and errno is set to the following values:
*
- * EINVAL: command non existing, wrong arity, wrong format specifier.
+ * EBADF: wrong format specifier.
+ * EINVAL: wrong command arity.
+ * ENOENT: command does not exist.
* EPERM: operation in Cluster instance with key in non local slot.
*
* This API is documented here: https://redis.io/topics/modules-intro
@@ -3096,7 +3203,7 @@ RedisModuleCallReply *RM_Call(RedisModuleCtx *ctx, const char *cmdname, const ch
/* We handle the above format error only when the client is setup so that
* we can free it normally. */
if (argv == NULL) {
- errno = EINVAL;
+ errno = EBADF;
goto cleanup;
}
@@ -3108,7 +3215,7 @@ RedisModuleCallReply *RM_Call(RedisModuleCtx *ctx, const char *cmdname, const ch
*/
cmd = lookupCommand(c->argv[0]->ptr);
if (!cmd) {
- errno = EINVAL;
+ errno = ENOENT;
goto cleanup;
}
c->cmd = c->lastcmd = cmd;
@@ -3723,6 +3830,31 @@ loaderr:
return 0;
}
+/* In the context of the rdb_save method of a module data type, saves a long double
+ * value to the RDB file. The double can be a valid number, a NaN or infinity.
+ * It is possible to load back the value with RedisModule_LoadLongDouble(). */
+void RM_SaveLongDouble(RedisModuleIO *io, long double value) {
+ if (io->error) return;
+ char buf[MAX_LONG_DOUBLE_CHARS];
+ /* Long double has different number of bits in different platforms, so we
+ * save it as a string type. */
+ size_t len = ld2string(buf,sizeof(buf),value,LD_STR_HEX);
+ RM_SaveStringBuffer(io,buf,len+1); /* len+1 for '\0' */
+}
+
+/* In the context of the rdb_save method of a module data type, loads back the
+ * long double value saved by RedisModule_SaveLongDouble(). */
+long double RM_LoadLongDouble(RedisModuleIO *io) {
+ if (io->error) return 0;
+ long double value;
+ size_t len;
+ char* str = RM_LoadStringBuffer(io,&len);
+ if (!str) return 0;
+ string2ld(str,len,&value);
+ RM_Free(str);
+ return value;
+}
+
/* Iterate over modules, and trigger rdb aux saving for the ones modules types
* who asked for it. */
ssize_t rdbSaveModulesAux(rio *rdb, int when) {
@@ -3813,6 +3945,59 @@ void RM_DigestEndSequence(RedisModuleDigest *md) {
memset(md->o,0,sizeof(md->o));
}
+/* Decode a serialized representation of a module data type 'mt' from string
+ * 'str' and return a newly allocated value, or NULL if decoding failed.
+ *
+ * This call basically reuses the 'rdb_load' callback which module data types
+ * implement in order to allow a module to arbitrarily serialize/de-serialize
+ * keys, similar to how the Redis 'DUMP' and 'RESTORE' commands are implemented.
+ *
+ * Modules should generally use the REDISMODULE_OPTIONS_HANDLE_IO_ERRORS flag and
+ * make sure the de-serialization code properly checks and handles IO errors
+ * (freeing allocated buffers and returning a NULL).
+ *
+ * If this is NOT done, Redis will handle corrupted (or just truncated) serialized
+ * data by producing an error message and terminating the process.
+ */
+
+void *RM_LoadDataTypeFromString(const RedisModuleString *str, const moduleType *mt) {
+ rio payload;
+ RedisModuleIO io;
+
+ rioInitWithBuffer(&payload, str->ptr);
+ moduleInitIOContext(io,(moduleType *)mt,&payload,NULL);
+
+ /* All RM_Save*() calls always write a version 2 compatible format, so we
+ * need to make sure we read the same.
+ */
+ io.ver = 2;
+ return mt->rdb_load(&io,0);
+}
+
+/* Encode a module data type 'mt' value 'data' into serialized form, and return it
+ * as a newly allocated RedisModuleString.
+ *
+ * This call basically reuses the 'rdb_save' callback which module data types
+ * implement in order to allow a module to arbitrarily serialize/de-serialize
+ * keys, similar to how the Redis 'DUMP' and 'RESTORE' commands are implemented.
+ */
+
+RedisModuleString *RM_SaveDataTypeToString(RedisModuleCtx *ctx, void *data, const moduleType *mt) {
+ rio payload;
+ RedisModuleIO io;
+
+ rioInitWithBuffer(&payload,sdsempty());
+ moduleInitIOContext(io,(moduleType *)mt,&payload,NULL);
+ mt->rdb_save(&io,data);
+ if (io.error) {
+ return NULL;
+ } else {
+ robj *str = createObject(OBJ_STRING,payload.io.buffer.ptr);
+ autoMemoryAdd(ctx,REDISMODULE_AM_STRING,str);
+ return str;
+ }
+}
+
/* --------------------------------------------------------------------------
* AOF API for modules data types
* -------------------------------------------------------------------------- */
@@ -5940,6 +6125,263 @@ int RM_CommandFilterArgDelete(RedisModuleCommandFilterCtx *fctx, int pos)
return REDISMODULE_OK;
}
+/* For a given pointer allocated via RedisModule_Alloc() or
+ * RedisModule_Realloc(), return the amount of memory allocated for it.
+ * Note that this may be different (larger) than the memory we allocated
+ * with the allocation calls, since sometimes the underlying allocator
+ * will allocate more memory.
+ */
+size_t RM_MallocSize(void* ptr){
+ return zmalloc_size(ptr);
+}
+
+/* Return the a number between 0 to 1 indicating the amount of memory
+ * currently used, relative to the Redis "maxmemory" configuration.
+ *
+ * 0 - No memory limit configured.
+ * Between 0 and 1 - The percentage of the memory used normalized in 0-1 range.
+ * Exactly 1 - Memory limit reached.
+ * Greater 1 - More memory used than the configured limit.
+ */
+float RM_GetUsedMemoryRatio(){
+ float level;
+ getMaxmemoryState(NULL, NULL, NULL, &level);
+ return level;
+}
+
+/* --------------------------------------------------------------------------
+ * Scanning keyspace and hashes
+ * -------------------------------------------------------------------------- */
+
+typedef void (*RedisModuleScanCB)(RedisModuleCtx *ctx, RedisModuleString *keyname, RedisModuleKey *key, void *privdata);
+typedef struct {
+ RedisModuleCtx *ctx;
+ void* user_data;
+ RedisModuleScanCB fn;
+} ScanCBData;
+
+typedef struct RedisModuleScanCursor{
+ int cursor;
+ int done;
+}RedisModuleScanCursor;
+
+static void moduleScanCallback(void *privdata, const dictEntry *de) {
+ ScanCBData *data = privdata;
+ sds key = dictGetKey(de);
+ robj* val = dictGetVal(de);
+ RedisModuleString *keyname = createObject(OBJ_STRING,sdsdup(key));
+
+ /* Setup the key handle. */
+ RedisModuleKey kp = {0};
+ moduleInitKey(&kp, data->ctx, keyname, val, REDISMODULE_READ);
+
+ data->fn(data->ctx, keyname, &kp, data->user_data);
+
+ moduleCloseKey(&kp);
+ decrRefCount(keyname);
+}
+
+/* Create a new cursor to be used with RedisModule_Scan */
+RedisModuleScanCursor *RM_ScanCursorCreate() {
+ RedisModuleScanCursor* cursor = zmalloc(sizeof(*cursor));
+ cursor->cursor = 0;
+ cursor->done = 0;
+ return cursor;
+}
+
+/* Restart an existing cursor. The keys will be rescanned. */
+void RM_ScanCursorRestart(RedisModuleScanCursor *cursor) {
+ cursor->cursor = 0;
+ cursor->done = 0;
+}
+
+/* Destroy the cursor struct. */
+void RM_ScanCursorDestroy(RedisModuleScanCursor *cursor) {
+ zfree(cursor);
+}
+
+/* Scan api that allows a module to scan all the keys and value in the selected db.
+ *
+ * Callback for scan implementation.
+ * void scan_callback(RedisModuleCtx *ctx, RedisModuleString *keyname, RedisModuleKey *key, void *privdata);
+ * - ctx - the redis module context provided to for the scan.
+ * - keyname - owned by the caller and need to be retained if used after this function.
+ * - key - holds info on the key and value, it is provided as best effort, in some cases it might
+ * be NULL, in which case the user should (can) use RedisModule_OpenKey (and CloseKey too).
+ * when it is provided, it is owned by the caller and will be free when the callback returns.
+ * - privdata - the user data provided to RedisModule_Scan.
+ *
+ * The way it should be used:
+ * RedisModuleCursor *c = RedisModule_ScanCursorCreate();
+ * while(RedisModule_Scan(ctx, c, callback, privateData));
+ * RedisModule_ScanCursorDestroy(c);
+ *
+ * It is also possible to use this API from another thread while the lock is acquired durring
+ * the actuall call to RM_Scan:
+ * RedisModuleCursor *c = RedisModule_ScanCursorCreate();
+ * RedisModule_ThreadSafeContextLock(ctx);
+ * while(RedisModule_Scan(ctx, c, callback, privateData)){
+ * RedisModule_ThreadSafeContextUnlock(ctx);
+ * // do some background job
+ * RedisModule_ThreadSafeContextLock(ctx);
+ * }
+ * RedisModule_ScanCursorDestroy(c);
+ *
+ * The function will return 1 if there are more elements to scan and 0 otherwise,
+ * possibly setting errno if the call failed.
+ * It is also possible to restart and existing cursor using RM_CursorRestart. */
+int RM_Scan(RedisModuleCtx *ctx, RedisModuleScanCursor *cursor, RedisModuleScanCB fn, void *privdata) {
+ if (cursor->done) {
+ errno = ENOENT;
+ return 0;
+ }
+ int ret = 1;
+ ScanCBData data = { ctx, privdata, fn };
+ cursor->cursor = dictScan(ctx->client->db->dict, cursor->cursor, moduleScanCallback, NULL, &data);
+ if (cursor->cursor == 0) {
+ cursor->done = 1;
+ ret = 0;
+ }
+ errno = 0;
+ return ret;
+}
+
+typedef void (*RedisModuleScanKeyCB)(RedisModuleKey *key, RedisModuleString *field, RedisModuleString *value, void *privdata);
+typedef struct {
+ RedisModuleKey *key;
+ void* user_data;
+ RedisModuleScanKeyCB fn;
+} ScanKeyCBData;
+
+static void moduleScanKeyCallback(void *privdata, const dictEntry *de) {
+ ScanKeyCBData *data = privdata;
+ sds key = dictGetKey(de);
+ robj *o = data->key->value;
+ robj *field = createStringObject(key, sdslen(key));
+ robj *value = NULL;
+ if (o->type == OBJ_SET) {
+ value = NULL;
+ } else if (o->type == OBJ_HASH) {
+ sds val = dictGetVal(de);
+ value = createStringObject(val, sdslen(val));
+ } else if (o->type == OBJ_ZSET) {
+ double *val = (double*)dictGetVal(de);
+ value = createStringObjectFromLongDouble(*val, 0);
+ }
+
+ data->fn(data->key, field, value, data->user_data);
+ decrRefCount(field);
+ if (value) decrRefCount(value);
+}
+
+/* Scan api that allows a module to scan the elements in a hash, set or sorted set key
+ *
+ * Callback for scan implementation.
+ * void scan_callback(RedisModuleKey *key, RedisModuleString* field, RedisModuleString* value, void *privdata);
+ * - key - the redis key context provided to for the scan.
+ * - field - field name, owned by the caller and need to be retained if used
+ * after this function.
+ * - value - value string or NULL for set type, owned by the caller and need to
+ * be retained if used after this function.
+ * - privdata - the user data provided to RedisModule_ScanKey.
+ *
+ * The way it should be used:
+ * RedisModuleCursor *c = RedisModule_ScanCursorCreate();
+ * RedisModuleKey *key = RedisModule_OpenKey(...)
+ * while(RedisModule_ScanKey(key, c, callback, privateData));
+ * RedisModule_CloseKey(key);
+ * RedisModule_ScanCursorDestroy(c);
+ *
+ * It is also possible to use this API from another thread while the lock is acquired durring
+ * the actuall call to RM_Scan, and re-opening the key each time:
+ * RedisModuleCursor *c = RedisModule_ScanCursorCreate();
+ * RedisModule_ThreadSafeContextLock(ctx);
+ * RedisModuleKey *key = RedisModule_OpenKey(...)
+ * while(RedisModule_ScanKey(ctx, c, callback, privateData)){
+ * RedisModule_CloseKey(key);
+ * RedisModule_ThreadSafeContextUnlock(ctx);
+ * // do some background job
+ * RedisModule_ThreadSafeContextLock(ctx);
+ * RedisModuleKey *key = RedisModule_OpenKey(...)
+ * }
+ * RedisModule_CloseKey(key);
+ * RedisModule_ScanCursorDestroy(c);
+ *
+ * The function will return 1 if there are more elements to scan and 0 otherwise,
+ * possibly setting errno if the call failed.
+ * It is also possible to restart and existing cursor using RM_CursorRestart. */
+int RM_ScanKey(RedisModuleKey *key, RedisModuleScanCursor *cursor, RedisModuleScanKeyCB fn, void *privdata) {
+ if (key == NULL || key->value == NULL) {
+ errno = EINVAL;
+ return 0;
+ }
+ dict *ht = NULL;
+ robj *o = key->value;
+ if (o->type == OBJ_SET) {
+ if (o->encoding == OBJ_ENCODING_HT)
+ ht = o->ptr;
+ } else if (o->type == OBJ_HASH) {
+ if (o->encoding == OBJ_ENCODING_HT)
+ ht = o->ptr;
+ } else if (o->type == OBJ_ZSET) {
+ if (o->encoding == OBJ_ENCODING_SKIPLIST)
+ ht = ((zset *)o->ptr)->dict;
+ } else {
+ errno = EINVAL;
+ return 0;
+ }
+ if (cursor->done) {
+ errno = ENOENT;
+ return 0;
+ }
+ int ret = 1;
+ if (ht) {
+ ScanKeyCBData data = { key, privdata, fn };
+ cursor->cursor = dictScan(ht, cursor->cursor, moduleScanKeyCallback, NULL, &data);
+ if (cursor->cursor == 0) {
+ cursor->done = 1;
+ ret = 0;
+ }
+ } else if (o->type == OBJ_SET && o->encoding == OBJ_ENCODING_INTSET) {
+ int pos = 0;
+ int64_t ll;
+ while(intsetGet(o->ptr,pos++,&ll)) {
+ robj *field = createStringObjectFromLongLong(ll);
+ fn(key, field, NULL, privdata);
+ decrRefCount(field);
+ }
+ cursor->cursor = 1;
+ cursor->done = 1;
+ ret = 0;
+ } else if (o->type == OBJ_HASH || o->type == OBJ_ZSET) {
+ unsigned char *p = ziplistIndex(o->ptr,0);
+ unsigned char *vstr;
+ unsigned int vlen;
+ long long vll;
+ while(p) {
+ ziplistGet(p,&vstr,&vlen,&vll);
+ robj *field = (vstr != NULL) ?
+ createStringObject((char*)vstr,vlen) :
+ createStringObjectFromLongLong(vll);
+ p = ziplistNext(o->ptr,p);
+ ziplistGet(p,&vstr,&vlen,&vll);
+ robj *value = (vstr != NULL) ?
+ createStringObject((char*)vstr,vlen) :
+ createStringObjectFromLongLong(vll);
+ fn(key, field, value, privdata);
+ p = ziplistNext(o->ptr,p);
+ decrRefCount(field);
+ decrRefCount(value);
+ }
+ cursor->cursor = 1;
+ cursor->done = 1;
+ ret = 0;
+ }
+ errno = 0;
+ return ret;
+}
+
+
/* --------------------------------------------------------------------------
* Module fork API
* -------------------------------------------------------------------------- */
@@ -6076,8 +6518,8 @@ void ModuleForkDoneHandler(int exitcode, int bysignal) {
*
* The following sub events are available:
*
- * REDISMODULE_EVENT_REPLROLECHANGED_NOW_MASTER
- * REDISMODULE_EVENT_REPLROLECHANGED_NOW_REPLICA
+ * REDISMODULE_SUBEVENT_REPLROLECHANGED_NOW_MASTER
+ * REDISMODULE_SUBEVENT_REPLROLECHANGED_NOW_REPLICA
*
* The 'data' field can be casted by the callback to a
* RedisModuleReplicationInfo structure with the following fields:
@@ -6087,24 +6529,30 @@ void ModuleForkDoneHandler(int exitcode, int bysignal) {
* int masterport; // master instance port for NOW_REPLICA
* char *replid1; // Main replication ID
* char *replid2; // Secondary replication ID
+ * uint64_t repl1_offset; // Main replication offset
* uint64_t repl2_offset; // Offset of replid2 validity
- * uint64_t main_repl_offset; // Replication offset
*
* RedisModuleEvent_Persistence
*
* This event is called when RDB saving or AOF rewriting starts
* and ends. The following sub events are available:
*
- * REDISMODULE_EVENT_LOADING_RDB_START // BGSAVE start
- * REDISMODULE_EVENT_LOADING_RDB_END // BGSAVE end
- * REDISMODULE_EVENT_LOADING_SYNC_RDB_START // SAVE start
- * REDISMODULE_EVENT_LOADING_SYNC_RDB_START // SAVE end
- * REDISMODULE_EVENT_LOADING_AOF_START // AOF rewrite start
- * REDISMODULE_EVENT_LOADING_AOF_END // AOF rewrite end
+ * REDISMODULE_SUBEVENT_PERSISTENCE_RDB_START
+ * REDISMODULE_SUBEVENT_PERSISTENCE_AOF_START
+ * REDISMODULE_SUBEVENT_PERSISTENCE_SYNC_RDB_START
+ * REDISMODULE_SUBEVENT_PERSISTENCE_ENDED
+ * REDISMODULE_SUBEVENT_PERSISTENCE_FAILED
*
* The above events are triggered not just when the user calls the
* relevant commands like BGSAVE, but also when a saving operation
* or AOF rewriting occurs because of internal server triggers.
+ * The SYNC_RDB_START sub events are happening in the forground due to
+ * SAVE command, FLUSHALL, or server shutdown, and the other RDB and
+ * AOF sub events are executed in a background fork child, so any
+ * action the module takes can only affect the generated AOF or RDB,
+ * but will not be reflected in the parent process and affect connected
+ * clients and commands. Also note that the AOF_START sub event may end
+ * up saving RDB content in case of an AOF with rdb-preamble.
*
* RedisModuleEvent_FlushDB
*
@@ -6112,8 +6560,8 @@ void ModuleForkDoneHandler(int exitcode, int bysignal) {
* because of replication, after the replica synchronization)
* happened. The following sub events are available:
*
- * REDISMODULE_EVENT_FLUSHDB_START
- * REDISMODULE_EVENT_FLUSHDB_END
+ * REDISMODULE_SUBEVENT_FLUSHDB_START
+ * REDISMODULE_SUBEVENT_FLUSHDB_END
*
* The data pointer can be casted to a RedisModuleFlushInfo
* structure with the following fields:
@@ -6137,12 +6585,15 @@ void ModuleForkDoneHandler(int exitcode, int bysignal) {
* replica is loading the RDB file from the master.
* The following sub events are available:
*
- * REDISMODULE_EVENT_LOADING_RDB_START
- * REDISMODULE_EVENT_LOADING_RDB_END
- * REDISMODULE_EVENT_LOADING_MASTER_RDB_START
- * REDISMODULE_EVENT_LOADING_MASTER_RDB_END
- * REDISMODULE_EVENT_LOADING_AOF_START
- * REDISMODULE_EVENT_LOADING_AOF_END
+ * REDISMODULE_SUBEVENT_LOADING_RDB_START
+ * REDISMODULE_SUBEVENT_LOADING_AOF_START
+ * REDISMODULE_SUBEVENT_LOADING_REPL_START
+ * REDISMODULE_SUBEVENT_LOADING_ENDED
+ * REDISMODULE_SUBEVENT_LOADING_FAILED
+ *
+ * Note that AOF loading may start with an RDB data in case of
+ * rdb-preamble, in which case you'll only recieve an AOF_START event.
+ *
*
* RedisModuleEvent_ClientChange
*
@@ -6151,8 +6602,8 @@ void ModuleForkDoneHandler(int exitcode, int bysignal) {
* structure, documented in RedisModule_GetClientInfoById().
* The following sub events are available:
*
- * REDISMODULE_EVENT_CLIENT_CHANGE_CONNECTED
- * REDISMODULE_EVENT_CLIENT_CHANGE_DISCONNECTED
+ * REDISMODULE_SUBEVENT_CLIENT_CHANGE_CONNECTED
+ * REDISMODULE_SUBEVENT_CLIENT_CHANGE_DISCONNECTED
*
* RedisModuleEvent_Shutdown
*
@@ -6165,8 +6616,8 @@ void ModuleForkDoneHandler(int exitcode, int bysignal) {
* replica since it gets disconnected.
* The following sub events are availble:
*
- * REDISMODULE_EVENT_REPLICA_CHANGE_ONLINE
- * REDISMODULE_EVENT_REPLICA_CHANGE_OFFLINE
+ * REDISMODULE_SUBEVENT_REPLICA_CHANGE_ONLINE
+ * REDISMODULE_SUBEVENT_REPLICA_CHANGE_OFFLINE
*
* No additional information is available so far: future versions
* of Redis will have an API in order to enumerate the replicas
@@ -6181,6 +6632,11 @@ void ModuleForkDoneHandler(int exitcode, int bysignal) {
* this changes depending on the "hz" configuration.
* No sub events are available.
*
+ * The data pointer can be casted to a RedisModuleCronLoop
+ * structure with the following fields:
+ *
+ * int32_t hz; // Approximate number of events per second.
+ *
* RedisModuleEvent_MasterLinkChange
*
* This is called for replicas in order to notify when the
@@ -6190,8 +6646,38 @@ void ModuleForkDoneHandler(int exitcode, int bysignal) {
* replication is happening correctly.
* The following sub events are available:
*
- * REDISMODULE_EVENT_MASTER_LINK_UP
- * REDISMODULE_EVENT_MASTER_LINK_DOWN
+ * REDISMODULE_SUBEVENT_MASTER_LINK_UP
+ * REDISMODULE_SUBEVENT_MASTER_LINK_DOWN
+ *
+ * RedisModuleEvent_ModuleChange
+ *
+ * This event is called when a new module is loaded or one is unloaded.
+ * The following sub events are availble:
+ *
+ * REDISMODULE_SUBEVENT_MODULE_LOADED
+ * REDISMODULE_SUBEVENT_MODULE_UNLOADED
+ *
+ * The data pointer can be casted to a RedisModuleModuleChange
+ * structure with the following fields:
+ *
+ * const char* module_name; // Name of module loaded or unloaded.
+ * int32_t module_version; // Module version.
+ *
+ * RedisModuleEvent_LoadingProgress
+ *
+ * This event is called repeatedly called while an RDB or AOF file
+ * is being loaded.
+ * The following sub events are availble:
+ *
+ * REDISMODULE_SUBEVENT_LOADING_PROGRESS_RDB
+ * REDISMODULE_SUBEVENT_LOADING_PROGRESS_AOF
+ *
+ * The data pointer can be casted to a RedisModuleLoadingProgress
+ * structure with the following fields:
+ *
+ * int32_t hz; // Approximate number of events per second.
+ * int32_t progress; // Approximate progress between 0 and 1024,
+ * or -1 if unknown.
*
* The function returns REDISMODULE_OK if the module was successfully subscrived
* for the specified event. If the API is called from a wrong context then
@@ -6250,7 +6736,7 @@ void moduleFireServerEvent(uint64_t eid, int subid, void *data) {
listRewind(RedisModule_EventListeners,&li);
while((ln = listNext(&li))) {
RedisModuleEventListener *el = ln->value;
- if (el->event.id == eid && !el->module->in_hook) {
+ if (el->event.id == eid) {
RedisModuleCtx ctx = REDISMODULE_CTX_INIT;
ctx.module = el->module;
@@ -6264,6 +6750,8 @@ void moduleFireServerEvent(uint64_t eid, int subid, void *data) {
void *moduledata = NULL;
RedisModuleClientInfoV1 civ1;
+ RedisModuleReplicationInfoV1 riv1;
+ RedisModuleModuleChangeV1 mcv1;
/* Start at DB zero by default when calling the handler. It's
* up to the specific event setup to change it when it makes
* sense. For instance for FLUSHDB events we select the correct
@@ -6275,11 +6763,26 @@ void moduleFireServerEvent(uint64_t eid, int subid, void *data) {
modulePopulateClientInfoStructure(&civ1,data,
el->event.dataver);
moduledata = &civ1;
+ } else if (eid == REDISMODULE_EVENT_REPLICATION_ROLE_CHANGED) {
+ modulePopulateReplicationInfoStructure(&riv1,el->event.dataver);
+ moduledata = &riv1;
} else if (eid == REDISMODULE_EVENT_FLUSHDB) {
moduledata = data;
RedisModuleFlushInfoV1 *fi = data;
if (fi->dbnum != -1)
selectDb(ctx.client, fi->dbnum);
+ } else if (eid == REDISMODULE_EVENT_MODULE_CHANGE) {
+ RedisModule *m = data;
+ if (m == el->module)
+ continue;
+ mcv1.version = REDISMODULE_MODULE_CHANGE_VERSION;
+ mcv1.module_name = m->name;
+ mcv1.module_version = m->ver;
+ moduledata = &mcv1;
+ } else if (eid == REDISMODULE_EVENT_LOADING_PROGRESS) {
+ moduledata = data;
+ } else if (eid == REDISMODULE_EVENT_CRON_LOOP) {
+ moduledata = data;
}
ModulesInHooks++;
@@ -6311,6 +6814,27 @@ void moduleUnsubscribeAllServerEvents(RedisModule *module) {
}
}
+void processModuleLoadingProgressEvent(int is_aof) {
+ long long now = ustime();
+ static long long next_event = 0;
+ if (now >= next_event) {
+ /* Fire the loading progress modules end event. */
+ int progress = -1;
+ if (server.loading_total_bytes)
+ progress = (server.loading_total_bytes<<10) / server.loading_total_bytes;
+ RedisModuleFlushInfoV1 fi = {REDISMODULE_LOADING_PROGRESS_VERSION,
+ server.hz,
+ progress};
+ moduleFireServerEvent(REDISMODULE_EVENT_LOADING_PROGRESS,
+ is_aof?
+ REDISMODULE_SUBEVENT_LOADING_PROGRESS_AOF:
+ REDISMODULE_SUBEVENT_LOADING_PROGRESS_RDB,
+ &fi);
+ /* decide when the next event should fire. */
+ next_event = now + 1000000 / server.hz;
+ }
+}
+
/* --------------------------------------------------------------------------
* Modules API internals
* -------------------------------------------------------------------------- */
@@ -6479,6 +7003,11 @@ int moduleLoad(const char *path, void **module_argv, int module_argc) {
ctx.module->blocked_clients = 0;
ctx.module->handle = handle;
serverLog(LL_NOTICE,"Module '%s' loaded from %s",ctx.module->name,path);
+ /* Fire the loaded modules event. */
+ moduleFireServerEvent(REDISMODULE_EVENT_MODULE_CHANGE,
+ REDISMODULE_SUBEVENT_MODULE_LOADED,
+ ctx.module);
+
moduleFreeContext(&ctx);
return C_OK;
}
@@ -6541,6 +7070,11 @@ int moduleUnload(sds name) {
module->name, error);
}
+ /* Fire the unloaded modules event. */
+ moduleFireServerEvent(REDISMODULE_EVENT_MODULE_CHANGE,
+ REDISMODULE_SUBEVENT_MODULE_UNLOADED,
+ module);
+
/* Remove from list of modules. */
serverLog(LL_NOTICE,"Module %s unloaded",module->name);
dictDelete(modules,module->name);
@@ -6693,6 +7227,82 @@ size_t moduleCount(void) {
return dictSize(modules);
}
+/* Set the key last access time for LRU based eviction. not relevent if the
+ * servers's maxmemory policy is LFU based. Value is idle time in milliseconds.
+ * returns REDISMODULE_OK if the LRU was updated, REDISMODULE_ERR otherwise. */
+int RM_SetLRU(RedisModuleKey *key, mstime_t lru_idle) {
+ if (!key->value)
+ return REDISMODULE_ERR;
+ if (objectSetLRUOrLFU(key->value, -1, lru_idle, lru_idle>=0 ? LRU_CLOCK() : 0, 1))
+ return REDISMODULE_OK;
+ return REDISMODULE_ERR;
+}
+
+/* Gets the key last access time.
+ * Value is idletime in milliseconds or -1 if the server's eviction policy is
+ * LFU based.
+ * returns REDISMODULE_OK if when key is valid. */
+int RM_GetLRU(RedisModuleKey *key, mstime_t *lru_idle) {
+ *lru_idle = -1;
+ if (!key->value)
+ return REDISMODULE_ERR;
+ if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU)
+ return REDISMODULE_OK;
+ *lru_idle = estimateObjectIdleTime(key->value);
+ return REDISMODULE_OK;
+}
+
+/* Set the key access frequency. only relevant if the server's maxmemory policy
+ * is LFU based.
+ * The frequency is a logarithmic counter that provides an indication of
+ * the access frequencyonly (must be <= 255).
+ * returns REDISMODULE_OK if the LFU was updated, REDISMODULE_ERR otherwise. */
+int RM_SetLFU(RedisModuleKey *key, long long lfu_freq) {
+ if (!key->value)
+ return REDISMODULE_ERR;
+ if (objectSetLRUOrLFU(key->value, lfu_freq, -1, 0, 1))
+ return REDISMODULE_OK;
+ return REDISMODULE_ERR;
+}
+
+/* Gets the key access frequency or -1 if the server's eviction policy is not
+ * LFU based.
+ * returns REDISMODULE_OK if when key is valid. */
+int RM_GetLFU(RedisModuleKey *key, long long *lfu_freq) {
+ *lfu_freq = -1;
+ if (!key->value)
+ return REDISMODULE_ERR;
+ if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU)
+ *lfu_freq = LFUDecrAndReturn(key->value);
+ return REDISMODULE_OK;
+}
+
+/* Replace the value assigned to a module type.
+ *
+ * The key must be open for writing, have an existing value, and have a moduleType
+ * that matches the one specified by the caller.
+ *
+ * Unlike RM_ModuleTypeSetValue() which will free the old value, this function
+ * simply swaps the old value with the new value.
+ *
+ * The function returns the old value, or NULL if any of the above conditions is
+ * not met.
+ */
+void *RM_ModuleTypeReplaceValue(RedisModuleKey *key, moduleType *mt, void *new_value) {
+ if (!(key->mode & REDISMODULE_WRITE) || key->iter)
+ return NULL;
+ if (!key->value || key->value->type != OBJ_MODULE)
+ return NULL;
+
+ moduleValue *mv = key->value->ptr;
+ if (mv->type != mt)
+ return NULL;
+
+ void *old_val = mv->value;
+ mv->value = new_value;
+ return old_val;
+}
+
/* Register all the APIs we export. Keep this function at the end of the
* file so that's easy to seek it to add new entries. */
void moduleRegisterCoreAPI(void) {
@@ -6722,6 +7332,7 @@ void moduleRegisterCoreAPI(void) {
REGISTER_API(ReplyWithNull);
REGISTER_API(ReplyWithCallReply);
REGISTER_API(ReplyWithDouble);
+ REGISTER_API(ReplyWithLongDouble);
REGISTER_API(GetSelectedDb);
REGISTER_API(SelectDb);
REGISTER_API(OpenKey);
@@ -6732,6 +7343,7 @@ void moduleRegisterCoreAPI(void) {
REGISTER_API(ListPop);
REGISTER_API(StringToLongLong);
REGISTER_API(StringToDouble);
+ REGISTER_API(StringToLongDouble);
REGISTER_API(Call);
REGISTER_API(CallReplyProto);
REGISTER_API(FreeCallReply);
@@ -6743,6 +7355,7 @@ void moduleRegisterCoreAPI(void) {
REGISTER_API(CreateStringFromCallReply);
REGISTER_API(CreateString);
REGISTER_API(CreateStringFromLongLong);
+ REGISTER_API(CreateStringFromLongDouble);
REGISTER_API(CreateStringFromString);
REGISTER_API(CreateStringPrintf);
REGISTER_API(FreeString);
@@ -6757,6 +7370,9 @@ void moduleRegisterCoreAPI(void) {
REGISTER_API(StringTruncate);
REGISTER_API(SetExpire);
REGISTER_API(GetExpire);
+ REGISTER_API(ResetDataset);
+ REGISTER_API(DbSize);
+ REGISTER_API(RandomKey);
REGISTER_API(ZsetAdd);
REGISTER_API(ZsetIncrby);
REGISTER_API(ZsetScore);
@@ -6779,6 +7395,7 @@ void moduleRegisterCoreAPI(void) {
REGISTER_API(PoolAlloc);
REGISTER_API(CreateDataType);
REGISTER_API(ModuleTypeSetValue);
+ REGISTER_API(ModuleTypeReplaceValue);
REGISTER_API(ModuleTypeGetType);
REGISTER_API(ModuleTypeGetValue);
REGISTER_API(IsIOError);
@@ -6796,6 +7413,10 @@ void moduleRegisterCoreAPI(void) {
REGISTER_API(LoadDouble);
REGISTER_API(SaveFloat);
REGISTER_API(LoadFloat);
+ REGISTER_API(SaveLongDouble);
+ REGISTER_API(LoadLongDouble);
+ REGISTER_API(SaveDataTypeToString);
+ REGISTER_API(LoadDataTypeFromString);
REGISTER_API(EmitAOF);
REGISTER_API(Log);
REGISTER_API(LogIOError);
@@ -6891,8 +7512,20 @@ void moduleRegisterCoreAPI(void) {
REGISTER_API(ServerInfoGetFieldUnsigned);
REGISTER_API(ServerInfoGetFieldDouble);
REGISTER_API(GetClientInfoById);
+ REGISTER_API(PublishMessage);
REGISTER_API(SubscribeToServerEvent);
+ REGISTER_API(SetLRU);
+ REGISTER_API(GetLRU);
+ REGISTER_API(SetLFU);
+ REGISTER_API(GetLFU);
REGISTER_API(BlockClientOnKeys);
REGISTER_API(SignalKeyAsReady);
REGISTER_API(GetBlockedClientReadyKey);
+ REGISTER_API(GetUsedMemoryRatio);
+ REGISTER_API(MallocSize);
+ REGISTER_API(ScanCursorCreate);
+ REGISTER_API(ScanCursorDestroy);
+ REGISTER_API(ScanCursorRestart);
+ REGISTER_API(Scan);
+ REGISTER_API(ScanKey);
}
diff --git a/src/networking.c b/src/networking.c
index e7cc561fa..901ce0a7b 100644
--- a/src/networking.c
+++ b/src/networking.c
@@ -530,7 +530,7 @@ void addReplyHumanLongDouble(client *c, long double d) {
decrRefCount(o);
} else {
char buf[MAX_LONG_DOUBLE_CHARS];
- int len = ld2string(buf,sizeof(buf),d,1);
+ int len = ld2string(buf,sizeof(buf),d,LD_STR_HUMAN);
addReplyProto(c,",",1);
addReplyProto(c,buf,len);
addReplyProto(c,"\r\n",2);
@@ -1118,6 +1118,11 @@ void freeClient(client *c) {
if (c->flags & CLIENT_SLAVE && listLength(server.slaves) == 0)
server.repl_no_slaves_since = server.unixtime;
refreshGoodSlavesCount();
+ /* Fire the replica change modules event. */
+ if (c->replstate == SLAVE_STATE_ONLINE)
+ moduleFireServerEvent(REDISMODULE_EVENT_REPLICA_CHANGE,
+ REDISMODULE_SUBEVENT_REPLICA_CHANGE_OFFLINE,
+ NULL);
}
/* Master/slave cleanup Case 2:
diff --git a/src/object.c b/src/object.c
index 7ce79bbbb..2201a317a 100644
--- a/src/object.c
+++ b/src/object.c
@@ -178,7 +178,7 @@ robj *createStringObjectFromLongLongForValue(long long value) {
* The 'humanfriendly' option is used for INCRBYFLOAT and HINCRBYFLOAT. */
robj *createStringObjectFromLongDouble(long double value, int humanfriendly) {
char buf[MAX_LONG_DOUBLE_CHARS];
- int len = ld2string(buf,sizeof(buf),value,humanfriendly);
+ int len = ld2string(buf,sizeof(buf),value,humanfriendly? LD_STR_HUMAN: LD_STR_AUTO);
return createStringObject(buf,len);
}
@@ -1201,19 +1201,20 @@ sds getMemoryDoctorReport(void) {
* The lru_idle and lru_clock args are only relevant if policy
* is MAXMEMORY_FLAG_LRU.
* Either or both of them may be <0, in that case, nothing is set. */
-void objectSetLRUOrLFU(robj *val, long long lfu_freq, long long lru_idle,
- long long lru_clock) {
+int objectSetLRUOrLFU(robj *val, long long lfu_freq, long long lru_idle,
+ long long lru_clock, int lru_multiplier) {
if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) {
if (lfu_freq >= 0) {
serverAssert(lfu_freq <= 255);
val->lru = (LFUGetTimeInMinutes()<<8) | lfu_freq;
+ return 1;
}
} else if (lru_idle >= 0) {
/* Provided LRU idle time is in seconds. Scale
* according to the LRU clock resolution this Redis
* instance was compiled with (normally 1000 ms, so the
* below statement will expand to lru_idle*1000/1000. */
- lru_idle = lru_idle*1000/LRU_CLOCK_RESOLUTION;
+ lru_idle = lru_idle*lru_multiplier/LRU_CLOCK_RESOLUTION;
long lru_abs = lru_clock - lru_idle; /* Absolute access time. */
/* If the LRU field underflows (since LRU it is a wrapping
* clock), the best we can do is to provide a large enough LRU
@@ -1223,7 +1224,9 @@ void objectSetLRUOrLFU(robj *val, long long lfu_freq, long long lru_idle,
if (lru_abs < 0)
lru_abs = (lru_clock+(LRU_CLOCK_MAX/2)) % LRU_CLOCK_MAX;
val->lru = lru_abs;
+ return 1;
}
+ return 0;
}
/* ======================= The OBJECT and MEMORY commands =================== */
diff --git a/src/rax.c b/src/rax.c
index be474b058..29b74ae90 100644
--- a/src/rax.c
+++ b/src/rax.c
@@ -1673,6 +1673,7 @@ int raxSeek(raxIterator *it, const char *op, unsigned char *ele, size_t len) {
* node, but will be our match, representing the key "f".
*
* So in that case, we don't seek backward. */
+ it->data = raxGetData(it->node);
} else {
if (gt && !raxIteratorNextStep(it,0)) return 0;
if (lt && !raxIteratorPrevStep(it,0)) return 0;
@@ -1791,7 +1792,7 @@ int raxCompare(raxIterator *iter, const char *op, unsigned char *key, size_t key
if (eq && key_len == iter->key_len) return 1;
else if (lt) return iter->key_len < key_len;
else if (gt) return iter->key_len > key_len;
- return 0;
+ else return 0; /* Avoid warning, just 'eq' is handled before. */
} else if (cmp > 0) {
return gt ? 1 : 0;
} else /* (cmp < 0) */ {
diff --git a/src/rdb.c b/src/rdb.c
index f530219a4..27e1b3135 100644
--- a/src/rdb.c
+++ b/src/rdb.c
@@ -1080,9 +1080,9 @@ ssize_t rdbSaveAuxFieldStrInt(rio *rdb, char *key, long long val) {
}
/* Save a few default AUX fields with information about the RDB generated. */
-int rdbSaveInfoAuxFields(rio *rdb, int flags, rdbSaveInfo *rsi) {
+int rdbSaveInfoAuxFields(rio *rdb, int rdbflags, rdbSaveInfo *rsi) {
int redis_bits = (sizeof(void*) == 8) ? 64 : 32;
- int aof_preamble = (flags & RDB_SAVE_AOF_PREAMBLE) != 0;
+ int aof_preamble = (rdbflags & RDBFLAGS_AOF_PREAMBLE) != 0;
/* Add a few fields about the state when the RDB was created. */
if (rdbSaveAuxFieldStrStr(rdb,"redis-ver",REDIS_VERSION) == -1) return -1;
@@ -1150,7 +1150,7 @@ ssize_t rdbSaveSingleModuleAux(rio *rdb, int when, moduleType *mt) {
* When the function returns C_ERR and if 'error' is not NULL, the
* integer pointed by 'error' is set to the value of errno just after the I/O
* error. */
-int rdbSaveRio(rio *rdb, int *error, int flags, rdbSaveInfo *rsi) {
+int rdbSaveRio(rio *rdb, int *error, int rdbflags, rdbSaveInfo *rsi) {
dictIterator *di = NULL;
dictEntry *de;
char magic[10];
@@ -1162,7 +1162,7 @@ int rdbSaveRio(rio *rdb, int *error, int flags, rdbSaveInfo *rsi) {
rdb->update_cksum = rioGenericUpdateChecksum;
snprintf(magic,sizeof(magic),"REDIS%04d",RDB_VERSION);
if (rdbWriteRaw(rdb,magic,9) == -1) goto werr;
- if (rdbSaveInfoAuxFields(rdb,flags,rsi) == -1) goto werr;
+ if (rdbSaveInfoAuxFields(rdb,rdbflags,rsi) == -1) goto werr;
if (rdbSaveModulesAux(rdb, REDISMODULE_AUX_BEFORE_RDB) == -1) goto werr;
for (j = 0; j < server.dbnum; j++) {
@@ -1199,7 +1199,7 @@ int rdbSaveRio(rio *rdb, int *error, int flags, rdbSaveInfo *rsi) {
/* When this RDB is produced as part of an AOF rewrite, move
* accumulated diff from parent to child while rewriting in
* order to have a smaller final write. */
- if (flags & RDB_SAVE_AOF_PREAMBLE &&
+ if (rdbflags & RDBFLAGS_AOF_PREAMBLE &&
rdb->processed_bytes > processed+AOF_READ_DIFF_INTERVAL_BYTES)
{
processed = rdb->processed_bytes;
@@ -1254,18 +1254,21 @@ werr:
int rdbSaveRioWithEOFMark(rio *rdb, int *error, rdbSaveInfo *rsi) {
char eofmark[RDB_EOF_MARK_SIZE];
+ startSaving(RDBFLAGS_REPLICATION);
getRandomHexChars(eofmark,RDB_EOF_MARK_SIZE);
if (error) *error = 0;
if (rioWrite(rdb,"$EOF:",5) == 0) goto werr;
if (rioWrite(rdb,eofmark,RDB_EOF_MARK_SIZE) == 0) goto werr;
if (rioWrite(rdb,"\r\n",2) == 0) goto werr;
- if (rdbSaveRio(rdb,error,RDB_SAVE_NONE,rsi) == C_ERR) goto werr;
+ if (rdbSaveRio(rdb,error,RDBFLAGS_NONE,rsi) == C_ERR) goto werr;
if (rioWrite(rdb,eofmark,RDB_EOF_MARK_SIZE) == 0) goto werr;
+ stopSaving(1);
return C_OK;
werr: /* Write error. */
/* Set 'error' only if not already set by rdbSaveRio() call. */
if (error && *error == 0) *error = errno;
+ stopSaving(0);
return C_ERR;
}
@@ -1291,11 +1294,12 @@ int rdbSave(char *filename, rdbSaveInfo *rsi) {
}
rioInitWithFile(&rdb,fp);
+ startSaving(RDBFLAGS_NONE);
if (server.rdb_save_incremental_fsync)
rioSetAutoSync(&rdb,REDIS_AUTOSYNC_BYTES);
- if (rdbSaveRio(&rdb,&error,RDB_SAVE_NONE,rsi) == C_ERR) {
+ if (rdbSaveRio(&rdb,&error,RDBFLAGS_NONE,rsi) == C_ERR) {
errno = error;
goto werr;
}
@@ -1317,6 +1321,7 @@ int rdbSave(char *filename, rdbSaveInfo *rsi) {
cwdp ? cwdp : "unknown",
strerror(errno));
unlink(tmpfile);
+ stopSaving(0);
return C_ERR;
}
@@ -1324,12 +1329,14 @@ int rdbSave(char *filename, rdbSaveInfo *rsi) {
server.dirty = 0;
server.lastsave = time(NULL);
server.lastbgsave_status = C_OK;
+ stopSaving(1);
return C_OK;
werr:
serverLog(LL_WARNING,"Write error saving DB on disk: %s", strerror(errno));
fclose(fp);
unlink(tmpfile);
+ stopSaving(0);
return C_ERR;
}
@@ -1918,23 +1925,33 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, robj *key) {
/* Mark that we are loading in the global state and setup the fields
* needed to provide loading stats. */
-void startLoading(size_t size) {
+void startLoading(size_t size, int rdbflags) {
/* Load the DB */
server.loading = 1;
server.loading_start_time = time(NULL);
server.loading_loaded_bytes = 0;
server.loading_total_bytes = size;
+
+ /* Fire the loading modules start event. */
+ int subevent;
+ if (rdbflags & RDBFLAGS_AOF_PREAMBLE)
+ subevent = REDISMODULE_SUBEVENT_LOADING_AOF_START;
+ else if(rdbflags & RDBFLAGS_REPLICATION)
+ subevent = REDISMODULE_SUBEVENT_LOADING_REPL_START;
+ else
+ subevent = REDISMODULE_SUBEVENT_LOADING_RDB_START;
+ moduleFireServerEvent(REDISMODULE_EVENT_LOADING,subevent,NULL);
}
/* Mark that we are loading in the global state and setup the fields
* needed to provide loading stats.
* 'filename' is optional and used for rdb-check on error */
-void startLoadingFile(FILE *fp, char* filename) {
+void startLoadingFile(FILE *fp, char* filename, int rdbflags) {
struct stat sb;
if (fstat(fileno(fp), &sb) == -1)
sb.st_size = 0;
rdbFileBeingLoaded = filename;
- startLoading(sb.st_size);
+ startLoading(sb.st_size, rdbflags);
}
/* Refresh the loading progress info */
@@ -1945,9 +1962,37 @@ void loadingProgress(off_t pos) {
}
/* Loading finished */
-void stopLoading(void) {
+void stopLoading(int success) {
server.loading = 0;
rdbFileBeingLoaded = NULL;
+
+ /* Fire the loading modules end event. */
+ moduleFireServerEvent(REDISMODULE_EVENT_LOADING,
+ success?
+ REDISMODULE_SUBEVENT_LOADING_ENDED:
+ REDISMODULE_SUBEVENT_LOADING_FAILED,
+ NULL);
+}
+
+void startSaving(int rdbflags) {
+ /* Fire the persistence modules end event. */
+ int subevent;
+ if (rdbflags & RDBFLAGS_AOF_PREAMBLE)
+ subevent = REDISMODULE_SUBEVENT_PERSISTENCE_AOF_START;
+ else if (getpid()!=server.pid)
+ subevent = REDISMODULE_SUBEVENT_PERSISTENCE_RDB_START;
+ else
+ subevent = REDISMODULE_SUBEVENT_PERSISTENCE_SYNC_RDB_START;
+ moduleFireServerEvent(REDISMODULE_EVENT_PERSISTENCE,subevent,NULL);
+}
+
+void stopSaving(int success) {
+ /* Fire the persistence modules end event. */
+ moduleFireServerEvent(REDISMODULE_EVENT_PERSISTENCE,
+ success?
+ REDISMODULE_SUBEVENT_PERSISTENCE_ENDED:
+ REDISMODULE_SUBEVENT_PERSISTENCE_FAILED,
+ NULL);
}
/* Track loading progress in order to serve client's from time to time
@@ -1961,17 +2006,18 @@ void rdbLoadProgressCallback(rio *r, const void *buf, size_t len) {
/* The DB can take some non trivial amount of time to load. Update
* our cached time since it is used to create and update the last
* interaction time with clients and for other important things. */
- updateCachedTime();
+ updateCachedTime(0);
if (server.masterhost && server.repl_state == REPL_STATE_TRANSFER)
replicationSendNewlineToMaster();
loadingProgress(r->processed_bytes);
processEventsWhileBlocked();
+ processModuleLoadingProgressEvent(0);
}
}
/* Load an RDB file from the rio stream 'rdb'. On success C_OK is returned,
* otherwise C_ERR is returned and 'errno' is set accordingly. */
-int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi, int loading_aof) {
+int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) {
uint64_t dbid;
int type, rdbver;
redisDb *db = server.db+0;
@@ -2182,7 +2228,7 @@ int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi, int loading_aof) {
* received from the master. In the latter case, the master is
* responsible for key expiry. If we would expire keys here, the
* snapshot taken by the master may not be reflected on the slave. */
- if (server.masterhost == NULL && !loading_aof && expiretime != -1 && expiretime < now) {
+ if (server.masterhost == NULL && !(rdbflags&RDBFLAGS_AOF_PREAMBLE) && expiretime != -1 && expiretime < now) {
decrRefCount(key);
decrRefCount(val);
} else {
@@ -2193,7 +2239,7 @@ int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi, int loading_aof) {
if (expiretime != -1) setExpire(NULL,db,key,expiretime);
/* Set usage information (for eviction). */
- objectSetLRUOrLFU(val,lfu_freq,lru_idle,lru_clock);
+ objectSetLRUOrLFU(val,lfu_freq,lru_idle,lru_clock,1000);
/* Decrement the key refcount since dbAdd() will take its
* own reference. */
@@ -2243,17 +2289,17 @@ eoferr:
*
* If you pass an 'rsi' structure initialied with RDB_SAVE_OPTION_INIT, the
* loading code will fiil the information fields in the structure. */
-int rdbLoad(char *filename, rdbSaveInfo *rsi) {
+int rdbLoad(char *filename, rdbSaveInfo *rsi, int rdbflags) {
FILE *fp;
rio rdb;
int retval;
if ((fp = fopen(filename,"r")) == NULL) return C_ERR;
- startLoadingFile(fp, filename);
+ startLoadingFile(fp, filename,rdbflags);
rioInitWithFile(&rdb,fp);
- retval = rdbLoadRio(&rdb,rsi,0);
+ retval = rdbLoadRio(&rdb,rdbflags,rsi);
fclose(fp);
- stopLoading();
+ stopLoading(retval==C_OK);
return retval;
}
diff --git a/src/rdb.h b/src/rdb.h
index 40a50f7ba..4229beea8 100644
--- a/src/rdb.h
+++ b/src/rdb.h
@@ -121,8 +121,10 @@
#define RDB_LOAD_PLAIN (1<<1)
#define RDB_LOAD_SDS (1<<2)
-#define RDB_SAVE_NONE 0
-#define RDB_SAVE_AOF_PREAMBLE (1<<0)
+/* flags on the purpose of rdb save or load */
+#define RDBFLAGS_NONE 0
+#define RDBFLAGS_AOF_PREAMBLE (1<<0)
+#define RDBFLAGS_REPLICATION (1<<1)
int rdbSaveType(rio *rdb, unsigned char type);
int rdbLoadType(rio *rdb);
@@ -135,7 +137,7 @@ uint64_t rdbLoadLen(rio *rdb, int *isencoded);
int rdbLoadLenByRef(rio *rdb, int *isencoded, uint64_t *lenptr);
int rdbSaveObjectType(rio *rdb, robj *o);
int rdbLoadObjectType(rio *rdb);
-int rdbLoad(char *filename, rdbSaveInfo *rsi);
+int rdbLoad(char *filename, rdbSaveInfo *rsi, int rdbflags);
int rdbSaveBackground(char *filename, rdbSaveInfo *rsi);
int rdbSaveToSlavesSockets(rdbSaveInfo *rsi);
void rdbRemoveTempFile(pid_t childpid);
@@ -154,7 +156,8 @@ int rdbSaveBinaryDoubleValue(rio *rdb, double val);
int rdbLoadBinaryDoubleValue(rio *rdb, double *val);
int rdbSaveBinaryFloatValue(rio *rdb, float val);
int rdbLoadBinaryFloatValue(rio *rdb, float *val);
-int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi, int loading_aof);
+int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi);
+int rdbSaveRio(rio *rdb, int *error, int rdbflags, rdbSaveInfo *rsi);
rdbSaveInfo *rdbPopulateSaveInfo(rdbSaveInfo *rsi);
#endif
diff --git a/src/redis-check-rdb.c b/src/redis-check-rdb.c
index 5e7415046..1210d49b4 100644
--- a/src/redis-check-rdb.c
+++ b/src/redis-check-rdb.c
@@ -202,7 +202,7 @@ int redis_check_rdb(char *rdbfilename, FILE *fp) {
}
expiretime = -1;
- startLoadingFile(fp, rdbfilename);
+ startLoadingFile(fp, rdbfilename, RDBFLAGS_NONE);
while(1) {
robj *key, *val;
@@ -316,7 +316,7 @@ int redis_check_rdb(char *rdbfilename, FILE *fp) {
}
if (closefile) fclose(fp);
- stopLoading();
+ stopLoading(1);
return 0;
eoferr: /* unexpected end of file is handled here with a fatal exit */
@@ -327,7 +327,7 @@ eoferr: /* unexpected end of file is handled here with a fatal exit */
}
err:
if (closefile) fclose(fp);
- stopLoading();
+ stopLoading(0);
return 1;
}
diff --git a/src/redismodule.h b/src/redismodule.h
index fd6dc77ce..6214af7e6 100644
--- a/src/redismodule.h
+++ b/src/redismodule.h
@@ -18,6 +18,10 @@
#define REDISMODULE_READ (1<<0)
#define REDISMODULE_WRITE (1<<1)
+/* RedisModule_OpenKey extra flags for the 'mode' argument.
+ * Avoid touching the LRU/LFU of the key when opened. */
+#define REDISMODULE_OPEN_KEY_NOTOUCH (1<<16)
+
#define REDISMODULE_LIST_HEAD 0
#define REDISMODULE_LIST_TAIL 1
@@ -29,6 +33,7 @@
#define REDISMODULE_KEYTYPE_SET 4
#define REDISMODULE_KEYTYPE_ZSET 5
#define REDISMODULE_KEYTYPE_MODULE 6
+#define REDISMODULE_KEYTYPE_STREAM 7
/* Reply types. */
#define REDISMODULE_REPLY_UNKNOWN -1
@@ -181,6 +186,8 @@ typedef uint64_t RedisModuleTimerID;
#define REDISMODULE_EVENT_REPLICA_CHANGE 6
#define REDISMODULE_EVENT_MASTER_LINK_CHANGE 7
#define REDISMODULE_EVENT_CRON_LOOP 8
+#define REDISMODULE_EVENT_MODULE_CHANGE 9
+#define REDISMODULE_EVENT_LOADING_PROGRESS 10
typedef struct RedisModuleEvent {
uint64_t id; /* REDISMODULE_EVENT_... defines. */
@@ -226,18 +233,28 @@ static const RedisModuleEvent
RedisModuleEvent_MasterLinkChange = {
REDISMODULE_EVENT_MASTER_LINK_CHANGE,
1
+ },
+ RedisModuleEvent_ModuleChange = {
+ REDISMODULE_EVENT_MODULE_CHANGE,
+ 1
+ },
+ RedisModuleEvent_LoadingProgress = {
+ REDISMODULE_EVENT_LOADING_PROGRESS,
+ 1
};
/* Those are values that are used for the 'subevent' callback argument. */
#define REDISMODULE_SUBEVENT_PERSISTENCE_RDB_START 0
-#define REDISMODULE_SUBEVENT_PERSISTENCE_RDB_END 1
-#define REDISMODULE_SUBEVENT_PERSISTENCE_AOF_START 2
-#define REDISMODULE_SUBEVENT_PERSISTENCE_AOF_END 3
+#define REDISMODULE_SUBEVENT_PERSISTENCE_AOF_START 1
+#define REDISMODULE_SUBEVENT_PERSISTENCE_SYNC_RDB_START 2
+#define REDISMODULE_SUBEVENT_PERSISTENCE_ENDED 3
+#define REDISMODULE_SUBEVENT_PERSISTENCE_FAILED 4
#define REDISMODULE_SUBEVENT_LOADING_RDB_START 0
-#define REDISMODULE_SUBEVENT_LOADING_RDB_END 1
-#define REDISMODULE_SUBEVENT_LOADING_AOF_START 2
-#define REDISMODULE_SUBEVENT_LOADING_AOF_END 3
+#define REDISMODULE_SUBEVENT_LOADING_AOF_START 1
+#define REDISMODULE_SUBEVENT_LOADING_REPL_START 2
+#define REDISMODULE_SUBEVENT_LOADING_ENDED 3
+#define REDISMODULE_SUBEVENT_LOADING_FAILED 4
#define REDISMODULE_SUBEVENT_CLIENT_CHANGE_CONNECTED 0
#define REDISMODULE_SUBEVENT_CLIENT_CHANGE_DISCONNECTED 1
@@ -245,12 +262,21 @@ static const RedisModuleEvent
#define REDISMODULE_SUBEVENT_MASTER_LINK_UP 0
#define REDISMODULE_SUBEVENT_MASTER_LINK_DOWN 1
-#define REDISMODULE_SUBEVENT_REPLICA_CHANGE_CONNECTED 0
-#define REDISMODULE_SUBEVENT_REPLICA_CHANGE_DISCONNECTED 1
+#define REDISMODULE_SUBEVENT_REPLICA_CHANGE_ONLINE 0
+#define REDISMODULE_SUBEVENT_REPLICA_CHANGE_OFFLINE 1
+
+#define REDISMODULE_EVENT_REPLROLECHANGED_NOW_MASTER 0
+#define REDISMODULE_EVENT_REPLROLECHANGED_NOW_REPLICA 1
#define REDISMODULE_SUBEVENT_FLUSHDB_START 0
#define REDISMODULE_SUBEVENT_FLUSHDB_END 1
+#define REDISMODULE_SUBEVENT_MODULE_LOADED 0
+#define REDISMODULE_SUBEVENT_MODULE_UNLOADED 1
+
+#define REDISMODULE_SUBEVENT_LOADING_PROGRESS_RDB 0
+#define REDISMODULE_SUBEVENT_LOADING_PROGRESS_AOF 1
+
/* RedisModuleClientInfo flags. */
#define REDISMODULE_CLIENTINFO_FLAG_SSL (1<<0)
#define REDISMODULE_CLIENTINFO_FLAG_PUBSUB (1<<1)
@@ -286,6 +312,22 @@ typedef struct RedisModuleClientInfo {
#define RedisModuleClientInfo RedisModuleClientInfoV1
+#define REDISMODULE_REPLICATIONINFO_VERSION 1
+typedef struct RedisModuleReplicationInfo {
+ uint64_t version; /* Not used since this structure is never passed
+ from the module to the core right now. Here
+ for future compatibility. */
+ int master; /* true if master, false if replica */
+ char *masterhost; /* master instance hostname for NOW_REPLICA */
+ int masterport; /* master instance port for NOW_REPLICA */
+ char *replid1; /* Main replication ID */
+ char *replid2; /* Secondary replication ID */
+ uint64_t repl1_offset; /* Main replication offset */
+ uint64_t repl2_offset; /* Offset of replid2 validity */
+} RedisModuleReplicationInfoV1;
+
+#define RedisModuleReplicationInfo RedisModuleReplicationInfoV1
+
#define REDISMODULE_FLUSHINFO_VERSION 1
typedef struct RedisModuleFlushInfo {
uint64_t version; /* Not used since this structure is never passed
@@ -297,6 +339,39 @@ typedef struct RedisModuleFlushInfo {
#define RedisModuleFlushInfo RedisModuleFlushInfoV1
+#define REDISMODULE_MODULE_CHANGE_VERSION 1
+typedef struct RedisModuleModuleChange {
+ uint64_t version; /* Not used since this structure is never passed
+ from the module to the core right now. Here
+ for future compatibility. */
+ const char* module_name;/* Name of module loaded or unloaded. */
+ int32_t module_version; /* Module version. */
+} RedisModuleModuleChangeV1;
+
+#define RedisModuleModuleChange RedisModuleModuleChangeV1
+
+#define REDISMODULE_CRON_LOOP_VERSION 1
+typedef struct RedisModuleCronLoopInfo {
+ uint64_t version; /* Not used since this structure is never passed
+ from the module to the core right now. Here
+ for future compatibility. */
+ int32_t hz; /* Approximate number of events per second. */
+} RedisModuleCronLoopV1;
+
+#define RedisModuleCronLoop RedisModuleCronLoopV1
+
+#define REDISMODULE_LOADING_PROGRESS_VERSION 1
+typedef struct RedisModuleLoadingProgressInfo {
+ uint64_t version; /* Not used since this structure is never passed
+ from the module to the core right now. Here
+ for future compatibility. */
+ int32_t hz; /* Approximate number of events per second. */
+ int32_t progress; /* Approximate progress between 0 and 1024, or -1
+ * if unknown. */
+} RedisModuleLoadingProgressV1;
+
+#define RedisModuleLoadingProgress RedisModuleLoadingProgressV1
+
/* ------------------------- End of common defines ------------------------ */
#ifndef REDISMODULE_CORE
@@ -319,6 +394,7 @@ typedef struct RedisModuleCommandFilterCtx RedisModuleCommandFilterCtx;
typedef struct RedisModuleCommandFilter RedisModuleCommandFilter;
typedef struct RedisModuleInfoCtx RedisModuleInfoCtx;
typedef struct RedisModuleServerInfoData RedisModuleServerInfoData;
+typedef struct RedisModuleScanCursor RedisModuleScanCursor;
typedef int (*RedisModuleCmdFunc)(RedisModuleCtx *ctx, RedisModuleString **argv, int argc);
typedef void (*RedisModuleDisconnectFunc)(RedisModuleCtx *ctx, RedisModuleBlockedClient *bc);
@@ -336,6 +412,8 @@ typedef void (*RedisModuleTimerProc)(RedisModuleCtx *ctx, void *data);
typedef void (*RedisModuleCommandFilterFunc) (RedisModuleCommandFilterCtx *filter);
typedef void (*RedisModuleForkDoneHandler) (int exitcode, int bysignal, void *user_data);
typedef void (*RedisModuleInfoFunc)(RedisModuleInfoCtx *ctx, int for_crash_report);
+typedef void (*RedisModuleScanCB)(RedisModuleCtx *ctx, RedisModuleString *keyname, RedisModuleKey *key, void *privdata);
+typedef void (*RedisModuleScanKeyCB)(RedisModuleKey *key, RedisModuleString *field, RedisModuleString *value, void *privdata);
#define REDISMODULE_TYPE_METHOD_VERSION 2
typedef struct RedisModuleTypeMethods {
@@ -385,6 +463,7 @@ size_t REDISMODULE_API_FUNC(RedisModule_CallReplyLength)(RedisModuleCallReply *r
RedisModuleCallReply *REDISMODULE_API_FUNC(RedisModule_CallReplyArrayElement)(RedisModuleCallReply *reply, size_t idx);
RedisModuleString *REDISMODULE_API_FUNC(RedisModule_CreateString)(RedisModuleCtx *ctx, const char *ptr, size_t len);
RedisModuleString *REDISMODULE_API_FUNC(RedisModule_CreateStringFromLongLong)(RedisModuleCtx *ctx, long long ll);
+RedisModuleString *REDISMODULE_API_FUNC(RedisModule_CreateStringFromLongDouble)(RedisModuleCtx *ctx, long double ld, int humanfriendly);
RedisModuleString *REDISMODULE_API_FUNC(RedisModule_CreateStringFromString)(RedisModuleCtx *ctx, const RedisModuleString *str);
RedisModuleString *REDISMODULE_API_FUNC(RedisModule_CreateStringPrintf)(RedisModuleCtx *ctx, const char *fmt, ...);
void REDISMODULE_API_FUNC(RedisModule_FreeString)(RedisModuleCtx *ctx, RedisModuleString *str);
@@ -402,9 +481,11 @@ int REDISMODULE_API_FUNC(RedisModule_ReplyWithEmptyString)(RedisModuleCtx *ctx);
int REDISMODULE_API_FUNC(RedisModule_ReplyWithVerbatimString)(RedisModuleCtx *ctx, const char *buf, size_t len);
int REDISMODULE_API_FUNC(RedisModule_ReplyWithNull)(RedisModuleCtx *ctx);
int REDISMODULE_API_FUNC(RedisModule_ReplyWithDouble)(RedisModuleCtx *ctx, double d);
+int REDISMODULE_API_FUNC(RedisModule_ReplyWithLongDouble)(RedisModuleCtx *ctx, long double d);
int REDISMODULE_API_FUNC(RedisModule_ReplyWithCallReply)(RedisModuleCtx *ctx, RedisModuleCallReply *reply);
int REDISMODULE_API_FUNC(RedisModule_StringToLongLong)(const RedisModuleString *str, long long *ll);
int REDISMODULE_API_FUNC(RedisModule_StringToDouble)(const RedisModuleString *str, double *d);
+int REDISMODULE_API_FUNC(RedisModule_StringToLongDouble)(const RedisModuleString *str, long double *d);
void REDISMODULE_API_FUNC(RedisModule_AutoMemory)(RedisModuleCtx *ctx);
int REDISMODULE_API_FUNC(RedisModule_Replicate)(RedisModuleCtx *ctx, const char *cmdname, const char *fmt, ...);
int REDISMODULE_API_FUNC(RedisModule_ReplicateVerbatim)(RedisModuleCtx *ctx);
@@ -417,6 +498,9 @@ char *REDISMODULE_API_FUNC(RedisModule_StringDMA)(RedisModuleKey *key, size_t *l
int REDISMODULE_API_FUNC(RedisModule_StringTruncate)(RedisModuleKey *key, size_t newlen);
mstime_t REDISMODULE_API_FUNC(RedisModule_GetExpire)(RedisModuleKey *key);
int REDISMODULE_API_FUNC(RedisModule_SetExpire)(RedisModuleKey *key, mstime_t expire);
+void REDISMODULE_API_FUNC(RedisModule_ResetDataset)(int restart_aof, int async);
+unsigned long long REDISMODULE_API_FUNC(RedisModule_DbSize)(RedisModuleCtx *ctx);
+RedisModuleString *REDISMODULE_API_FUNC(RedisModule_RandomKey)(RedisModuleCtx *ctx);
int REDISMODULE_API_FUNC(RedisModule_ZsetAdd)(RedisModuleKey *key, double score, RedisModuleString *ele, int *flagsptr);
int REDISMODULE_API_FUNC(RedisModule_ZsetIncrby)(RedisModuleKey *key, double score, RedisModuleString *ele, int *flagsptr, double *newscore);
int REDISMODULE_API_FUNC(RedisModule_ZsetScore)(RedisModuleKey *key, RedisModuleString *ele, double *score);
@@ -436,10 +520,12 @@ int REDISMODULE_API_FUNC(RedisModule_IsKeysPositionRequest)(RedisModuleCtx *ctx)
void REDISMODULE_API_FUNC(RedisModule_KeyAtPos)(RedisModuleCtx *ctx, int pos);
unsigned long long REDISMODULE_API_FUNC(RedisModule_GetClientId)(RedisModuleCtx *ctx);
int REDISMODULE_API_FUNC(RedisModule_GetClientInfoById)(void *ci, uint64_t id);
+int REDISMODULE_API_FUNC(RedisModule_PublishMessage)(RedisModuleCtx *ctx, RedisModuleString *channel, RedisModuleString *message);
int REDISMODULE_API_FUNC(RedisModule_GetContextFlags)(RedisModuleCtx *ctx);
void *REDISMODULE_API_FUNC(RedisModule_PoolAlloc)(RedisModuleCtx *ctx, size_t bytes);
RedisModuleType *REDISMODULE_API_FUNC(RedisModule_CreateDataType)(RedisModuleCtx *ctx, const char *name, int encver, RedisModuleTypeMethods *typemethods);
int REDISMODULE_API_FUNC(RedisModule_ModuleTypeSetValue)(RedisModuleKey *key, RedisModuleType *mt, void *value);
+void *REDISMODULE_API_FUNC(RedisModule_ModuleTypeReplaceValue)(RedisModuleKey *key, RedisModuleType *mt, void *new_value);
RedisModuleType *REDISMODULE_API_FUNC(RedisModule_ModuleTypeGetType)(RedisModuleKey *key);
void *REDISMODULE_API_FUNC(RedisModule_ModuleTypeGetValue)(RedisModuleKey *key);
int REDISMODULE_API_FUNC(RedisModule_IsIOError)(RedisModuleIO *io);
@@ -458,6 +544,10 @@ void REDISMODULE_API_FUNC(RedisModule_SaveDouble)(RedisModuleIO *io, double valu
double REDISMODULE_API_FUNC(RedisModule_LoadDouble)(RedisModuleIO *io);
void REDISMODULE_API_FUNC(RedisModule_SaveFloat)(RedisModuleIO *io, float value);
float REDISMODULE_API_FUNC(RedisModule_LoadFloat)(RedisModuleIO *io);
+void REDISMODULE_API_FUNC(RedisModule_SaveLongDouble)(RedisModuleIO *io, long double value);
+long double REDISMODULE_API_FUNC(RedisModule_LoadLongDouble)(RedisModuleIO *io);
+void *REDISMODULE_API_FUNC(RedisModule_LoadDataTypeFromString)(const RedisModuleString *str, const RedisModuleType *mt);
+RedisModuleString *REDISMODULE_API_FUNC(RedisModule_SaveDataTypeToString)(RedisModuleCtx *ctx, void *data, const RedisModuleType *mt);
void REDISMODULE_API_FUNC(RedisModule_Log)(RedisModuleCtx *ctx, const char *level, const char *fmt, ...);
void REDISMODULE_API_FUNC(RedisModule_LogIOError)(RedisModuleIO *io, const char *levelstr, const char *fmt, ...);
void REDISMODULE_API_FUNC(RedisModule__Assert)(const char *estr, const char *file, int line);
@@ -511,9 +601,18 @@ long long REDISMODULE_API_FUNC(RedisModule_ServerInfoGetFieldSigned)(RedisModule
unsigned long long REDISMODULE_API_FUNC(RedisModule_ServerInfoGetFieldUnsigned)(RedisModuleServerInfoData *data, const char* field, int *out_err);
double REDISMODULE_API_FUNC(RedisModule_ServerInfoGetFieldDouble)(RedisModuleServerInfoData *data, const char* field, int *out_err);
int REDISMODULE_API_FUNC(RedisModule_SubscribeToServerEvent)(RedisModuleCtx *ctx, RedisModuleEvent event, RedisModuleEventCallback callback);
+int REDISMODULE_API_FUNC(RedisModule_SetLRU)(RedisModuleKey *key, mstime_t lru_idle);
+int REDISMODULE_API_FUNC(RedisModule_GetLRU)(RedisModuleKey *key, mstime_t *lru_idle);
+int REDISMODULE_API_FUNC(RedisModule_SetLFU)(RedisModuleKey *key, long long lfu_freq);
+int REDISMODULE_API_FUNC(RedisModule_GetLFU)(RedisModuleKey *key, long long *lfu_freq);
RedisModuleBlockedClient *REDISMODULE_API_FUNC(RedisModule_BlockClientOnKeys)(RedisModuleCtx *ctx, RedisModuleCmdFunc reply_callback, RedisModuleCmdFunc timeout_callback, void (*free_privdata)(RedisModuleCtx*,void*), long long timeout_ms, RedisModuleString **keys, int numkeys, void *privdata);
void REDISMODULE_API_FUNC(RedisModule_SignalKeyAsReady)(RedisModuleCtx *ctx, RedisModuleString *key);
RedisModuleString *REDISMODULE_API_FUNC(RedisModule_GetBlockedClientReadyKey)(RedisModuleCtx *ctx);
+RedisModuleScanCursor *REDISMODULE_API_FUNC(RedisModule_ScanCursorCreate)();
+void REDISMODULE_API_FUNC(RedisModule_ScanCursorRestart)(RedisModuleScanCursor *cursor);
+void REDISMODULE_API_FUNC(RedisModule_ScanCursorDestroy)(RedisModuleScanCursor *cursor);
+int REDISMODULE_API_FUNC(RedisModule_Scan)(RedisModuleCtx *ctx, RedisModuleScanCursor *cursor, RedisModuleScanCB fn, void *privdata);
+int REDISMODULE_API_FUNC(RedisModule_ScanKey)(RedisModuleKey *key, RedisModuleScanCursor *cursor, RedisModuleScanKeyCB fn, void *privdata);
/* Experimental APIs */
#ifdef REDISMODULE_EXPERIMENTAL_API
@@ -559,6 +658,8 @@ int REDISMODULE_API_FUNC(RedisModule_CommandFilterArgDelete)(RedisModuleCommandF
int REDISMODULE_API_FUNC(RedisModule_Fork)(RedisModuleForkDoneHandler cb, void *user_data);
int REDISMODULE_API_FUNC(RedisModule_ExitFromChild)(int retcode);
int REDISMODULE_API_FUNC(RedisModule_KillForkChild)(int child_pid);
+float REDISMODULE_API_FUNC(RedisModule_GetUsedMemoryRatio)();
+size_t REDISMODULE_API_FUNC(RedisModule_MallocSize)(void* ptr);
#endif
#define RedisModule_IsAOFClient(id) ((id) == UINT64_MAX)
@@ -592,6 +693,7 @@ static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int
REDISMODULE_GET_API(ReplyWithNull);
REDISMODULE_GET_API(ReplyWithCallReply);
REDISMODULE_GET_API(ReplyWithDouble);
+ REDISMODULE_GET_API(ReplyWithLongDouble);
REDISMODULE_GET_API(GetSelectedDb);
REDISMODULE_GET_API(SelectDb);
REDISMODULE_GET_API(OpenKey);
@@ -602,6 +704,7 @@ static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int
REDISMODULE_GET_API(ListPop);
REDISMODULE_GET_API(StringToLongLong);
REDISMODULE_GET_API(StringToDouble);
+ REDISMODULE_GET_API(StringToLongDouble);
REDISMODULE_GET_API(Call);
REDISMODULE_GET_API(CallReplyProto);
REDISMODULE_GET_API(FreeCallReply);
@@ -613,6 +716,7 @@ static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int
REDISMODULE_GET_API(CreateStringFromCallReply);
REDISMODULE_GET_API(CreateString);
REDISMODULE_GET_API(CreateStringFromLongLong);
+ REDISMODULE_GET_API(CreateStringFromLongDouble);
REDISMODULE_GET_API(CreateStringFromString);
REDISMODULE_GET_API(CreateStringPrintf);
REDISMODULE_GET_API(FreeString);
@@ -627,6 +731,9 @@ static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int
REDISMODULE_GET_API(StringTruncate);
REDISMODULE_GET_API(GetExpire);
REDISMODULE_GET_API(SetExpire);
+ REDISMODULE_GET_API(ResetDataset);
+ REDISMODULE_GET_API(DbSize);
+ REDISMODULE_GET_API(RandomKey);
REDISMODULE_GET_API(ZsetAdd);
REDISMODULE_GET_API(ZsetIncrby);
REDISMODULE_GET_API(ZsetScore);
@@ -649,6 +756,7 @@ static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int
REDISMODULE_GET_API(PoolAlloc);
REDISMODULE_GET_API(CreateDataType);
REDISMODULE_GET_API(ModuleTypeSetValue);
+ REDISMODULE_GET_API(ModuleTypeReplaceValue);
REDISMODULE_GET_API(ModuleTypeGetType);
REDISMODULE_GET_API(ModuleTypeGetValue);
REDISMODULE_GET_API(IsIOError);
@@ -666,6 +774,10 @@ static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int
REDISMODULE_GET_API(LoadDouble);
REDISMODULE_GET_API(SaveFloat);
REDISMODULE_GET_API(LoadFloat);
+ REDISMODULE_GET_API(SaveLongDouble);
+ REDISMODULE_GET_API(LoadLongDouble);
+ REDISMODULE_GET_API(SaveDataTypeToString);
+ REDISMODULE_GET_API(LoadDataTypeFromString);
REDISMODULE_GET_API(EmitAOF);
REDISMODULE_GET_API(Log);
REDISMODULE_GET_API(LogIOError);
@@ -720,10 +832,20 @@ static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int
REDISMODULE_GET_API(ServerInfoGetFieldUnsigned);
REDISMODULE_GET_API(ServerInfoGetFieldDouble);
REDISMODULE_GET_API(GetClientInfoById);
+ REDISMODULE_GET_API(PublishMessage);
REDISMODULE_GET_API(SubscribeToServerEvent);
+ REDISMODULE_GET_API(SetLRU);
+ REDISMODULE_GET_API(GetLRU);
+ REDISMODULE_GET_API(SetLFU);
+ REDISMODULE_GET_API(GetLFU);
REDISMODULE_GET_API(BlockClientOnKeys);
REDISMODULE_GET_API(SignalKeyAsReady);
REDISMODULE_GET_API(GetBlockedClientReadyKey);
+ REDISMODULE_GET_API(ScanCursorCreate);
+ REDISMODULE_GET_API(ScanCursorRestart);
+ REDISMODULE_GET_API(ScanCursorDestroy);
+ REDISMODULE_GET_API(Scan);
+ REDISMODULE_GET_API(ScanKey);
#ifdef REDISMODULE_EXPERIMENTAL_API
REDISMODULE_GET_API(GetThreadSafeContext);
@@ -767,6 +889,8 @@ static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int
REDISMODULE_GET_API(Fork);
REDISMODULE_GET_API(ExitFromChild);
REDISMODULE_GET_API(KillForkChild);
+ REDISMODULE_GET_API(GetUsedMemoryRatio);
+ REDISMODULE_GET_API(MallocSize);
#endif
if (RedisModule_IsModuleNameBusy && RedisModule_IsModuleNameBusy(name)) return REDISMODULE_ERR;
diff --git a/src/replication.c b/src/replication.c
index 4550e6a83..c9a2e0fe1 100644
--- a/src/replication.c
+++ b/src/replication.c
@@ -533,6 +533,12 @@ int masterTryPartialResynchronization(client *c) {
* has this state from the previous connection with the master. */
refreshGoodSlavesCount();
+
+ /* Fire the replica change modules event. */
+ moduleFireServerEvent(REDISMODULE_EVENT_REPLICA_CHANGE,
+ REDISMODULE_SUBEVENT_REPLICA_CHANGE_ONLINE,
+ NULL);
+
return C_OK; /* The caller can return, no full resync needed. */
need_full_resync:
@@ -868,6 +874,10 @@ void putSlaveOnline(client *slave) {
return;
}
refreshGoodSlavesCount();
+ /* Fire the replica change modules event. */
+ moduleFireServerEvent(REDISMODULE_EVENT_REPLICA_CHANGE,
+ REDISMODULE_SUBEVENT_REPLICA_CHANGE_ONLINE,
+ NULL);
serverLog(LL_NOTICE,"Synchronization with replica %s succeeded",
replicationGetSlaveName(slave));
}
@@ -1542,11 +1552,11 @@ void readSyncBulkPayload(connection *conn) {
* We'll restore it when the RDB is received. */
connBlock(conn);
connRecvTimeout(conn, server.repl_timeout*1000);
- startLoading(server.repl_transfer_size);
+ startLoading(server.repl_transfer_size, RDBFLAGS_REPLICATION);
- if (rdbLoadRio(&rdb,&rsi,0) != C_OK) {
+ if (rdbLoadRio(&rdb,RDBFLAGS_REPLICATION,&rsi) != C_OK) {
/* RDB loading failed. */
- stopLoading();
+ stopLoading(0);
serverLog(LL_WARNING,
"Failed trying to load the MASTER synchronization DB "
"from socket");
@@ -1567,7 +1577,7 @@ void readSyncBulkPayload(connection *conn) {
* gets promoted. */
return;
}
- stopLoading();
+ stopLoading(1);
/* RDB loading succeeded if we reach this point. */
if (server.repl_diskless_load == REPL_DISKLESS_LOAD_SWAPDB) {
@@ -1614,7 +1624,7 @@ void readSyncBulkPayload(connection *conn) {
cancelReplicationHandshake();
return;
}
- if (rdbLoad(server.rdb_filename,&rsi) != C_OK) {
+ if (rdbLoad(server.rdb_filename,&rsi,RDBFLAGS_REPLICATION) != C_OK) {
serverLog(LL_WARNING,
"Failed trying to load the MASTER synchronization "
"DB from disk");
@@ -1636,6 +1646,11 @@ void readSyncBulkPayload(connection *conn) {
server.repl_state = REPL_STATE_CONNECTED;
server.repl_down_since = 0;
+ /* Fire the master link modules event. */
+ moduleFireServerEvent(REDISMODULE_EVENT_MASTER_LINK_CHANGE,
+ REDISMODULE_SUBEVENT_MASTER_LINK_UP,
+ NULL);
+
/* After a full resynchroniziation we use the replication ID and
* offset of the master. The secondary ID / offset are cleared since
* we are starting a new history. */
@@ -2314,12 +2329,31 @@ void replicationSetMaster(char *ip, int port) {
replicationDiscardCachedMaster();
replicationCacheMasterUsingMyself();
}
+
+ /* Fire the role change modules event. */
+ moduleFireServerEvent(REDISMODULE_EVENT_REPLICATION_ROLE_CHANGED,
+ REDISMODULE_EVENT_REPLROLECHANGED_NOW_REPLICA,
+ NULL);
+
+ /* Fire the master link modules event. */
+ if (server.repl_state == REPL_STATE_CONNECTED)
+ moduleFireServerEvent(REDISMODULE_EVENT_MASTER_LINK_CHANGE,
+ REDISMODULE_SUBEVENT_MASTER_LINK_DOWN,
+ NULL);
+
server.repl_state = REPL_STATE_CONNECT;
}
/* Cancel replication, setting the instance as a master itself. */
void replicationUnsetMaster(void) {
if (server.masterhost == NULL) return; /* Nothing to do. */
+
+ /* Fire the master link modules event. */
+ if (server.repl_state == REPL_STATE_CONNECTED)
+ moduleFireServerEvent(REDISMODULE_EVENT_MASTER_LINK_CHANGE,
+ REDISMODULE_SUBEVENT_MASTER_LINK_DOWN,
+ NULL);
+
sdsfree(server.masterhost);
server.masterhost = NULL;
/* When a slave is turned into a master, the current replication ID
@@ -2348,11 +2382,22 @@ void replicationUnsetMaster(void) {
* starting from now. Otherwise the backlog will be freed after a
* failover if slaves do not connect immediately. */
server.repl_no_slaves_since = server.unixtime;
+
+ /* Fire the role change modules event. */
+ moduleFireServerEvent(REDISMODULE_EVENT_REPLICATION_ROLE_CHANGED,
+ REDISMODULE_EVENT_REPLROLECHANGED_NOW_MASTER,
+ NULL);
}
/* This function is called when the slave lose the connection with the
* master into an unexpected way. */
void replicationHandleMasterDisconnection(void) {
+ /* Fire the master link modules event. */
+ if (server.repl_state == REPL_STATE_CONNECTED)
+ moduleFireServerEvent(REDISMODULE_EVENT_MASTER_LINK_CHANGE,
+ REDISMODULE_SUBEVENT_MASTER_LINK_DOWN,
+ NULL);
+
server.master = NULL;
server.repl_state = REPL_STATE_CONNECT;
server.repl_down_since = server.unixtime;
diff --git a/src/sentinel.c b/src/sentinel.c
index 0490db4e9..42c4d2467 100644
--- a/src/sentinel.c
+++ b/src/sentinel.c
@@ -3993,11 +3993,14 @@ int sentinelSendSlaveOf(sentinelRedisInstance *ri, char *host, int port) {
* an issue because CLIENT is variadic command, so Redis will not
* recognized as a syntax error, and the transaction will not fail (but
* only the unsupported command will fail). */
- retval = redisAsyncCommand(ri->link->cc,
- sentinelDiscardReplyCallback, ri, "%s KILL TYPE normal",
- sentinelInstanceMapCommand(ri,"CLIENT"));
- if (retval == C_ERR) return retval;
- ri->link->pending_commands++;
+ for (int type = 0; type < 2; type++) {
+ retval = redisAsyncCommand(ri->link->cc,
+ sentinelDiscardReplyCallback, ri, "%s KILL TYPE %s",
+ sentinelInstanceMapCommand(ri,"CLIENT"),
+ type == 0 ? "normal" : "pubsub");
+ if (retval == C_ERR) return retval;
+ ri->link->pending_commands++;
+ }
retval = redisAsyncCommand(ri->link->cc,
sentinelDiscardReplyCallback, ri, "%s",
diff --git a/src/server.c b/src/server.c
index a11cb538d..803cfc809 100644
--- a/src/server.c
+++ b/src/server.c
@@ -1691,8 +1691,7 @@ void databasesCron(void) {
}
/* Defrag keys gradually. */
- if (server.active_defrag_enabled)
- activeDefragCycle();
+ activeDefragCycle();
/* Perform hash tables rehashing if needed, but only if there are no
* other processes saving the DB on disk. Otherwise rehashing is bad
@@ -1736,20 +1735,29 @@ void databasesCron(void) {
/* We take a cached value of the unix time in the global state because with
* virtual memory and aging there is to store the current time in objects at
* every object access, and accuracy is not needed. To access a global var is
- * a lot faster than calling time(NULL) */
-void updateCachedTime(void) {
- server.unixtime = time(NULL);
- server.mstime = mstime();
+ * a lot faster than calling time(NULL).
+ *
+ * This function should be fast because it is called at every command execution
+ * in call(), so it is possible to decide if to update the daylight saving
+ * info or not using the 'update_daylight_info' argument. Normally we update
+ * such info only when calling this function from serverCron() but not when
+ * calling it from call(). */
+void updateCachedTime(int update_daylight_info) {
+ server.ustime = ustime();
+ server.mstime = server.ustime / 1000;
+ server.unixtime = server.mstime / 1000;
/* To get information about daylight saving time, we need to call
* localtime_r and cache the result. However calling localtime_r in this
* context is safe since we will never fork() while here, in the main
* thread. The logging function will call a thread safe version of
* localtime that has no locks. */
- struct tm tm;
- time_t ut = server.unixtime;
- localtime_r(&ut,&tm);
- server.daylight_active = tm.tm_isdst;
+ if (update_daylight_info) {
+ struct tm tm;
+ time_t ut = server.unixtime;
+ localtime_r(&ut,&tm);
+ server.daylight_active = tm.tm_isdst;
+ }
}
void checkChildrenDone(void) {
@@ -1838,7 +1846,7 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
if (server.watchdog_period) watchdogScheduleSignal(server.watchdog_period);
/* Update the time cache. */
- updateCachedTime();
+ updateCachedTime(1);
server.hz = server.config_hz;
/* Adapt the server.hz value to the number of configured clients. If we have
@@ -2056,6 +2064,12 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
server.rdb_bgsave_scheduled = 0;
}
+ /* Fire the cron loop modules event. */
+ RedisModuleCronLoopV1 ei = {REDISMODULE_CRON_LOOP_VERSION,server.hz};
+ moduleFireServerEvent(REDISMODULE_EVENT_CRON_LOOP,
+ 0,
+ &ei);
+
server.cronloops++;
return 1000/server.hz;
}
@@ -2252,7 +2266,7 @@ void createSharedObjects(void) {
void initServerConfig(void) {
int j;
- updateCachedTime();
+ updateCachedTime(1);
getRandomHexChars(server.runid,CONFIG_RUN_ID_SIZE);
server.runid[CONFIG_RUN_ID_SIZE] = '\0';
changeReplicationId();
@@ -2279,6 +2293,7 @@ void initServerConfig(void) {
server.maxidletime = CONFIG_DEFAULT_CLIENT_TIMEOUT;
server.tcpkeepalive = CONFIG_DEFAULT_TCP_KEEPALIVE;
server.active_expire_enabled = 1;
+ server.active_expire_effort = CONFIG_DEFAULT_ACTIVE_EXPIRE_EFFORT;
server.jemalloc_bg_thread = 1;
server.active_defrag_enabled = CONFIG_DEFAULT_ACTIVE_DEFRAG;
server.active_defrag_ignore_bytes = CONFIG_DEFAULT_DEFRAG_IGNORE_BYTES;
@@ -2730,6 +2745,7 @@ void resetServerStats(void) {
server.stat_expiredkeys = 0;
server.stat_expired_stale_perc = 0;
server.stat_expired_time_cap_reached_count = 0;
+ server.stat_expire_cycle_time_used = 0;
server.stat_evictedkeys = 0;
server.stat_keyspace_misses = 0;
server.stat_keyspace_hits = 0;
@@ -2771,6 +2787,7 @@ void initServer(void) {
server.hz = server.config_hz;
server.pid = getpid();
server.current_client = NULL;
+ server.fixed_time_expire = 0;
server.clients = listCreate();
server.clients_index = raxNew();
server.clients_to_close = listCreate();
@@ -2832,12 +2849,14 @@ void initServer(void) {
for (j = 0; j < server.dbnum; j++) {
server.db[j].dict = dictCreate(&dbDictType,NULL);
server.db[j].expires = dictCreate(&keyptrDictType,NULL);
+ server.db[j].expires_cursor = 0;
server.db[j].blocking_keys = dictCreate(&keylistDictType,NULL);
server.db[j].ready_keys = dictCreate(&objectKeyPointerValueDictType,NULL);
server.db[j].watched_keys = dictCreate(&keylistDictType,NULL);
server.db[j].id = j;
server.db[j].avg_ttl = 0;
server.db[j].defrag_later = listCreate();
+ listSetFreeMethod(server.db[j].defrag_later,(void (*)(void*))sdsfree);
}
evictionPoolAlloc(); /* Initialize the LRU keys pool. */
server.pubsub_channels = dictCreate(&keylistDictType,NULL);
@@ -3238,10 +3257,13 @@ void preventCommandReplication(client *c) {
*
*/
void call(client *c, int flags) {
- long long dirty, start, duration;
+ long long dirty;
+ ustime_t start, duration;
int client_old_flags = c->flags;
struct redisCommand *real_cmd = c->cmd;
+ server.fixed_time_expire++;
+
/* Sent the command to clients in MONITOR mode, only if the commands are
* not generated from reading an AOF. */
if (listLength(server.monitors) &&
@@ -3259,7 +3281,8 @@ void call(client *c, int flags) {
/* Call the command. */
dirty = server.dirty;
- start = ustime();
+ updateCachedTime(0);
+ start = server.ustime;
c->cmd->proc(c);
duration = ustime()-start;
dirty = server.dirty-dirty;
@@ -3366,6 +3389,7 @@ void call(client *c, int flags) {
trackingRememberKeys(caller);
}
+ server.fixed_time_expire--;
server.stat_numcommands++;
}
@@ -3682,6 +3706,9 @@ int prepareForShutdown(int flags) {
}
}
+ /* Fire the shutdown modules event. */
+ moduleFireServerEvent(REDISMODULE_EVENT_SHUTDOWN,0,NULL);
+
/* Remove the pid file if possible and needed. */
if (server.daemonize || server.pidfile) {
serverLog(LL_NOTICE,"Removing the pid file.");
@@ -4244,6 +4271,7 @@ sds genRedisInfoString(const char *section) {
"expired_keys:%lld\r\n"
"expired_stale_perc:%.2f\r\n"
"expired_time_cap_reached_count:%lld\r\n"
+ "expire_cycle_cpu_milliseconds:%lld\r\n"
"evicted_keys:%lld\r\n"
"keyspace_hits:%lld\r\n"
"keyspace_misses:%lld\r\n"
@@ -4271,6 +4299,7 @@ sds genRedisInfoString(const char *section) {
server.stat_expiredkeys,
server.stat_expired_stale_perc*100,
server.stat_expired_time_cap_reached_count,
+ server.stat_expire_cycle_time_used/1000,
server.stat_evictedkeys,
server.stat_keyspace_hits,
server.stat_keyspace_misses,
@@ -4767,7 +4796,7 @@ void loadDataFromDisk(void) {
serverLog(LL_NOTICE,"DB loaded from append only file: %.3f seconds",(float)(ustime()-start)/1000000);
} else {
rdbSaveInfo rsi = RDB_SAVE_INFO_INIT;
- if (rdbLoad(server.rdb_filename,&rsi) == C_OK) {
+ if (rdbLoad(server.rdb_filename,&rsi,RDBFLAGS_NONE) == C_OK) {
serverLog(LL_NOTICE,"DB loaded from disk: %.3f seconds",
(float)(ustime()-start)/1000000);
diff --git a/src/server.h b/src/server.h
index 9932bafa8..5a95d4c14 100644
--- a/src/server.h
+++ b/src/server.h
@@ -50,6 +50,7 @@
#include <signal.h>
typedef long long mstime_t; /* millisecond time type. */
+typedef long long ustime_t; /* microsecond time type. */
#include "ae.h" /* Event driven programming library */
#include "sds.h" /* Dynamic safe strings */
@@ -173,15 +174,13 @@ typedef long long mstime_t; /* millisecond time type. */
#define CONFIG_DEFAULT_DEFRAG_THRESHOLD_LOWER 10 /* don't defrag when fragmentation is below 10% */
#define CONFIG_DEFAULT_DEFRAG_THRESHOLD_UPPER 100 /* maximum defrag force at 100% fragmentation */
#define CONFIG_DEFAULT_DEFRAG_IGNORE_BYTES (100<<20) /* don't defrag if frag overhead is below 100mb */
-#define CONFIG_DEFAULT_DEFRAG_CYCLE_MIN 5 /* 5% CPU min (at lower threshold) */
-#define CONFIG_DEFAULT_DEFRAG_CYCLE_MAX 75 /* 75% CPU max (at upper threshold) */
+#define CONFIG_DEFAULT_DEFRAG_CYCLE_MIN 1 /* 1% CPU min (at lower threshold) */
+#define CONFIG_DEFAULT_DEFRAG_CYCLE_MAX 25 /* 25% CPU max (at upper threshold) */
#define CONFIG_DEFAULT_DEFRAG_MAX_SCAN_FIELDS 1000 /* keys with more than 1000 fields will be processed separately */
#define CONFIG_DEFAULT_PROTO_MAX_BULK_LEN (512ll*1024*1024) /* Bulk request max size */
#define CONFIG_DEFAULT_TRACKING_TABLE_MAX_FILL 10 /* 10% tracking table max fill. */
+#define CONFIG_DEFAULT_ACTIVE_EXPIRE_EFFORT 1 /* From 1 to 10. */
-#define ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP 20 /* Loopkups per loop. */
-#define ACTIVE_EXPIRE_CYCLE_FAST_DURATION 1000 /* Microseconds */
-#define ACTIVE_EXPIRE_CYCLE_SLOW_TIME_PERC 25 /* CPU max % for keys collection */
#define ACTIVE_EXPIRE_CYCLE_SLOW 0
#define ACTIVE_EXPIRE_CYCLE_FAST 1
@@ -720,6 +719,7 @@ typedef struct redisDb {
dict *watched_keys; /* WATCHED keys for MULTI/EXEC CAS */
int id; /* Database ID */
long long avg_ttl; /* Average TTL, just for stats */
+ unsigned long expires_cursor; /* Cursor of the active expire cycle. */
list *defrag_later; /* List of key names to attempt to defrag one by one, gradually. */
} redisDb;
@@ -1133,7 +1133,8 @@ struct redisServer {
list *clients_pending_write; /* There is to write or install handler. */
list *clients_pending_read; /* Client has pending read socket buffers. */
list *slaves, *monitors; /* List of slaves and MONITORs */
- client *current_client; /* Current client, only used on crash report */
+ client *current_client; /* Current client executing the command. */
+ long fixed_time_expire; /* If > 0, expire keys against server.mstime. */
rax *clients_index; /* Active clients dictionary by client ID. */
int clients_paused; /* True if clients are currently paused */
mstime_t clients_pause_end_time; /* Time when we undo clients_paused */
@@ -1165,6 +1166,7 @@ struct redisServer {
long long stat_expiredkeys; /* Number of expired keys */
double stat_expired_stale_perc; /* Percentage of keys probably expired */
long long stat_expired_time_cap_reached_count; /* Early expire cylce stops.*/
+ long long stat_expire_cycle_time_used; /* Cumulative microseconds used. */
long long stat_evictedkeys; /* Number of evicted keys (maxmemory) */
long long stat_keyspace_hits; /* Number of successful lookups of keys */
long long stat_keyspace_misses; /* Number of failed lookups of keys */
@@ -1203,6 +1205,7 @@ struct redisServer {
int maxidletime; /* Client timeout in seconds */
int tcpkeepalive; /* Set SO_KEEPALIVE if non-zero. */
int active_expire_enabled; /* Can be disabled for testing purposes. */
+ int active_expire_effort; /* From 1 (default) to 10, active effort. */
int active_defrag_enabled;
int jemalloc_bg_thread; /* Enable jemalloc background thread */
size_t active_defrag_ignore_bytes; /* minimum amount of fragmentation waste to start active defrag */
@@ -1400,7 +1403,8 @@ struct redisServer {
_Atomic time_t unixtime; /* Unix time sampled every cron cycle. */
time_t timezone; /* Cached timezone. As set by tzset(). */
int daylight_active; /* Currently in daylight saving time. */
- long long mstime; /* 'unixtime' with milliseconds resolution. */
+ mstime_t mstime; /* 'unixtime' in milliseconds. */
+ ustime_t ustime; /* 'unixtime' in microseconds. */
/* Pubsub */
dict *pubsub_channels; /* Map channels to list of subscribed clients */
list *pubsub_patterns; /* A list of pubsub_patterns */
@@ -1602,6 +1606,7 @@ ssize_t rdbSaveModulesAux(rio *rdb, int when);
int moduleAllDatatypesHandleErrors();
sds modulesCollectInfo(sds info, const char *section, int for_crash_report, int sections);
void moduleFireServerEvent(uint64_t eid, int subid, void *data);
+void processModuleLoadingProgressEvent(int is_aof);
int moduleTryServeClientBlockedOnKey(client *c, robj *key);
void moduleUnblockClient(client *c);
int moduleClientIsBlockedOnKeys(client *c);
@@ -1834,10 +1839,12 @@ void rdbPipeReadHandler(struct aeEventLoop *eventLoop, int fd, void *clientData,
void rdbPipeWriteHandlerConnRemoved(struct connection *conn);
/* Generic persistence functions */
-void startLoadingFile(FILE* fp, char* filename);
-void startLoading(size_t size);
+void startLoadingFile(FILE* fp, char* filename, int rdbflags);
+void startLoading(size_t size, int rdbflags);
void loadingProgress(off_t pos);
-void stopLoading(void);
+void stopLoading(int success);
+void startSaving(int rdbflags);
+void stopSaving(int success);
#define DISK_ERROR_TYPE_AOF 1 /* Don't accept writes: AOF errors. */
#define DISK_ERROR_TYPE_RDB 2 /* Don't accept writes: RDB errors. */
@@ -1846,7 +1853,6 @@ int writeCommandsDeniedByDiskError(void);
/* RDB persistence */
#include "rdb.h"
-int rdbSaveRio(rio *rdb, int *error, int flags, rdbSaveInfo *rsi);
void killRDBChild(void);
/* AOF persistence */
@@ -1862,6 +1868,7 @@ void aofRewriteBufferReset(void);
unsigned long aofRewriteBufferSize(void);
ssize_t aofReadDiffFromParent(void);
void killAppendOnlyChild(void);
+void restartAOFAfterSYNC();
/* Child info */
void openChildInfoPipe(void);
@@ -1996,7 +2003,7 @@ void populateCommandTable(void);
void resetCommandTableStats(void);
void adjustOpenFilesLimit(void);
void closeListeningSockets(int unlink_unix_socket);
-void updateCachedTime(void);
+void updateCachedTime(int update_daylight_info);
void resetServerStats(void);
void activeDefragCycle(void);
unsigned int getLRUClock(void);
@@ -2082,10 +2089,11 @@ robj *lookupKeyWrite(redisDb *db, robj *key);
robj *lookupKeyReadOrReply(client *c, robj *key, robj *reply);
robj *lookupKeyWriteOrReply(client *c, robj *key, robj *reply);
robj *lookupKeyReadWithFlags(redisDb *db, robj *key, int flags);
+robj *lookupKeyWriteWithFlags(redisDb *db, robj *key, int flags);
robj *objectCommandLookup(client *c, robj *key);
robj *objectCommandLookupOrReply(client *c, robj *key, robj *reply);
-void objectSetLRUOrLFU(robj *val, long long lfu_freq, long long lru_idle,
- long long lru_clock);
+int objectSetLRUOrLFU(robj *val, long long lfu_freq, long long lru_idle,
+ long long lru_clock, int lru_multiplier);
#define LOOKUP_NONE 0
#define LOOKUP_NOTOUCH (1<<0)
void dbAdd(redisDb *db, robj *key, robj *val);
@@ -2101,6 +2109,7 @@ robj *dbUnshareStringValue(redisDb *db, robj *key, robj *o);
#define EMPTYDB_ASYNC (1<<0) /* Reclaim memory in another thread. */
long long emptyDb(int dbnum, int flags, void(callback)(void*));
long long emptyDbGeneric(redisDb *dbarray, int dbnum, int flags, void(callback)(void*));
+void flushAllDataAndResetRDB(int flags);
long long dbTotalServerKeyCount();
int selectDb(client *c, int id);
diff --git a/src/stream.h b/src/stream.h
index 1163b3527..7de769ba1 100644
--- a/src/stream.h
+++ b/src/stream.h
@@ -98,6 +98,7 @@ struct client;
stream *streamNew(void);
void freeStream(stream *s);
+unsigned long streamLength(const robj *subject);
size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end, size_t count, int rev, streamCG *group, streamConsumer *consumer, int flags, streamPropInfo *spi);
void streamIteratorStart(streamIterator *si, stream *s, streamID *start, streamID *end, int rev);
int streamIteratorGetID(streamIterator *si, streamID *id, int64_t *numfields);
diff --git a/src/t_hash.c b/src/t_hash.c
index e6ed33819..b9f0db7fc 100644
--- a/src/t_hash.c
+++ b/src/t_hash.c
@@ -621,7 +621,7 @@ void hincrbyfloatCommand(client *c) {
}
char buf[MAX_LONG_DOUBLE_CHARS];
- int len = ld2string(buf,sizeof(buf),value,1);
+ int len = ld2string(buf,sizeof(buf),value,LD_STR_HUMAN);
new = sdsnewlen(buf,len);
hashTypeSet(o,c->argv[2]->ptr,new,HASH_SET_TAKE_VALUE);
addReplyBulkCBuffer(c,buf,len);
diff --git a/src/t_stream.c b/src/t_stream.c
index e6694f0b7..a499f7381 100644
--- a/src/t_stream.c
+++ b/src/t_stream.c
@@ -67,6 +67,12 @@ void freeStream(stream *s) {
zfree(s);
}
+/* Return the length of a stream. */
+unsigned long streamLength(const robj *subject) {
+ stream *s = subject->ptr;
+ return s->length;
+}
+
/* Generate the next stream item ID given the previous one. If the current
* milliseconds Unix time is greater than the previous one, just use this
* as time part and start with sequence part of zero. Otherwise we use the
@@ -173,9 +179,19 @@ int streamCompareID(streamID *a, streamID *b) {
* C_ERR if an ID was given via 'use_id', but adding it failed since the
* current top ID is greater or equal. */
int streamAppendItem(stream *s, robj **argv, int64_t numfields, streamID *added_id, streamID *use_id) {
- /* If an ID was given, check that it's greater than the last entry ID
- * or return an error. */
- if (use_id && streamCompareID(use_id,&s->last_id) <= 0) return C_ERR;
+
+ /* Generate the new entry ID. */
+ streamID id;
+ if (use_id)
+ id = *use_id;
+ else
+ streamNextID(&s->last_id,&id);
+
+ /* Check that the new ID is greater than the last entry ID
+ * or return an error. Automatically generated IDs might
+ * overflow (and wrap-around) when incrementing the sequence
+ part. */
+ if (streamCompareID(&id,&s->last_id) <= 0) return C_ERR;
/* Add the new entry. */
raxIterator ri;
@@ -192,13 +208,6 @@ int streamAppendItem(stream *s, robj **argv, int64_t numfields, streamID *added_
}
raxStop(&ri);
- /* Generate the new entry ID. */
- streamID id;
- if (use_id)
- id = *use_id;
- else
- streamNextID(&s->last_id,&id);
-
/* We have to add the key into the radix tree in lexicographic order,
* to do so we consider the ID as a single 128 bit number written in
* big endian, so that the most significant bytes are the first ones. */
@@ -1197,6 +1206,14 @@ void xaddCommand(client *c) {
return;
}
+ /* Return ASAP if minimal ID (0-0) was given so we avoid possibly creating
+ * a new stream and have streamAppendItem fail, leaving an empty key in the
+ * database. */
+ if (id_given && id.ms == 0 && id.seq == 0) {
+ addReplyError(c,"The ID specified in XADD must be greater than 0-0");
+ return;
+ }
+
/* Lookup the stream at key. */
robj *o;
stream *s;
diff --git a/src/util.c b/src/util.c
index f0f1a4ed3..20471b539 100644
--- a/src/util.c
+++ b/src/util.c
@@ -551,15 +551,17 @@ int d2string(char *buf, size_t len, double value) {
return len;
}
-/* Convert a long double into a string. If humanfriendly is non-zero
- * it does not use exponential format and trims trailing zeroes at the end,
- * however this results in loss of precision. Otherwise exp format is used
- * and the output of snprintf() is not modified.
+/* Create a string object from a long double.
+ * If mode is humanfriendly it does not use exponential format and trims trailing
+ * zeroes at the end (may result in loss of precision).
+ * If mode is default exp format is used and the output of snprintf()
+ * is not modified (may result in loss of precision).
+ * If mode is hex hexadecimal format is used (no loss of precision)
*
* The function returns the length of the string or zero if there was not
* enough buffer room to store it. */
-int ld2string(char *buf, size_t len, long double value, int humanfriendly) {
- size_t l;
+int ld2string(char *buf, size_t len, long double value, ld2string_mode mode) {
+ size_t l = 0;
if (isinf(value)) {
/* Libc in odd systems (Hi Solaris!) will format infinite in a
@@ -572,26 +574,36 @@ int ld2string(char *buf, size_t len, long double value, int humanfriendly) {
memcpy(buf,"-inf",4);
l = 4;
}
- } else if (humanfriendly) {
- /* We use 17 digits precision since with 128 bit floats that precision
- * after rounding is able to represent most small decimal numbers in a
- * way that is "non surprising" for the user (that is, most small
- * decimal numbers will be represented in a way that when converted
- * back into a string are exactly the same as what the user typed.) */
- l = snprintf(buf,len,"%.17Lf", value);
- if (l+1 > len) return 0; /* No room. */
- /* Now remove trailing zeroes after the '.' */
- if (strchr(buf,'.') != NULL) {
- char *p = buf+l-1;
- while(*p == '0') {
- p--;
- l--;
+ } else {
+ switch (mode) {
+ case LD_STR_AUTO:
+ l = snprintf(buf,len,"%.17Lg",value);
+ if (l+1 > len) return 0; /* No room. */
+ break;
+ case LD_STR_HEX:
+ l = snprintf(buf,len,"%La",value);
+ if (l+1 > len) return 0; /* No room. */
+ break;
+ case LD_STR_HUMAN:
+ /* We use 17 digits precision since with 128 bit floats that precision
+ * after rounding is able to represent most small decimal numbers in a
+ * way that is "non surprising" for the user (that is, most small
+ * decimal numbers will be represented in a way that when converted
+ * back into a string are exactly the same as what the user typed.) */
+ l = snprintf(buf,len,"%.17Lf",value);
+ if (l+1 > len) return 0; /* No room. */
+ /* Now remove trailing zeroes after the '.' */
+ if (strchr(buf,'.') != NULL) {
+ char *p = buf+l-1;
+ while(*p == '0') {
+ p--;
+ l--;
+ }
+ if (*p == '.') l--;
}
- if (*p == '.') l--;
+ break;
+ default: return 0; /* Invalid mode. */
}
- } else {
- l = snprintf(buf,len,"%.17Lg", value);
- if (l+1 > len) return 0; /* No room. */
}
buf[l] = '\0';
return l;
diff --git a/src/util.h b/src/util.h
index 7e162686e..e9ad0ee4d 100644
--- a/src/util.h
+++ b/src/util.h
@@ -38,6 +38,13 @@
* This should be the size of the buffer given to ld2string */
#define MAX_LONG_DOUBLE_CHARS 5*1024
+/* long double to string convertion options */
+typedef enum {
+ LD_STR_AUTO, /* %.17Lg */
+ LD_STR_HUMAN, /* %.17Lf + Trimming of trailing zeros */
+ LD_STR_HEX /* %La */
+} ld2string_mode;
+
int stringmatchlen(const char *p, int plen, const char *s, int slen, int nocase);
int stringmatch(const char *p, const char *s, int nocase);
int stringmatchlen_fuzz_test(void);
@@ -51,7 +58,7 @@ int string2l(const char *s, size_t slen, long *value);
int string2ld(const char *s, size_t slen, long double *dp);
int string2d(const char *s, size_t slen, double *dp);
int d2string(char *buf, size_t len, double value);
-int ld2string(char *buf, size_t len, long double value, int humanfriendly);
+int ld2string(char *buf, size_t len, long double value, ld2string_mode mode);
sds getAbsolutePath(char *filename);
unsigned long getTimeZone(void);
int pathIsBaseName(char *path);
diff --git a/tests/modules/Makefile b/tests/modules/Makefile
index 71c0b5ef8..f33f9e80e 100644
--- a/tests/modules/Makefile
+++ b/tests/modules/Makefile
@@ -18,7 +18,10 @@ TEST_MODULES = \
infotest.so \
propagate.so \
misc.so \
- hooks.so
+ hooks.so \
+ blockonkeys.so \
+ scan.so \
+ datatype.so
.PHONY: all
diff --git a/tests/modules/blockonkeys.c b/tests/modules/blockonkeys.c
new file mode 100644
index 000000000..959918b1c
--- /dev/null
+++ b/tests/modules/blockonkeys.c
@@ -0,0 +1,261 @@
+#define REDISMODULE_EXPERIMENTAL_API
+#include "redismodule.h"
+
+#include <string.h>
+#include <assert.h>
+#include <unistd.h>
+
+#define LIST_SIZE 1024
+
+typedef struct {
+ long long list[LIST_SIZE];
+ long long length;
+} fsl_t; /* Fixed-size list */
+
+static RedisModuleType *fsltype = NULL;
+
+fsl_t *fsl_type_create() {
+ fsl_t *o;
+ o = RedisModule_Alloc(sizeof(*o));
+ o->length = 0;
+ return o;
+}
+
+void fsl_type_free(fsl_t *o) {
+ RedisModule_Free(o);
+}
+
+/* ========================== "fsltype" type methods ======================= */
+
+void *fsl_rdb_load(RedisModuleIO *rdb, int encver) {
+ if (encver != 0) {
+ return NULL;
+ }
+ fsl_t *fsl = fsl_type_create();
+ fsl->length = RedisModule_LoadUnsigned(rdb);
+ for (long long i = 0; i < fsl->length; i++)
+ fsl->list[i] = RedisModule_LoadSigned(rdb);
+ return fsl;
+}
+
+void fsl_rdb_save(RedisModuleIO *rdb, void *value) {
+ fsl_t *fsl = value;
+ RedisModule_SaveUnsigned(rdb,fsl->length);
+ for (long long i = 0; i < fsl->length; i++)
+ RedisModule_SaveSigned(rdb, fsl->list[i]);
+}
+
+void fsl_aofrw(RedisModuleIO *aof, RedisModuleString *key, void *value) {
+ fsl_t *fsl = value;
+ for (long long i = 0; i < fsl->length; i++)
+ RedisModule_EmitAOF(aof, "FSL.PUSH","sl", key, fsl->list[i]);
+}
+
+void fsl_free(void *value) {
+ fsl_type_free(value);
+}
+
+/* ========================== helper methods ======================= */
+
+int get_fsl(RedisModuleCtx *ctx, RedisModuleString *keyname, int mode, int create, fsl_t **fsl, int reply_on_failure) {
+ RedisModuleKey *key = RedisModule_OpenKey(ctx, keyname, mode);
+
+ int type = RedisModule_KeyType(key);
+ if (type != REDISMODULE_KEYTYPE_EMPTY && RedisModule_ModuleTypeGetType(key) != fsltype) {
+ RedisModule_CloseKey(key);
+ if (reply_on_failure)
+ RedisModule_ReplyWithError(ctx, REDISMODULE_ERRORMSG_WRONGTYPE);
+ return 0;
+ }
+
+ /* Create an empty value object if the key is currently empty. */
+ if (type == REDISMODULE_KEYTYPE_EMPTY) {
+ if (!create) {
+ /* Key is empty but we cannot create */
+ RedisModule_CloseKey(key);
+ *fsl = NULL;
+ return 1;
+ }
+ *fsl = fsl_type_create();
+ RedisModule_ModuleTypeSetValue(key, fsltype, *fsl);
+ } else {
+ *fsl = RedisModule_ModuleTypeGetValue(key);
+ }
+
+ RedisModule_CloseKey(key);
+ return 1;
+}
+
+/* ========================== commands ======================= */
+
+/* FSL.PUSH <key> <int> - Push an integer to the fixed-size list (to the right).
+ * It must be greater than the element in the head of the list. */
+int fsl_push(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
+ if (argc != 3)
+ return RedisModule_WrongArity(ctx);
+
+ long long ele;
+ if (RedisModule_StringToLongLong(argv[2],&ele) != REDISMODULE_OK)
+ return RedisModule_ReplyWithError(ctx,"ERR invalid integer");
+
+ fsl_t *fsl;
+ if (!get_fsl(ctx, argv[1], REDISMODULE_WRITE, 1, &fsl, 1))
+ return REDISMODULE_OK;
+
+ if (fsl->length == LIST_SIZE)
+ return RedisModule_ReplyWithError(ctx,"ERR list is full");
+
+ if (fsl->length != 0 && fsl->list[fsl->length-1] >= ele)
+ return RedisModule_ReplyWithError(ctx,"ERR new element has to be greater than the head element");
+
+ fsl->list[fsl->length++] = ele;
+
+ if (fsl->length >= 2)
+ RedisModule_SignalKeyAsReady(ctx, argv[1]);
+
+ return RedisModule_ReplyWithSimpleString(ctx, "OK");
+}
+
+int bpop2_reply_callback(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
+ REDISMODULE_NOT_USED(argv);
+ REDISMODULE_NOT_USED(argc);
+ RedisModuleString *keyname = RedisModule_GetBlockedClientReadyKey(ctx);
+
+ fsl_t *fsl;
+ if (!get_fsl(ctx, keyname, REDISMODULE_READ, 0, &fsl, 0))
+ return REDISMODULE_ERR;
+
+ if (!fsl || fsl->length < 2)
+ return REDISMODULE_ERR;
+
+ RedisModule_ReplyWithArray(ctx, 2);
+ RedisModule_ReplyWithLongLong(ctx, fsl->list[--fsl->length]);
+ RedisModule_ReplyWithLongLong(ctx, fsl->list[--fsl->length]);
+ return REDISMODULE_OK;
+}
+
+int bpop2_timeout_callback(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
+ REDISMODULE_NOT_USED(argv);
+ REDISMODULE_NOT_USED(argc);
+ return RedisModule_ReplyWithSimpleString(ctx, "Request timedout");
+}
+
+
+/* FSL.BPOP2 <key> <timeout> - Block clients until list has two or more elements.
+ * When that happens, unblock client and pop the last two elements (from the right). */
+int fsl_bpop2(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
+ if (argc != 3)
+ return RedisModule_WrongArity(ctx);
+
+ long long timeout;
+ if (RedisModule_StringToLongLong(argv[2],&timeout) != REDISMODULE_OK || timeout < 0)
+ return RedisModule_ReplyWithError(ctx,"ERR invalid timeout");
+
+ fsl_t *fsl;
+ if (!get_fsl(ctx, argv[1], REDISMODULE_READ, 0, &fsl, 1))
+ return REDISMODULE_OK;
+
+ if (!fsl || fsl->length < 2) {
+ /* Key is empty or has <2 elements, we must block */
+ RedisModule_BlockClientOnKeys(ctx, bpop2_reply_callback, bpop2_timeout_callback,
+ NULL, timeout, &argv[1], 1, NULL);
+ } else {
+ RedisModule_ReplyWithArray(ctx, 2);
+ RedisModule_ReplyWithLongLong(ctx, fsl->list[--fsl->length]);
+ RedisModule_ReplyWithLongLong(ctx, fsl->list[--fsl->length]);
+ }
+
+ return REDISMODULE_OK;
+}
+
+int bpopgt_reply_callback(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
+ REDISMODULE_NOT_USED(argv);
+ REDISMODULE_NOT_USED(argc);
+ RedisModuleString *keyname = RedisModule_GetBlockedClientReadyKey(ctx);
+ long long gt = (long long)RedisModule_GetBlockedClientPrivateData(ctx);
+
+ fsl_t *fsl;
+ if (!get_fsl(ctx, keyname, REDISMODULE_READ, 0, &fsl, 0))
+ return REDISMODULE_ERR;
+
+ if (!fsl || fsl->list[fsl->length-1] <= gt)
+ return REDISMODULE_ERR;
+
+ RedisModule_ReplyWithLongLong(ctx, fsl->list[--fsl->length]);
+ return REDISMODULE_OK;
+}
+
+int bpopgt_timeout_callback(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
+ REDISMODULE_NOT_USED(argv);
+ REDISMODULE_NOT_USED(argc);
+ return RedisModule_ReplyWithSimpleString(ctx, "Request timedout");
+}
+
+void bpopgt_free_privdata(RedisModuleCtx *ctx, void *privdata) {
+ /* Nothing to do because privdata is actually a 'long long',
+ * not a pointer to the heap */
+ REDISMODULE_NOT_USED(ctx);
+ REDISMODULE_NOT_USED(privdata);
+}
+
+/* FSL.BPOPGT <key> <gt> <timeout> - Block clients until list has an element greater than <gt>.
+ * When that happens, unblock client and pop the last element (from the right). */
+int fsl_bpopgt(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
+ if (argc != 4)
+ return RedisModule_WrongArity(ctx);
+
+ long long gt;
+ if (RedisModule_StringToLongLong(argv[2],&gt) != REDISMODULE_OK)
+ return RedisModule_ReplyWithError(ctx,"ERR invalid integer");
+
+ long long timeout;
+ if (RedisModule_StringToLongLong(argv[3],&timeout) != REDISMODULE_OK || timeout < 0)
+ return RedisModule_ReplyWithError(ctx,"ERR invalid timeout");
+
+ fsl_t *fsl;
+ if (!get_fsl(ctx, argv[1], REDISMODULE_READ, 0, &fsl, 1))
+ return REDISMODULE_OK;
+
+ if (!fsl || fsl->list[fsl->length-1] <= gt) {
+ /* Key is empty or has <2 elements, we must block */
+ RedisModule_BlockClientOnKeys(ctx, bpopgt_reply_callback, bpopgt_timeout_callback,
+ bpopgt_free_privdata, timeout, &argv[1], 1, (void*)gt);
+ } else {
+ RedisModule_ReplyWithLongLong(ctx, fsl->list[--fsl->length]);
+ }
+
+ return REDISMODULE_OK;
+}
+
+int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
+ REDISMODULE_NOT_USED(argv);
+ REDISMODULE_NOT_USED(argc);
+
+ if (RedisModule_Init(ctx, "blockonkeys", 1, REDISMODULE_APIVER_1)== REDISMODULE_ERR)
+ return REDISMODULE_ERR;
+
+ RedisModuleTypeMethods tm = {
+ .version = REDISMODULE_TYPE_METHOD_VERSION,
+ .rdb_load = fsl_rdb_load,
+ .rdb_save = fsl_rdb_save,
+ .aof_rewrite = fsl_aofrw,
+ .mem_usage = NULL,
+ .free = fsl_free,
+ .digest = NULL
+ };
+
+ fsltype = RedisModule_CreateDataType(ctx, "fsltype_t", 0, &tm);
+ if (fsltype == NULL)
+ return REDISMODULE_ERR;
+
+ if (RedisModule_CreateCommand(ctx,"fsl.push",fsl_push,"",0,0,0) == REDISMODULE_ERR)
+ return REDISMODULE_ERR;
+
+ if (RedisModule_CreateCommand(ctx,"fsl.bpop2",fsl_bpop2,"",0,0,0) == REDISMODULE_ERR)
+ return REDISMODULE_ERR;
+
+ if (RedisModule_CreateCommand(ctx,"fsl.bpopgt",fsl_bpopgt,"",0,0,0) == REDISMODULE_ERR)
+ return REDISMODULE_ERR;
+
+ return REDISMODULE_OK;
+}
diff --git a/tests/modules/datatype.c b/tests/modules/datatype.c
new file mode 100644
index 000000000..7c39ab457
--- /dev/null
+++ b/tests/modules/datatype.c
@@ -0,0 +1,161 @@
+/* This module current tests a small subset but should be extended in the future
+ * for general ModuleDataType coverage.
+ */
+
+#include "redismodule.h"
+
+static RedisModuleType *datatype = NULL;
+
+typedef struct {
+ long long intval;
+ RedisModuleString *strval;
+} DataType;
+
+static void *datatype_load(RedisModuleIO *io, int encver) {
+ (void) encver;
+
+ int intval = RedisModule_LoadSigned(io);
+ if (RedisModule_IsIOError(io)) return NULL;
+
+ RedisModuleString *strval = RedisModule_LoadString(io);
+ if (RedisModule_IsIOError(io)) return NULL;
+
+ DataType *dt = (DataType *) RedisModule_Alloc(sizeof(DataType));
+ dt->intval = intval;
+ dt->strval = strval;
+ return dt;
+}
+
+static void datatype_save(RedisModuleIO *io, void *value) {
+ DataType *dt = (DataType *) value;
+ RedisModule_SaveSigned(io, dt->intval);
+ RedisModule_SaveString(io, dt->strval);
+}
+
+static void datatype_free(void *value) {
+ if (value) {
+ DataType *dt = (DataType *) value;
+
+ if (dt->strval) RedisModule_FreeString(NULL, dt->strval);
+ RedisModule_Free(dt);
+ }
+}
+
+static int datatype_set(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
+ if (argc != 4) {
+ RedisModule_WrongArity(ctx);
+ return REDISMODULE_OK;
+ }
+
+ long long intval;
+
+ if (RedisModule_StringToLongLong(argv[2], &intval) != REDISMODULE_OK) {
+ RedisModule_ReplyWithError(ctx, "Invalid integr value");
+ return REDISMODULE_OK;
+ }
+
+ RedisModuleKey *key = RedisModule_OpenKey(ctx, argv[1], REDISMODULE_WRITE);
+ DataType *dt = RedisModule_Calloc(sizeof(DataType), 1);
+ dt->intval = intval;
+ dt->strval = argv[3];
+ RedisModule_RetainString(ctx, dt->strval);
+
+ RedisModule_ModuleTypeSetValue(key, datatype, dt);
+ RedisModule_CloseKey(key);
+ RedisModule_ReplyWithSimpleString(ctx, "OK");
+
+ return REDISMODULE_OK;
+}
+
+static int datatype_restore(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
+ if (argc != 3) {
+ RedisModule_WrongArity(ctx);
+ return REDISMODULE_OK;
+ }
+
+ DataType *dt = RedisModule_LoadDataTypeFromString(argv[2], datatype);
+ if (!dt) {
+ RedisModule_ReplyWithError(ctx, "Invalid data");
+ return REDISMODULE_OK;
+ }
+
+ RedisModuleKey *key = RedisModule_OpenKey(ctx, argv[1], REDISMODULE_WRITE);
+ RedisModule_ModuleTypeSetValue(key, datatype, dt);
+ RedisModule_CloseKey(key);
+ RedisModule_ReplyWithSimpleString(ctx, "OK");
+
+ return REDISMODULE_OK;
+}
+
+static int datatype_get(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
+ if (argc != 2) {
+ RedisModule_WrongArity(ctx);
+ return REDISMODULE_OK;
+ }
+
+ RedisModuleKey *key = RedisModule_OpenKey(ctx, argv[1], REDISMODULE_READ);
+ DataType *dt = RedisModule_ModuleTypeGetValue(key);
+ RedisModule_CloseKey(key);
+
+ RedisModule_ReplyWithArray(ctx, 2);
+ RedisModule_ReplyWithLongLong(ctx, dt->intval);
+ RedisModule_ReplyWithString(ctx, dt->strval);
+ return REDISMODULE_OK;
+}
+
+static int datatype_dump(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
+ if (argc != 2) {
+ RedisModule_WrongArity(ctx);
+ return REDISMODULE_OK;
+ }
+
+ RedisModuleKey *key = RedisModule_OpenKey(ctx, argv[1], REDISMODULE_READ);
+ DataType *dt = RedisModule_ModuleTypeGetValue(key);
+ RedisModule_CloseKey(key);
+
+ RedisModuleString *reply = RedisModule_SaveDataTypeToString(ctx, dt, datatype);
+ if (!reply) {
+ RedisModule_ReplyWithError(ctx, "Failed to save");
+ return REDISMODULE_OK;
+ }
+
+ RedisModule_ReplyWithString(ctx, reply);
+ RedisModule_FreeString(ctx, reply);
+ return REDISMODULE_OK;
+}
+
+
+int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
+ REDISMODULE_NOT_USED(argv);
+ REDISMODULE_NOT_USED(argc);
+
+ if (RedisModule_Init(ctx,"datatype",1,REDISMODULE_APIVER_1) == REDISMODULE_ERR)
+ return REDISMODULE_ERR;
+
+ RedisModule_SetModuleOptions(ctx, REDISMODULE_OPTIONS_HANDLE_IO_ERRORS);
+
+ RedisModuleTypeMethods datatype_methods = {
+ .version = REDISMODULE_TYPE_METHOD_VERSION,
+ .rdb_load = datatype_load,
+ .rdb_save = datatype_save,
+ .free = datatype_free,
+ };
+
+ datatype = RedisModule_CreateDataType(ctx, "test___dt", 1, &datatype_methods);
+ if (datatype == NULL)
+ return REDISMODULE_ERR;
+
+ if (RedisModule_CreateCommand(ctx,"datatype.set", datatype_set,"deny-oom",1,1,1) == REDISMODULE_ERR)
+ return REDISMODULE_ERR;
+
+ if (RedisModule_CreateCommand(ctx,"datatype.get", datatype_get,"",1,1,1) == REDISMODULE_ERR)
+ return REDISMODULE_ERR;
+
+ if (RedisModule_CreateCommand(ctx,"datatype.restore", datatype_restore,"deny-oom",1,1,1) == REDISMODULE_ERR)
+ return REDISMODULE_ERR;
+
+ if (RedisModule_CreateCommand(ctx,"datatype.dump", datatype_dump,"",1,1,1) == REDISMODULE_ERR)
+ return REDISMODULE_ERR;
+
+ return REDISMODULE_OK;
+}
diff --git a/tests/modules/hooks.c b/tests/modules/hooks.c
index 33b690b2f..665a20481 100644
--- a/tests/modules/hooks.c
+++ b/tests/modules/hooks.c
@@ -30,36 +30,227 @@
* POSSIBILITY OF SUCH DAMAGE.
*/
-#define REDISMODULE_EXPERIMENTAL_API
#include "redismodule.h"
+#include <stdio.h>
+#include <string.h>
+
+/* We need to store events to be able to test and see what we got, and we can't
+ * store them in the key-space since that would mess up rdb loading (duplicates)
+ * and be lost of flushdb. */
+RedisModuleDict *event_log = NULL;
+
+typedef struct EventElement {
+ long count;
+ RedisModuleString *last_val_string;
+ long last_val_int;
+} EventElement;
+
+void LogStringEvent(RedisModuleCtx *ctx, const char* keyname, const char* data) {
+ EventElement *event = RedisModule_DictGetC(event_log, (void*)keyname, strlen(keyname), NULL);
+ if (!event) {
+ event = RedisModule_Alloc(sizeof(EventElement));
+ memset(event, 0, sizeof(EventElement));
+ RedisModule_DictSetC(event_log, (void*)keyname, strlen(keyname), event);
+ }
+ if (event->last_val_string) RedisModule_FreeString(ctx, event->last_val_string);
+ event->last_val_string = RedisModule_CreateString(ctx, data, strlen(data));
+ event->count++;
+}
+
+void LogNumericEvent(RedisModuleCtx *ctx, const char* keyname, long data) {
+ REDISMODULE_NOT_USED(ctx);
+ EventElement *event = RedisModule_DictGetC(event_log, (void*)keyname, strlen(keyname), NULL);
+ if (!event) {
+ event = RedisModule_Alloc(sizeof(EventElement));
+ memset(event, 0, sizeof(EventElement));
+ RedisModule_DictSetC(event_log, (void*)keyname, strlen(keyname), event);
+ }
+ event->last_val_int = data;
+ event->count++;
+}
+
+void FreeEvent(RedisModuleCtx *ctx, EventElement *event) {
+ if (event->last_val_string)
+ RedisModule_FreeString(ctx, event->last_val_string);
+ RedisModule_Free(event);
+}
+
+int cmdEventCount(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
+{
+ if (argc != 2){
+ RedisModule_WrongArity(ctx);
+ return REDISMODULE_OK;
+ }
+
+ EventElement *event = RedisModule_DictGet(event_log, argv[1], NULL);
+ RedisModule_ReplyWithLongLong(ctx, event? event->count: 0);
+ return REDISMODULE_OK;
+}
+
+int cmdEventLast(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
+{
+ if (argc != 2){
+ RedisModule_WrongArity(ctx);
+ return REDISMODULE_OK;
+ }
+
+ EventElement *event = RedisModule_DictGet(event_log, argv[1], NULL);
+ if (event && event->last_val_string)
+ RedisModule_ReplyWithString(ctx, event->last_val_string);
+ else if (event)
+ RedisModule_ReplyWithLongLong(ctx, event->last_val_int);
+ else
+ RedisModule_ReplyWithNull(ctx);
+ return REDISMODULE_OK;
+}
+
+void clearEvents(RedisModuleCtx *ctx)
+{
+ RedisModuleString *key;
+ EventElement *event;
+ RedisModuleDictIter *iter = RedisModule_DictIteratorStart(event_log, "^", NULL);
+ while((key = RedisModule_DictNext(ctx, iter, (void**)&event)) != NULL) {
+ event->count = 0;
+ event->last_val_int = 0;
+ if (event->last_val_string) RedisModule_FreeString(ctx, event->last_val_string);
+ event->last_val_string = NULL;
+ RedisModule_DictDel(event_log, key, NULL);
+ RedisModule_Free(event);
+ }
+ RedisModule_DictIteratorStop(iter);
+}
+
+int cmdEventsClear(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
+{
+ REDISMODULE_NOT_USED(argc);
+ REDISMODULE_NOT_USED(argv);
+ clearEvents(ctx);
+ return REDISMODULE_OK;
+}
/* Client state change callback. */
void clientChangeCallback(RedisModuleCtx *ctx, RedisModuleEvent e, uint64_t sub, void *data)
{
- REDISMODULE_NOT_USED(ctx);
REDISMODULE_NOT_USED(e);
RedisModuleClientInfo *ci = data;
char *keyname = (sub == REDISMODULE_SUBEVENT_CLIENT_CHANGE_CONNECTED) ?
- "connected" : "disconnected";
- RedisModuleCallReply *reply;
- RedisModule_SelectDb(ctx,9);
- reply = RedisModule_Call(ctx,"RPUSH","cl",keyname,(long)ci->id);
- RedisModule_FreeCallReply(reply);
+ "client-connected" : "client-disconnected";
+ LogNumericEvent(ctx, keyname, ci->id);
}
void flushdbCallback(RedisModuleCtx *ctx, RedisModuleEvent e, uint64_t sub, void *data)
{
- REDISMODULE_NOT_USED(ctx);
REDISMODULE_NOT_USED(e);
RedisModuleFlushInfo *fi = data;
char *keyname = (sub == REDISMODULE_SUBEVENT_FLUSHDB_START) ?
"flush-start" : "flush-end";
- RedisModuleCallReply *reply;
- RedisModule_SelectDb(ctx,9);
- reply = RedisModule_Call(ctx,"RPUSH","cl",keyname,(long)fi->dbnum);
- RedisModule_FreeCallReply(reply);
+ LogNumericEvent(ctx, keyname, fi->dbnum);
+}
+
+void roleChangeCallback(RedisModuleCtx *ctx, RedisModuleEvent e, uint64_t sub, void *data)
+{
+ REDISMODULE_NOT_USED(e);
+ REDISMODULE_NOT_USED(data);
+
+ RedisModuleReplicationInfo *ri = data;
+ char *keyname = (sub == REDISMODULE_EVENT_REPLROLECHANGED_NOW_MASTER) ?
+ "role-master" : "role-replica";
+ LogStringEvent(ctx, keyname, ri->masterhost);
+}
+
+void replicationChangeCallback(RedisModuleCtx *ctx, RedisModuleEvent e, uint64_t sub, void *data)
+{
+ REDISMODULE_NOT_USED(e);
+ REDISMODULE_NOT_USED(data);
+
+ char *keyname = (sub == REDISMODULE_SUBEVENT_REPLICA_CHANGE_ONLINE) ?
+ "replica-online" : "replica-offline";
+ LogNumericEvent(ctx, keyname, 0);
+}
+
+void rasterLinkChangeCallback(RedisModuleCtx *ctx, RedisModuleEvent e, uint64_t sub, void *data)
+{
+ REDISMODULE_NOT_USED(e);
+ REDISMODULE_NOT_USED(data);
+
+ char *keyname = (sub == REDISMODULE_SUBEVENT_MASTER_LINK_UP) ?
+ "masterlink-up" : "masterlink-down";
+ LogNumericEvent(ctx, keyname, 0);
+}
+
+void persistenceCallback(RedisModuleCtx *ctx, RedisModuleEvent e, uint64_t sub, void *data)
+{
+ REDISMODULE_NOT_USED(e);
+ REDISMODULE_NOT_USED(data);
+
+ char *keyname = NULL;
+ switch (sub) {
+ case REDISMODULE_SUBEVENT_PERSISTENCE_RDB_START: keyname = "persistence-rdb-start"; break;
+ case REDISMODULE_SUBEVENT_PERSISTENCE_AOF_START: keyname = "persistence-aof-start"; break;
+ case REDISMODULE_SUBEVENT_PERSISTENCE_SYNC_RDB_START: keyname = "persistence-syncrdb-start"; break;
+ case REDISMODULE_SUBEVENT_PERSISTENCE_ENDED: keyname = "persistence-end"; break;
+ case REDISMODULE_SUBEVENT_PERSISTENCE_FAILED: keyname = "persistence-failed"; break;
+ }
+ /* modifying the keyspace from the fork child is not an option, using log instead */
+ RedisModule_Log(ctx, "warning", "module-event-%s", keyname);
+ if (sub == REDISMODULE_SUBEVENT_PERSISTENCE_SYNC_RDB_START)
+ LogNumericEvent(ctx, keyname, 0);
+}
+
+void loadingCallback(RedisModuleCtx *ctx, RedisModuleEvent e, uint64_t sub, void *data)
+{
+ REDISMODULE_NOT_USED(e);
+ REDISMODULE_NOT_USED(data);
+
+ char *keyname = NULL;
+ switch (sub) {
+ case REDISMODULE_SUBEVENT_LOADING_RDB_START: keyname = "loading-rdb-start"; break;
+ case REDISMODULE_SUBEVENT_LOADING_AOF_START: keyname = "loading-aof-start"; break;
+ case REDISMODULE_SUBEVENT_LOADING_REPL_START: keyname = "loading-repl-start"; break;
+ case REDISMODULE_SUBEVENT_LOADING_ENDED: keyname = "loading-end"; break;
+ case REDISMODULE_SUBEVENT_LOADING_FAILED: keyname = "loading-failed"; break;
+ }
+ LogNumericEvent(ctx, keyname, 0);
+}
+
+void loadingProgressCallback(RedisModuleCtx *ctx, RedisModuleEvent e, uint64_t sub, void *data)
+{
+ REDISMODULE_NOT_USED(e);
+
+ RedisModuleLoadingProgress *ei = data;
+ char *keyname = (sub == REDISMODULE_SUBEVENT_LOADING_PROGRESS_RDB) ?
+ "loading-progress-rdb" : "loading-progress-aof";
+ LogNumericEvent(ctx, keyname, ei->progress);
+}
+
+void shutdownCallback(RedisModuleCtx *ctx, RedisModuleEvent e, uint64_t sub, void *data)
+{
+ REDISMODULE_NOT_USED(e);
+ REDISMODULE_NOT_USED(data);
+ REDISMODULE_NOT_USED(sub);
+
+ RedisModule_Log(ctx, "warning", "module-event-%s", "shutdown");
+}
+
+void cronLoopCallback(RedisModuleCtx *ctx, RedisModuleEvent e, uint64_t sub, void *data)
+{
+ REDISMODULE_NOT_USED(e);
+ REDISMODULE_NOT_USED(sub);
+
+ RedisModuleCronLoop *ei = data;
+ LogNumericEvent(ctx, "cron-loop", ei->hz);
+}
+
+void moduleChangeCallback(RedisModuleCtx *ctx, RedisModuleEvent e, uint64_t sub, void *data)
+{
+ REDISMODULE_NOT_USED(e);
+
+ RedisModuleModuleChange *ei = data;
+ char *keyname = (sub == REDISMODULE_SUBEVENT_MODULE_LOADED) ?
+ "module-loaded" : "module-unloaded";
+ LogStringEvent(ctx, keyname, ei->module_name);
}
/* This function must be present on each Redis module. It is used in order to
@@ -71,9 +262,50 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
if (RedisModule_Init(ctx,"testhook",1,REDISMODULE_APIVER_1)
== REDISMODULE_ERR) return REDISMODULE_ERR;
+ /* replication related hooks */
+ RedisModule_SubscribeToServerEvent(ctx,
+ RedisModuleEvent_ReplicationRoleChanged, roleChangeCallback);
+ RedisModule_SubscribeToServerEvent(ctx,
+ RedisModuleEvent_ReplicaChange, replicationChangeCallback);
+ RedisModule_SubscribeToServerEvent(ctx,
+ RedisModuleEvent_MasterLinkChange, rasterLinkChangeCallback);
+
+ /* persistence related hooks */
+ RedisModule_SubscribeToServerEvent(ctx,
+ RedisModuleEvent_Persistence, persistenceCallback);
+ RedisModule_SubscribeToServerEvent(ctx,
+ RedisModuleEvent_Loading, loadingCallback);
+ RedisModule_SubscribeToServerEvent(ctx,
+ RedisModuleEvent_LoadingProgress, loadingProgressCallback);
+
+ /* other hooks */
RedisModule_SubscribeToServerEvent(ctx,
RedisModuleEvent_ClientChange, clientChangeCallback);
RedisModule_SubscribeToServerEvent(ctx,
RedisModuleEvent_FlushDB, flushdbCallback);
+ RedisModule_SubscribeToServerEvent(ctx,
+ RedisModuleEvent_Shutdown, shutdownCallback);
+ RedisModule_SubscribeToServerEvent(ctx,
+ RedisModuleEvent_CronLoop, cronLoopCallback);
+ RedisModule_SubscribeToServerEvent(ctx,
+ RedisModuleEvent_ModuleChange, moduleChangeCallback);
+
+ event_log = RedisModule_CreateDict(ctx);
+
+ if (RedisModule_CreateCommand(ctx,"hooks.event_count", cmdEventCount,"",0,0,0) == REDISMODULE_ERR)
+ return REDISMODULE_ERR;
+ if (RedisModule_CreateCommand(ctx,"hooks.event_last", cmdEventLast,"",0,0,0) == REDISMODULE_ERR)
+ return REDISMODULE_ERR;
+ if (RedisModule_CreateCommand(ctx,"hooks.clear", cmdEventsClear,"",0,0,0) == REDISMODULE_ERR)
+ return REDISMODULE_ERR;
+
return REDISMODULE_OK;
}
+
+int RedisModule_OnUnload(RedisModuleCtx *ctx) {
+ clearEvents(ctx);
+ RedisModule_FreeDict(ctx, event_log);
+ event_log = NULL;
+ return REDISMODULE_OK;
+}
+
diff --git a/tests/modules/misc.c b/tests/modules/misc.c
index fd892f52c..b5a032f60 100644
--- a/tests/modules/misc.c
+++ b/tests/modules/misc.c
@@ -6,6 +6,8 @@
#include <unistd.h>
#include <errno.h>
+#define UNUSED(x) (void)(x)
+
int test_call_generic(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
{
if (argc<2) {
@@ -40,6 +42,146 @@ int test_call_info(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
return REDISMODULE_OK;
}
+int test_ld_conv(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
+ UNUSED(argv);
+ UNUSED(argc);
+ long double ld = 0.00000000000000001L;
+ const char *ldstr = "0.00000000000000001";
+ RedisModuleString *s1 = RedisModule_CreateStringFromLongDouble(ctx, ld, 1);
+ RedisModuleString *s2 =
+ RedisModule_CreateString(ctx, ldstr, strlen(ldstr));
+ if (RedisModule_StringCompare(s1, s2) != 0) {
+ char err[4096];
+ snprintf(err, 4096,
+ "Failed to convert long double to string ('%s' != '%s')",
+ RedisModule_StringPtrLen(s1, NULL),
+ RedisModule_StringPtrLen(s2, NULL));
+ RedisModule_ReplyWithError(ctx, err);
+ goto final;
+ }
+ long double ld2 = 0;
+ if (RedisModule_StringToLongDouble(s2, &ld2) == REDISMODULE_ERR) {
+ RedisModule_ReplyWithError(ctx,
+ "Failed to convert string to long double");
+ goto final;
+ }
+ if (ld2 != ld) {
+ char err[4096];
+ snprintf(err, 4096,
+ "Failed to convert string to long double (%.40Lf != %.40Lf)",
+ ld2,
+ ld);
+ RedisModule_ReplyWithError(ctx, err);
+ goto final;
+ }
+ RedisModule_ReplyWithLongDouble(ctx, ld2);
+final:
+ RedisModule_FreeString(ctx, s1);
+ RedisModule_FreeString(ctx, s2);
+ return REDISMODULE_OK;
+}
+
+int test_flushall(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
+{
+ REDISMODULE_NOT_USED(argv);
+ REDISMODULE_NOT_USED(argc);
+ RedisModule_ResetDataset(1, 0);
+ RedisModule_ReplyWithCString(ctx, "Ok");
+ return REDISMODULE_OK;
+}
+
+int test_dbsize(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
+{
+ REDISMODULE_NOT_USED(argv);
+ REDISMODULE_NOT_USED(argc);
+ long long ll = RedisModule_DbSize(ctx);
+ RedisModule_ReplyWithLongLong(ctx, ll);
+ return REDISMODULE_OK;
+}
+
+int test_randomkey(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
+{
+ REDISMODULE_NOT_USED(argv);
+ REDISMODULE_NOT_USED(argc);
+ RedisModuleString *str = RedisModule_RandomKey(ctx);
+ RedisModule_ReplyWithString(ctx, str);
+ RedisModule_FreeString(ctx, str);
+ return REDISMODULE_OK;
+}
+
+RedisModuleKey *open_key_or_reply(RedisModuleCtx *ctx, RedisModuleString *keyname, int mode) {
+ RedisModuleKey *key = RedisModule_OpenKey(ctx, keyname, mode);
+ if (!key) {
+ RedisModule_ReplyWithError(ctx, "key not found");
+ return NULL;
+ }
+ return key;
+}
+
+int test_getlru(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
+{
+ if (argc<2) {
+ RedisModule_WrongArity(ctx);
+ return REDISMODULE_OK;
+ }
+ RedisModuleKey *key = open_key_or_reply(ctx, argv[1], REDISMODULE_READ|REDISMODULE_OPEN_KEY_NOTOUCH);
+ mstime_t lru;
+ RedisModule_GetLRU(key, &lru);
+ RedisModule_ReplyWithLongLong(ctx, lru);
+ RedisModule_CloseKey(key);
+ return REDISMODULE_OK;
+}
+
+int test_setlru(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
+{
+ if (argc<3) {
+ RedisModule_WrongArity(ctx);
+ return REDISMODULE_OK;
+ }
+ RedisModuleKey *key = open_key_or_reply(ctx, argv[1], REDISMODULE_READ|REDISMODULE_OPEN_KEY_NOTOUCH);
+ mstime_t lru;
+ if (RedisModule_StringToLongLong(argv[2], &lru) != REDISMODULE_OK) {
+ RedisModule_ReplyWithError(ctx, "invalid idle time");
+ return REDISMODULE_OK;
+ }
+ int was_set = RedisModule_SetLRU(key, lru)==REDISMODULE_OK;
+ RedisModule_ReplyWithLongLong(ctx, was_set);
+ RedisModule_CloseKey(key);
+ return REDISMODULE_OK;
+}
+
+int test_getlfu(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
+{
+ if (argc<2) {
+ RedisModule_WrongArity(ctx);
+ return REDISMODULE_OK;
+ }
+ RedisModuleKey *key = open_key_or_reply(ctx, argv[1], REDISMODULE_READ|REDISMODULE_OPEN_KEY_NOTOUCH);
+ mstime_t lfu;
+ RedisModule_GetLFU(key, &lfu);
+ RedisModule_ReplyWithLongLong(ctx, lfu);
+ RedisModule_CloseKey(key);
+ return REDISMODULE_OK;
+}
+
+int test_setlfu(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
+{
+ if (argc<3) {
+ RedisModule_WrongArity(ctx);
+ return REDISMODULE_OK;
+ }
+ RedisModuleKey *key = open_key_or_reply(ctx, argv[1], REDISMODULE_READ|REDISMODULE_OPEN_KEY_NOTOUCH);
+ mstime_t lfu;
+ if (RedisModule_StringToLongLong(argv[2], &lfu) != REDISMODULE_OK) {
+ RedisModule_ReplyWithError(ctx, "invalid freq");
+ return REDISMODULE_OK;
+ }
+ int was_set = RedisModule_SetLFU(key, lfu)==REDISMODULE_OK;
+ RedisModule_ReplyWithLongLong(ctx, was_set);
+ RedisModule_CloseKey(key);
+ return REDISMODULE_OK;
+}
+
int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
REDISMODULE_NOT_USED(argv);
REDISMODULE_NOT_USED(argc);
@@ -50,6 +192,22 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
return REDISMODULE_ERR;
if (RedisModule_CreateCommand(ctx,"test.call_info", test_call_info,"",0,0,0) == REDISMODULE_ERR)
return REDISMODULE_ERR;
+ if (RedisModule_CreateCommand(ctx,"test.ld_conversion", test_ld_conv, "",0,0,0) == REDISMODULE_ERR)
+ return REDISMODULE_ERR;
+ if (RedisModule_CreateCommand(ctx,"test.flushall", test_flushall,"",0,0,0) == REDISMODULE_ERR)
+ return REDISMODULE_ERR;
+ if (RedisModule_CreateCommand(ctx,"test.dbsize", test_dbsize,"",0,0,0) == REDISMODULE_ERR)
+ return REDISMODULE_ERR;
+ if (RedisModule_CreateCommand(ctx,"test.randomkey", test_randomkey,"",0,0,0) == REDISMODULE_ERR)
+ return REDISMODULE_ERR;
+ if (RedisModule_CreateCommand(ctx,"test.setlru", test_setlru,"",0,0,0) == REDISMODULE_ERR)
+ return REDISMODULE_ERR;
+ if (RedisModule_CreateCommand(ctx,"test.getlru", test_getlru,"",0,0,0) == REDISMODULE_ERR)
+ return REDISMODULE_ERR;
+ if (RedisModule_CreateCommand(ctx,"test.setlfu", test_setlfu,"",0,0,0) == REDISMODULE_ERR)
+ return REDISMODULE_ERR;
+ if (RedisModule_CreateCommand(ctx,"test.getlfu", test_getlfu,"",0,0,0) == REDISMODULE_ERR)
+ return REDISMODULE_ERR;
return REDISMODULE_OK;
}
diff --git a/tests/modules/scan.c b/tests/modules/scan.c
new file mode 100644
index 000000000..afede244b
--- /dev/null
+++ b/tests/modules/scan.c
@@ -0,0 +1,109 @@
+#include "redismodule.h"
+
+#include <string.h>
+#include <assert.h>
+#include <unistd.h>
+
+typedef struct {
+ size_t nkeys;
+} scan_strings_pd;
+
+void scan_strings_callback(RedisModuleCtx *ctx, RedisModuleString* keyname, RedisModuleKey* key, void *privdata) {
+ scan_strings_pd* pd = privdata;
+ int was_opened = 0;
+ if (!key) {
+ key = RedisModule_OpenKey(ctx, keyname, REDISMODULE_READ);
+ was_opened = 1;
+ }
+
+ if (RedisModule_KeyType(key) == REDISMODULE_KEYTYPE_STRING) {
+ size_t len;
+ char * data = RedisModule_StringDMA(key, &len, REDISMODULE_READ);
+ RedisModule_ReplyWithArray(ctx, 2);
+ RedisModule_ReplyWithString(ctx, keyname);
+ RedisModule_ReplyWithStringBuffer(ctx, data, len);
+ pd->nkeys++;
+ }
+ if (was_opened)
+ RedisModule_CloseKey(key);
+}
+
+int scan_strings(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
+{
+ REDISMODULE_NOT_USED(argv);
+ REDISMODULE_NOT_USED(argc);
+ scan_strings_pd pd = {
+ .nkeys = 0,
+ };
+
+ RedisModule_ReplyWithArray(ctx, REDISMODULE_POSTPONED_ARRAY_LEN);
+
+ RedisModuleScanCursor* cursor = RedisModule_ScanCursorCreate();
+ while(RedisModule_Scan(ctx, cursor, scan_strings_callback, &pd));
+ RedisModule_ScanCursorDestroy(cursor);
+
+ RedisModule_ReplySetArrayLength(ctx, pd.nkeys);
+ return REDISMODULE_OK;
+}
+
+typedef struct {
+ RedisModuleCtx *ctx;
+ size_t nreplies;
+} scan_key_pd;
+
+void scan_key_callback(RedisModuleKey *key, RedisModuleString* field, RedisModuleString* value, void *privdata) {
+ REDISMODULE_NOT_USED(key);
+ scan_key_pd* pd = privdata;
+ RedisModule_ReplyWithArray(pd->ctx, 2);
+ RedisModule_ReplyWithString(pd->ctx, field);
+ if (value)
+ RedisModule_ReplyWithString(pd->ctx, value);
+ else
+ RedisModule_ReplyWithNull(pd->ctx);
+ pd->nreplies++;
+}
+
+int scan_key(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
+{
+ if (argc != 2) {
+ RedisModule_WrongArity(ctx);
+ return REDISMODULE_OK;
+ }
+ scan_key_pd pd = {
+ .ctx = ctx,
+ .nreplies = 0,
+ };
+
+ RedisModuleKey *key = RedisModule_OpenKey(ctx, argv[1], REDISMODULE_READ);
+ if (!key) {
+ RedisModule_ReplyWithError(ctx, "not found");
+ return REDISMODULE_OK;
+ }
+
+ RedisModule_ReplyWithArray(ctx, REDISMODULE_POSTPONED_ARRAY_LEN);
+
+ RedisModuleScanCursor* cursor = RedisModule_ScanCursorCreate();
+ while(RedisModule_ScanKey(key, cursor, scan_key_callback, &pd));
+ RedisModule_ScanCursorDestroy(cursor);
+
+ RedisModule_ReplySetArrayLength(ctx, pd.nreplies);
+ RedisModule_CloseKey(key);
+ return REDISMODULE_OK;
+}
+
+int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
+ REDISMODULE_NOT_USED(argv);
+ REDISMODULE_NOT_USED(argc);
+ if (RedisModule_Init(ctx, "scan", 1, REDISMODULE_APIVER_1)== REDISMODULE_ERR)
+ return REDISMODULE_ERR;
+
+ if (RedisModule_CreateCommand(ctx, "scan.scan_strings", scan_strings, "", 0, 0, 0) == REDISMODULE_ERR)
+ return REDISMODULE_ERR;
+
+ if (RedisModule_CreateCommand(ctx, "scan.scan_key", scan_key, "", 0, 0, 0) == REDISMODULE_ERR)
+ return REDISMODULE_ERR;
+
+ return REDISMODULE_OK;
+}
+
+
diff --git a/tests/modules/testrdb.c b/tests/modules/testrdb.c
index eb8d1a999..7c04bb4ef 100644
--- a/tests/modules/testrdb.c
+++ b/tests/modules/testrdb.c
@@ -15,11 +15,19 @@ RedisModuleString *after_str = NULL;
void *testrdb_type_load(RedisModuleIO *rdb, int encver) {
int count = RedisModule_LoadSigned(rdb);
- if (RedisModule_IsIOError(rdb))
+ RedisModuleString *str = RedisModule_LoadString(rdb);
+ float f = RedisModule_LoadFloat(rdb);
+ long double ld = RedisModule_LoadLongDouble(rdb);
+ if (RedisModule_IsIOError(rdb)) {
+ RedisModuleCtx *ctx = RedisModule_GetContextFromIO(rdb);
+ RedisModule_FreeString(ctx, str);
return NULL;
+ }
+ /* Using the values only after checking for io errors. */
assert(count==1);
assert(encver==1);
- RedisModuleString *str = RedisModule_LoadString(rdb);
+ assert(f==1.5f);
+ assert(ld==0.333333333333333333L);
return str;
}
@@ -27,6 +35,8 @@ void testrdb_type_save(RedisModuleIO *rdb, void *value) {
RedisModuleString *str = (RedisModuleString*)value;
RedisModule_SaveSigned(rdb, 1);
RedisModule_SaveString(rdb, str);
+ RedisModule_SaveFloat(rdb, 1.5);
+ RedisModule_SaveLongDouble(rdb, 0.333333333333333333L);
}
void testrdb_aux_save(RedisModuleIO *rdb, int when) {
diff --git a/tests/support/test.tcl b/tests/support/test.tcl
index 2646acecd..5e8916236 100644
--- a/tests/support/test.tcl
+++ b/tests/support/test.tcl
@@ -11,28 +11,55 @@ proc fail {msg} {
proc assert {condition} {
if {![uplevel 1 [list expr $condition]]} {
- error "assertion:Expected condition '$condition' to be true ([uplevel 1 [list subst -nocommands $condition]])"
+ set context "(context: [info frame -1])"
+ error "assertion:Expected [uplevel 1 [list subst -nocommands $condition]] $context"
}
}
proc assert_no_match {pattern value} {
if {[string match $pattern $value]} {
- error "assertion:Expected '$value' to not match '$pattern'"
+ set context "(context: [info frame -1])"
+ error "assertion:Expected '$value' to not match '$pattern' $context"
}
}
proc assert_match {pattern value} {
if {![string match $pattern $value]} {
- error "assertion:Expected '$value' to match '$pattern'"
+ set context "(context: [info frame -1])"
+ error "assertion:Expected '$value' to match '$pattern' $context"
}
}
-proc assert_equal {expected value {detail ""}} {
+proc assert_equal {value expected {detail ""}} {
if {$expected ne $value} {
if {$detail ne ""} {
- set detail " (detail: $detail)"
+ set detail "(detail: $detail)"
+ } else {
+ set detail "(context: [info frame -1])"
+ }
+ error "assertion:Expected '$value' to be equal to '$expected' $detail"
+ }
+}
+
+proc assert_lessthan {value expected {detail ""}} {
+ if {!($value < $expected)} {
+ if {$detail ne ""} {
+ set detail "(detail: $detail)"
+ } else {
+ set detail "(context: [info frame -1])"
+ }
+ error "assertion:Expected '$value' to be lessthan to '$expected' $detail"
+ }
+}
+
+proc assert_range {value min max {detail ""}} {
+ if {!($value <= $max && $value >= $min)} {
+ if {$detail ne ""} {
+ set detail "(detail: $detail)"
+ } else {
+ set detail "(context: [info frame -1])"
}
- error "assertion:Expected '$value' to be equal to '$expected'$detail"
+ error "assertion:Expected '$value' to be between to '$min' and '$max' $detail"
}
}
diff --git a/tests/unit/moduleapi/blockonkeys.tcl b/tests/unit/moduleapi/blockonkeys.tcl
new file mode 100644
index 000000000..cb99ab1c9
--- /dev/null
+++ b/tests/unit/moduleapi/blockonkeys.tcl
@@ -0,0 +1,85 @@
+set testmodule [file normalize tests/modules/blockonkeys.so]
+
+start_server {tags {"modules"}} {
+ r module load $testmodule
+
+ test {Module client blocked on keys (no metadata): No block} {
+ r del k
+ r fsl.push k 33
+ r fsl.push k 34
+ r fsl.bpop2 k 0
+ } {34 33}
+
+ test {Module client blocked on keys (no metadata): Timeout} {
+ r del k
+ set rd [redis_deferring_client]
+ r fsl.push k 33
+ $rd fsl.bpop2 k 1
+ assert_equal {Request timedout} [$rd read]
+ }
+
+ test {Module client blocked on keys (no metadata): Blocked, case 1} {
+ r del k
+ set rd [redis_deferring_client]
+ r fsl.push k 33
+ $rd fsl.bpop2 k 0
+ r fsl.push k 34
+ assert_equal {34 33} [$rd read]
+ }
+
+ test {Module client blocked on keys (no metadata): Blocked, case 2} {
+ r del k
+ set rd [redis_deferring_client]
+ r fsl.push k 33
+ r fsl.push k 34
+ $rd fsl.bpop2 k 0
+ assert_equal {34 33} [$rd read]
+ }
+
+ test {Module client blocked on keys (with metadata): No block} {
+ r del k
+ r fsl.push k 34
+ r fsl.bpopgt k 30 0
+ } {34}
+
+ test {Module client blocked on keys (with metadata): Timeout} {
+ r del k
+ set rd [redis_deferring_client]
+ r fsl.push k 33
+ $rd fsl.bpopgt k 35 1
+ assert_equal {Request timedout} [$rd read]
+ }
+
+ test {Module client blocked on keys (with metadata): Blocked, case 1} {
+ r del k
+ set rd [redis_deferring_client]
+ r fsl.push k 33
+ $rd fsl.bpopgt k 33 0
+ r fsl.push k 34
+ assert_equal {34} [$rd read]
+ }
+
+ test {Module client blocked on keys (with metadata): Blocked, case 2} {
+ r del k
+ set rd [redis_deferring_client]
+ $rd fsl.bpopgt k 35 0
+ r fsl.push k 33
+ r fsl.push k 34
+ r fsl.push k 35
+ r fsl.push k 36
+ assert_equal {36} [$rd read]
+ }
+
+ test {Module client blocked on keys does not wake up on wrong type} {
+ r del k
+ set rd [redis_deferring_client]
+ $rd fsl.bpop2 k 0
+ r lpush k 12
+ r lpush k 13
+ r lpush k 14
+ r del k
+ r fsl.push k 33
+ r fsl.push k 34
+ assert_equal {34 33} [$rd read]
+ }
+}
diff --git a/tests/unit/moduleapi/datatype.tcl b/tests/unit/moduleapi/datatype.tcl
new file mode 100644
index 000000000..c1da696d3
--- /dev/null
+++ b/tests/unit/moduleapi/datatype.tcl
@@ -0,0 +1,27 @@
+set testmodule [file normalize tests/modules/datatype.so]
+
+start_server {tags {"modules"}} {
+ r module load $testmodule
+
+ test {DataType: Test module is sane, GET/SET work.} {
+ r datatype.set dtkey 100 stringval
+ assert {[r datatype.get dtkey] eq {100 stringval}}
+ }
+
+ test {DataType: RM_SaveDataTypeToString(), RM_LoadDataTypeFromString() work} {
+ r datatype.set dtkey -1111 MyString
+ set encoded [r datatype.dump dtkey]
+
+ r datatype.restore dtkeycopy $encoded
+ assert {[r datatype.get dtkeycopy] eq {-1111 MyString}}
+ }
+
+ test {DataType: Handle truncated RM_LoadDataTypeFromString()} {
+ r datatype.set dtkey -1111 MyString
+ set encoded [r datatype.dump dtkey]
+ set truncated [string range $encoded 0 end-1]
+
+ catch {r datatype.restore dtkeycopy $truncated} e
+ set e
+ } {*Invalid*}
+}
diff --git a/tests/unit/moduleapi/hooks.tcl b/tests/unit/moduleapi/hooks.tcl
index 7a727902d..cbca9c3eb 100644
--- a/tests/unit/moduleapi/hooks.tcl
+++ b/tests/unit/moduleapi/hooks.tcl
@@ -3,26 +3,138 @@ set testmodule [file normalize tests/modules/hooks.so]
tags "modules" {
start_server {} {
r module load $testmodule
+ r config set appendonly yes
+
test {Test clients connection / disconnection hooks} {
for {set j 0} {$j < 2} {incr j} {
set rd1 [redis_deferring_client]
$rd1 close
}
- assert {[r llen connected] > 1}
- assert {[r llen disconnected] > 1}
+ assert {[r hooks.event_count client-connected] > 1}
+ assert {[r hooks.event_count client-disconnected] > 1}
+ }
+
+ test {Test module cron hook} {
+ after 100
+ assert {[r hooks.event_count cron-loop] > 0}
+ set hz [r hooks.event_last cron-loop]
+ assert_equal $hz 10
+ }
+
+ test {Test module loaded / unloaded hooks} {
+ set othermodule [file normalize tests/modules/infotest.so]
+ r module load $othermodule
+ r module unload infotest
+ assert_equal [r hooks.event_last module-loaded] "infotest"
+ assert_equal [r hooks.event_last module-unloaded] "infotest"
+ }
+
+ test {Test module aofrw hook} {
+ r debug populate 1000 foo 10000 ;# 10mb worth of data
+ r config set rdbcompression no ;# rdb progress is only checked once in 2mb
+ r BGREWRITEAOF
+ waitForBgrewriteaof r
+ assert_equal [string match {*module-event-persistence-aof-start*} [exec tail -20 < [srv 0 stdout]]] 1
+ assert_equal [string match {*module-event-persistence-end*} [exec tail -20 < [srv 0 stdout]]] 1
+ }
+
+ test {Test module aof load and rdb/aof progress hooks} {
+ # create some aof tail (progress is checked only once in 1000 commands)
+ for {set j 0} {$j < 4000} {incr j} {
+ r set "bar$j" x
+ }
+ # set some configs that will cause many loading progress events during aof loading
+ r config set key-load-delay 1
+ r config set dynamic-hz no
+ r config set hz 500
+ r DEBUG LOADAOF
+ assert_equal [r hooks.event_last loading-aof-start] 0
+ assert_equal [r hooks.event_last loading-end] 0
+ assert {[r hooks.event_count loading-rdb-start] == 0}
+ assert {[r hooks.event_count loading-progress-rdb] >= 2} ;# comes from the preamble section
+ assert {[r hooks.event_count loading-progress-aof] >= 2}
+ }
+ # undo configs before next test
+ r config set dynamic-hz yes
+ r config set key-load-delay 0
+
+ test {Test module rdb save hook} {
+ # debug reload does: save, flush, load:
+ assert {[r hooks.event_count persistence-syncrdb-start] == 0}
+ assert {[r hooks.event_count loading-rdb-start] == 0}
+ r debug reload
+ assert {[r hooks.event_count persistence-syncrdb-start] == 1}
+ assert {[r hooks.event_count loading-rdb-start] == 1}
}
test {Test flushdb hooks} {
- r flushall ;# Note: only the "end" RPUSH will survive
- r select 1
- r flushdb
- r select 2
r flushdb
- r select 9
- assert {[r llen flush-start] == 2}
- assert {[r llen flush-end] == 3}
- assert {[r lrange flush-start 0 -1] eq {1 2}}
- assert {[r lrange flush-end 0 -1] eq {-1 1 2}}
+ assert_equal [r hooks.event_last flush-start] 9
+ assert_equal [r hooks.event_last flush-end] 9
+ r flushall
+ assert_equal [r hooks.event_last flush-start] -1
+ assert_equal [r hooks.event_last flush-end] -1
+ }
+
+ # replication related tests
+ set master [srv 0 client]
+ set master_host [srv 0 host]
+ set master_port [srv 0 port]
+ start_server {} {
+ r module load $testmodule
+ set replica [srv 0 client]
+ set replica_host [srv 0 host]
+ set replica_port [srv 0 port]
+ $replica replicaof $master_host $master_port
+
+ wait_for_condition 50 100 {
+ [string match {*master_link_status:up*} [r info replication]]
+ } else {
+ fail "Can't turn the instance into a replica"
+ }
+
+ test {Test master link up hook} {
+ assert_equal [r hooks.event_count masterlink-up] 1
+ assert_equal [r hooks.event_count masterlink-down] 0
+ }
+
+ test {Test role-replica hook} {
+ assert_equal [r hooks.event_count role-replica] 1
+ assert_equal [r hooks.event_count role-master] 0
+ assert_equal [r hooks.event_last role-replica] [s 0 master_host]
+ }
+
+ test {Test replica-online hook} {
+ assert_equal [r -1 hooks.event_count replica-online] 1
+ assert_equal [r -1 hooks.event_count replica-offline] 0
+ }
+
+ test {Test master link down hook} {
+ r client kill type master
+ assert_equal [r hooks.event_count masterlink-down] 1
+ }
+
+ $replica replicaof no one
+
+ test {Test role-master hook} {
+ assert_equal [r hooks.event_count role-replica] 1
+ assert_equal [r hooks.event_count role-master] 1
+ assert_equal [r hooks.event_last role-master] {}
+ }
+
+ test {Test replica-offline hook} {
+ assert_equal [r -1 hooks.event_count replica-online] 1
+ assert_equal [r -1 hooks.event_count replica-offline] 1
+ }
+ # get the replica stdout, to be used by the next test
+ set replica_stdout [srv 0 stdout]
}
+
+
+ # look into the log file of the server that just exited
+ test {Test shutdown hook} {
+ assert_equal [string match {*module-event-shutdown*} [exec tail -5 < $replica_stdout]] 1
+ }
+
}
}
diff --git a/tests/unit/moduleapi/misc.tcl b/tests/unit/moduleapi/misc.tcl
index d392aeab0..748016f1a 100644
--- a/tests/unit/moduleapi/misc.tcl
+++ b/tests/unit/moduleapi/misc.tcl
@@ -16,4 +16,55 @@ start_server {tags {"modules"}} {
assert { [string match "*cmdstat_module*" $info] }
}
+ test {test long double conversions} {
+ set ld [r test.ld_conversion]
+ assert {[string match $ld "0.00000000000000001"]}
+ }
+
+ test {test module db commands} {
+ r set x foo
+ set key [r test.randomkey]
+ assert_equal $key "x"
+ assert_equal [r test.dbsize] 1
+ r test.flushall
+ assert_equal [r test.dbsize] 0
+ }
+
+ test {test modle lru api} {
+ r config set maxmemory-policy allkeys-lru
+ r set x foo
+ set lru [r test.getlru x]
+ assert { $lru <= 1000 }
+ set was_set [r test.setlru x 100000]
+ assert { $was_set == 1 }
+ set idle [r object idletime x]
+ assert { $idle >= 100 }
+ set lru [r test.getlru x]
+ assert { $lru >= 100000 }
+ r config set maxmemory-policy allkeys-lfu
+ set lru [r test.getlru x]
+ assert { $lru == -1 }
+ set was_set [r test.setlru x 100000]
+ assert { $was_set == 0 }
+ }
+ r config set maxmemory-policy allkeys-lru
+
+ test {test modle lfu api} {
+ r config set maxmemory-policy allkeys-lfu
+ r set x foo
+ set lfu [r test.getlfu x]
+ assert { $lfu >= 1 }
+ set was_set [r test.setlfu x 100]
+ assert { $was_set == 1 }
+ set freq [r object freq x]
+ assert { $freq <= 100 }
+ set lfu [r test.getlfu x]
+ assert { $lfu <= 100 }
+ r config set maxmemory-policy allkeys-lru
+ set lfu [r test.getlfu x]
+ assert { $lfu == -1 }
+ set was_set [r test.setlfu x 100]
+ assert { $was_set == 0 }
+ }
+
}
diff --git a/tests/unit/moduleapi/scan.tcl b/tests/unit/moduleapi/scan.tcl
new file mode 100644
index 000000000..de1672e0a
--- /dev/null
+++ b/tests/unit/moduleapi/scan.tcl
@@ -0,0 +1,47 @@
+set testmodule [file normalize tests/modules/scan.so]
+
+start_server {tags {"modules"}} {
+ r module load $testmodule
+
+ test {Module scan keyspace} {
+ # the module create a scan command with filtering which also return values
+ r set x 1
+ r set y 2
+ r set z 3
+ r hset h f v
+ lsort [r scan.scan_strings]
+ } {{x 1} {y 2} {z 3}}
+
+ test {Module scan hash ziplist} {
+ r hmset hh f1 v1 f2 v2
+ lsort [r scan.scan_key hh]
+ } {{f1 v1} {f2 v2}}
+
+ test {Module scan hash dict} {
+ r config set hash-max-ziplist-entries 2
+ r hmset hh f3 v3
+ lsort [r scan.scan_key hh]
+ } {{f1 v1} {f2 v2} {f3 v3}}
+
+ test {Module scan zset ziplist} {
+ r zadd zz 1 f1 2 f2
+ lsort [r scan.scan_key zz]
+ } {{f1 1} {f2 2}}
+
+ test {Module scan zset dict} {
+ r config set zset-max-ziplist-entries 2
+ r zadd zz 3 f3
+ lsort [r scan.scan_key zz]
+ } {{f1 1} {f2 2} {f3 3}}
+
+ test {Module scan set intset} {
+ r sadd ss 1 2
+ lsort [r scan.scan_key ss]
+ } {{1 {}} {2 {}}}
+
+ test {Module scan set dict} {
+ r config set set-max-intset-entries 2
+ r sadd ss 3
+ lsort [r scan.scan_key ss]
+ } {{1 {}} {2 {}} {3 {}}}
+}
diff --git a/tests/unit/scripting.tcl b/tests/unit/scripting.tcl
index b3e1c48e6..2543a0377 100644
--- a/tests/unit/scripting.tcl
+++ b/tests/unit/scripting.tcl
@@ -536,7 +536,7 @@ foreach cmdrepl {0 1} {
start_server {tags {"scripting repl"}} {
start_server {} {
if {$cmdrepl == 1} {
- set rt "(commmands replication)"
+ set rt "(commands replication)"
} else {
set rt "(scripts replication)"
r debug lua-always-replicate-commands 1
diff --git a/tests/unit/type/stream.tcl b/tests/unit/type/stream.tcl
index a7415ae8d..656bac5de 100644
--- a/tests/unit/type/stream.tcl
+++ b/tests/unit/type/stream.tcl
@@ -79,6 +79,12 @@ start_server {
assert {[streamCompareID $id2 $id3] == -1}
}
+ test {XADD IDs correctly report an error when overflowing} {
+ r DEL mystream
+ r xadd mystream 18446744073709551615-18446744073709551615 a b
+ assert_error ERR* {r xadd mystream * c d}
+ }
+
test {XADD with MAXLEN option} {
r DEL mystream
for {set j 0} {$j < 1000} {incr j} {
@@ -117,6 +123,12 @@ start_server {
assert {[r xlen mystream] == $j}
}
+ test {XADD with ID 0-0} {
+ r DEL otherstream
+ catch {r XADD otherstream 0-0 k v} err
+ assert {[r EXISTS otherstream] == 0}
+ }
+
test {XRANGE COUNT works as expected} {
assert {[llength [r xrange mystream - + COUNT 10]] == 10}
}