From 412eb67d21705ca31162d614fb8e3a6a568ef5ed Mon Sep 17 00:00:00 2001 From: Yuan Zhou Date: Wed, 29 May 2019 14:21:47 +0800 Subject: aof: fix assignment for aof_fsync_offset Signed-off-by: Yuan Zhou --- src/aof.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/aof.c b/src/aof.c index 4744847d2..c8fb8e8f6 100644 --- a/src/aof.c +++ b/src/aof.c @@ -1768,7 +1768,7 @@ void backgroundRewriteDoneHandler(int exitcode, int bysignal) { server.aof_selected_db = -1; /* Make sure SELECT is re-issued */ aofUpdateCurrentSize(); server.aof_rewrite_base_size = server.aof_current_size; - server.aof_current_size = server.aof_current_size; + server.aof_fsync_offset = server.aof_current_size; /* Clear regular AOF buffer since its contents was just written to * the new AOF from the background rewrite buffer. */ -- cgit v1.2.1 From e978bdf9ef62404083afed28d98e7a455bcecd3b Mon Sep 17 00:00:00 2001 From: Oran Agra Date: Wed, 23 Oct 2019 11:53:15 +0300 Subject: Module API for controlling LRU and LFU, and OpenKey without TOUCH Some commands would want to open a key without touching it's LRU/LFU similarly to the OBJECT or DEBUG command do. Other commands may want to implement logic similar to what RESTORE does (and in the future MIGRATE) and get/set the LRU or LFU. --- src/db.c | 8 ++++++-- src/module.c | 39 +++++++++++++++++++++++++++++++++++++-- src/object.c | 5 ++++- src/redismodule.h | 8 ++++++++ src/server.h | 3 ++- 5 files changed, 57 insertions(+), 6 deletions(-) diff --git a/src/db.c b/src/db.c index 2c0a0cdd3..14f163a8f 100644 --- a/src/db.c +++ b/src/db.c @@ -151,9 +151,13 @@ robj *lookupKeyRead(redisDb *db, robj *key) { * * Returns the linked value object if the key exists or NULL if the key * does not exist in the specified DB. */ -robj *lookupKeyWrite(redisDb *db, robj *key) { +robj *lookupKeyWriteWithFlags(redisDb *db, robj *key, int flags) { expireIfNeeded(db,key); - return lookupKey(db,key,LOOKUP_NONE); + return lookupKey(db,key,flags); +} + +robj *lookupKeyWrite(redisDb *db, robj *key) { + return lookupKeyWriteWithFlags(db, key, LOOKUP_NONE); } robj *lookupKeyReadOrReply(client *c, robj *key, robj *reply) { diff --git a/src/module.c b/src/module.c index 2a1bda879..09cf6ac7a 100644 --- a/src/module.c +++ b/src/module.c @@ -1830,11 +1830,12 @@ int RM_SelectDb(RedisModuleCtx *ctx, int newid) { void *RM_OpenKey(RedisModuleCtx *ctx, robj *keyname, int mode) { RedisModuleKey *kp; robj *value; + int flags = mode & REDISMODULE_OPEN_KEY_NOTOUCH? LOOKUP_NOTOUCH: 0; if (mode & REDISMODULE_WRITE) { - value = lookupKeyWrite(ctx->client->db,keyname); + value = lookupKeyWriteWithFlags(ctx->client->db,keyname, flags); } else { - value = lookupKeyRead(ctx->client->db,keyname); + value = lookupKeyReadWithFlags(ctx->client->db,keyname, flags); if (value == NULL) { return NULL; } @@ -6397,6 +6398,38 @@ size_t moduleCount(void) { return dictSize(modules); } +/* Set the key LRU/LFU depending on server.maxmemory_policy. + * The lru_idle arg is idle time in seconds, and is only relevant if the + * eviction policy is LRU based. + * The lfu_freq arg is a logarithmic counter that provides an indication of + * the access frequencyonly (must be <= 255) and is only relevant if the + * eviction policy is LFU based. + * Either or both of them may be <0, in that case, nothing is set. */ +/* return value is an indication if the lru field was updated or not. */ +int RM_SetLRUOrLFU(RedisModuleKey *key, long long lfu_freq, long long lru_idle) { + if (!key->value) + return REDISMODULE_ERR; + if (objectSetLRUOrLFU(key->value, lfu_freq, lru_idle, lru_idle>=0 ? LRU_CLOCK() : 0)) + return REDISMODULE_OK; + return REDISMODULE_ERR; +} + +/* Gets the key LRU or LFU (depending on the current eviction policy). + * One will be set to the appropiate return value, and the other will be set to -1. + * see RedisModule_SetLRUOrLFU for units and ranges. + * return value is an indication of success. */ +int RM_GetLRUOrLFU(RedisModuleKey *key, long long *lfu_freq, long long *lru_idle) { + *lru_idle = *lfu_freq = -1; + if (!key->value) + return REDISMODULE_ERR; + if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) { + *lfu_freq = LFUDecrAndReturn(key->value); + } else { + *lru_idle = estimateObjectIdleTime(key->value)/1000; + } + return REDISMODULE_OK; +} + /* Register all the APIs we export. Keep this function at the end of the * file so that's easy to seek it to add new entries. */ void moduleRegisterCoreAPI(void) { @@ -6589,4 +6622,6 @@ void moduleRegisterCoreAPI(void) { REGISTER_API(InfoAddFieldULongLong); REGISTER_API(GetClientInfoById); REGISTER_API(SubscribeToServerEvent); + REGISTER_API(SetLRUOrLFU); + REGISTER_API(GetLRUOrLFU); } diff --git a/src/object.c b/src/object.c index 70022f897..47b21ae1f 100644 --- a/src/object.c +++ b/src/object.c @@ -1209,12 +1209,13 @@ sds getMemoryDoctorReport(void) { * The lru_idle and lru_clock args are only relevant if policy * is MAXMEMORY_FLAG_LRU. * Either or both of them may be <0, in that case, nothing is set. */ -void objectSetLRUOrLFU(robj *val, long long lfu_freq, long long lru_idle, +int objectSetLRUOrLFU(robj *val, long long lfu_freq, long long lru_idle, long long lru_clock) { if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) { if (lfu_freq >= 0) { serverAssert(lfu_freq <= 255); val->lru = (LFUGetTimeInMinutes()<<8) | lfu_freq; + return 1; } } else if (lru_idle >= 0) { /* Provided LRU idle time is in seconds. Scale @@ -1231,7 +1232,9 @@ void objectSetLRUOrLFU(robj *val, long long lfu_freq, long long lru_idle, if (lru_abs < 0) lru_abs = (lru_clock+(LRU_CLOCK_MAX/2)) % LRU_CLOCK_MAX; val->lru = lru_abs; + return 1; } + return 0; } /* ======================= The OBJECT and MEMORY commands =================== */ diff --git a/src/redismodule.h b/src/redismodule.h index 7053840b2..96e9fb4fa 100644 --- a/src/redismodule.h +++ b/src/redismodule.h @@ -18,6 +18,10 @@ #define REDISMODULE_READ (1<<0) #define REDISMODULE_WRITE (1<<1) +/* RedisModule_OpenKey extra flags for the 'mode' argument. + * Avoid touching the LRU/LFU of the key when opened. */ +#define REDISMODULE_OPEN_KEY_NOTOUCH (1<<16) + #define REDISMODULE_LIST_HEAD 0 #define REDISMODULE_LIST_TAIL 1 @@ -503,6 +507,8 @@ int REDISMODULE_API_FUNC(RedisModule_InfoAddFieldDouble)(RedisModuleInfoCtx *ctx int REDISMODULE_API_FUNC(RedisModule_InfoAddFieldLongLong)(RedisModuleInfoCtx *ctx, char *field, long long value); int REDISMODULE_API_FUNC(RedisModule_InfoAddFieldULongLong)(RedisModuleInfoCtx *ctx, char *field, unsigned long long value); int REDISMODULE_API_FUNC(RedisModule_SubscribeToServerEvent)(RedisModuleCtx *ctx, RedisModuleEvent event, RedisModuleEventCallback callback); +int REDISMODULE_API_FUNC(RedisModule_SetLRUOrLFU)(RedisModuleKey *key, long long lfu_freq, long long lru_idle); +int REDISMODULE_API_FUNC(RedisModule_GetLRUOrLFU)(RedisModuleKey *key, long long *lfu_freq, long long *lru_idle); /* Experimental APIs */ #ifdef REDISMODULE_EXPERIMENTAL_API @@ -703,6 +709,8 @@ static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int REDISMODULE_GET_API(InfoAddFieldULongLong); REDISMODULE_GET_API(GetClientInfoById); REDISMODULE_GET_API(SubscribeToServerEvent); + REDISMODULE_GET_API(SetLRUOrLFU); + REDISMODULE_GET_API(GetLRUOrLFU); #ifdef REDISMODULE_EXPERIMENTAL_API REDISMODULE_GET_API(GetThreadSafeContext); diff --git a/src/server.h b/src/server.h index 97672d727..30b25a918 100644 --- a/src/server.h +++ b/src/server.h @@ -2079,9 +2079,10 @@ robj *lookupKeyWrite(redisDb *db, robj *key); robj *lookupKeyReadOrReply(client *c, robj *key, robj *reply); robj *lookupKeyWriteOrReply(client *c, robj *key, robj *reply); robj *lookupKeyReadWithFlags(redisDb *db, robj *key, int flags); +robj *lookupKeyWriteWithFlags(redisDb *db, robj *key, int flags); robj *objectCommandLookup(client *c, robj *key); robj *objectCommandLookupOrReply(client *c, robj *key, robj *reply); -void objectSetLRUOrLFU(robj *val, long long lfu_freq, long long lru_idle, +int objectSetLRUOrLFU(robj *val, long long lfu_freq, long long lru_idle, long long lru_clock); #define LOOKUP_NONE 0 #define LOOKUP_NOTOUCH (1<<0) -- cgit v1.2.1 From a0cfd519e3cbbb4c432d25094e4cb202631f131f Mon Sep 17 00:00:00 2001 From: Oran Agra Date: Tue, 29 Oct 2019 17:29:06 +0200 Subject: test infra: improve prints on failed assertions sometimes we have several assertions with the same condition in the same test at different stages, and when these fail (the ones that print the condition text) you don't know which one it was. other assertions didn't print the condition text (variable names), just the expected and unexpected values. So now, all assertions print context line, and conditin text. besides, one of the major differences between 'assert' and 'assert_equal', is that the later is able to print the value that doesn't match the expected. if there is a rare non-reproducible failure, it is helpful to know what was the value the test encountered and how far it was from the threshold. So now, adding assert_lessthan and assert_range that can be used in some places. were we used just 'assert { a > b }' so far. --- tests/support/test.tcl | 39 +++++++++++++++++++++++++++++++++------ 1 file changed, 33 insertions(+), 6 deletions(-) diff --git a/tests/support/test.tcl b/tests/support/test.tcl index 2646acecd..5e8916236 100644 --- a/tests/support/test.tcl +++ b/tests/support/test.tcl @@ -11,28 +11,55 @@ proc fail {msg} { proc assert {condition} { if {![uplevel 1 [list expr $condition]]} { - error "assertion:Expected condition '$condition' to be true ([uplevel 1 [list subst -nocommands $condition]])" + set context "(context: [info frame -1])" + error "assertion:Expected [uplevel 1 [list subst -nocommands $condition]] $context" } } proc assert_no_match {pattern value} { if {[string match $pattern $value]} { - error "assertion:Expected '$value' to not match '$pattern'" + set context "(context: [info frame -1])" + error "assertion:Expected '$value' to not match '$pattern' $context" } } proc assert_match {pattern value} { if {![string match $pattern $value]} { - error "assertion:Expected '$value' to match '$pattern'" + set context "(context: [info frame -1])" + error "assertion:Expected '$value' to match '$pattern' $context" } } -proc assert_equal {expected value {detail ""}} { +proc assert_equal {value expected {detail ""}} { if {$expected ne $value} { if {$detail ne ""} { - set detail " (detail: $detail)" + set detail "(detail: $detail)" + } else { + set detail "(context: [info frame -1])" + } + error "assertion:Expected '$value' to be equal to '$expected' $detail" + } +} + +proc assert_lessthan {value expected {detail ""}} { + if {!($value < $expected)} { + if {$detail ne ""} { + set detail "(detail: $detail)" + } else { + set detail "(context: [info frame -1])" + } + error "assertion:Expected '$value' to be lessthan to '$expected' $detail" + } +} + +proc assert_range {value min max {detail ""}} { + if {!($value <= $max && $value >= $min)} { + if {$detail ne ""} { + set detail "(detail: $detail)" + } else { + set detail "(context: [info frame -1])" } - error "assertion:Expected '$value' to be equal to '$expected'$detail" + error "assertion:Expected '$value' to be between to '$min' and '$max' $detail" } } -- cgit v1.2.1 From 51c3ff8d75d9c784297ad587a31a37acd89499d8 Mon Sep 17 00:00:00 2001 From: Oran Agra Date: Tue, 29 Oct 2019 17:59:09 +0200 Subject: Modules hooks: complete missing hooks for the initial set of hooks * replication hooks: role change, master link status, replica online/offline * persistence hooks: saving, loading, loading progress * misc hooks: cron loop, shutdown, module loaded/unloaded * change the way hooks test work, and add tests for all of the above startLoading() now gets flag indicating what is loaded. stopLoading() now gets an indication of success or failure. adding startSaving() and stopSaving() with similar args and role. --- src/aof.c | 14 ++- src/debug.c | 2 +- src/module.c | 161 ++++++++++++++++++++++---- src/networking.c | 5 + src/rdb.c | 80 ++++++++++--- src/rdb.h | 11 +- src/redis-check-rdb.c | 6 +- src/redismodule.h | 86 ++++++++++++-- src/replication.c | 55 ++++++++- src/server.c | 11 +- src/server.h | 10 +- tests/modules/hooks.c | 256 +++++++++++++++++++++++++++++++++++++++-- tests/unit/moduleapi/hooks.tcl | 134 +++++++++++++++++++-- 13 files changed, 737 insertions(+), 94 deletions(-) diff --git a/src/aof.c b/src/aof.c index 0e3648ff0..dda9579e3 100644 --- a/src/aof.c +++ b/src/aof.c @@ -731,7 +731,7 @@ int loadAppendOnlyFile(char *filename) { server.aof_state = AOF_OFF; fakeClient = createAOFClient(); - startLoadingFile(fp, filename); + startLoadingFile(fp, filename, RDBFLAGS_AOF_PREAMBLE); /* Check if this AOF file has an RDB preamble. In that case we need to * load the RDB file and later continue loading the AOF tail. */ @@ -746,7 +746,7 @@ int loadAppendOnlyFile(char *filename) { serverLog(LL_NOTICE,"Reading RDB preamble from AOF file..."); if (fseek(fp,0,SEEK_SET) == -1) goto readerr; rioInitWithFile(&rdb,fp); - if (rdbLoadRio(&rdb,NULL,1) != C_OK) { + if (rdbLoadRio(&rdb,RDBFLAGS_AOF_PREAMBLE,NULL) != C_OK) { serverLog(LL_WARNING,"Error reading the RDB preamble of the AOF file, AOF loading aborted"); goto readerr; } else { @@ -767,6 +767,7 @@ int loadAppendOnlyFile(char *filename) { if (!(loops++ % 1000)) { loadingProgress(ftello(fp)); processEventsWhileBlocked(); + processModuleLoadingProgressEvent(1); } if (fgets(buf,sizeof(buf),fp) == NULL) { @@ -859,7 +860,7 @@ loaded_ok: /* DB loaded, cleanup and return C_OK to the caller. */ fclose(fp); freeFakeClient(fakeClient); server.aof_state = old_aof_state; - stopLoading(); + stopLoading(1); aofUpdateCurrentSize(); server.aof_rewrite_base_size = server.aof_current_size; server.aof_fsync_offset = server.aof_current_size; @@ -1400,9 +1401,11 @@ int rewriteAppendOnlyFile(char *filename) { if (server.aof_rewrite_incremental_fsync) rioSetAutoSync(&aof,REDIS_AUTOSYNC_BYTES); + startSaving(RDBFLAGS_AOF_PREAMBLE); + if (server.aof_use_rdb_preamble) { int error; - if (rdbSaveRio(&aof,&error,RDB_SAVE_AOF_PREAMBLE,NULL) == C_ERR) { + if (rdbSaveRio(&aof,&error,RDBFLAGS_AOF_PREAMBLE,NULL) == C_ERR) { errno = error; goto werr; } @@ -1465,15 +1468,18 @@ int rewriteAppendOnlyFile(char *filename) { if (rename(tmpfile,filename) == -1) { serverLog(LL_WARNING,"Error moving temp append only file on the final destination: %s", strerror(errno)); unlink(tmpfile); + stopSaving(0); return C_ERR; } serverLog(LL_NOTICE,"SYNC append only file rewrite performed"); + stopSaving(1); return C_OK; werr: serverLog(LL_WARNING,"Write error writing append only file on disk: %s", strerror(errno)); fclose(fp); unlink(tmpfile); + stopSaving(0); return C_ERR; } diff --git a/src/debug.c b/src/debug.c index 179f6d2c9..a2d37337d 100644 --- a/src/debug.c +++ b/src/debug.c @@ -417,7 +417,7 @@ NULL } emptyDb(-1,EMPTYDB_NO_FLAGS,NULL); protectClient(c); - int ret = rdbLoad(server.rdb_filename,NULL); + int ret = rdbLoad(server.rdb_filename,NULL,RDBFLAGS_NONE); unprotectClient(c); if (ret != C_OK) { addReplyError(c,"Error trying to load the RDB dump"); diff --git a/src/module.c b/src/module.c index 2a1bda879..57e388881 100644 --- a/src/module.c +++ b/src/module.c @@ -1620,6 +1620,27 @@ int modulePopulateClientInfoStructure(void *ci, client *client, int structver) { return REDISMODULE_OK; } +/* This is an helper for moduleFireServerEvent() and other functions: + * It populates the replication info structure with the appropriate + * fields depending on the version provided. If the version is not valid + * then REDISMODULE_ERR is returned. Otherwise the function returns + * REDISMODULE_OK and the structure pointed by 'ri' gets populated. */ +int modulePopulateReplicationInfoStructure(void *ri, int structver) { + if (structver != 1) return REDISMODULE_ERR; + + RedisModuleReplicationInfoV1 *ri1 = ri; + memset(ri1,0,sizeof(*ri1)); + ri1->version = structver; + ri1->master = server.masterhost==NULL; + ri1->masterhost = server.masterhost? server.masterhost: ""; + ri1->masterport = server.masterport; + ri1->replid1 = server.replid; + ri1->replid2 = server.replid2; + ri1->repl1_offset = server.master_repl_offset; + ri1->repl2_offset = server.second_replid_offset; + return REDISMODULE_OK; +} + /* Return information about the client with the specified ID (that was * previously obtained via the RedisModule_GetClientId() API). If the * client exists, REDISMODULE_OK is returned, otherwise REDISMODULE_ERR @@ -5780,8 +5801,8 @@ void ModuleForkDoneHandler(int exitcode, int bysignal) { * * The following sub events are available: * - * REDISMODULE_EVENT_REPLROLECHANGED_NOW_MASTER - * REDISMODULE_EVENT_REPLROLECHANGED_NOW_REPLICA + * REDISMODULE_SUBEVENT_REPLROLECHANGED_NOW_MASTER + * REDISMODULE_SUBEVENT_REPLROLECHANGED_NOW_REPLICA * * The 'data' field can be casted by the callback to a * RedisModuleReplicationInfo structure with the following fields: @@ -5791,24 +5812,30 @@ void ModuleForkDoneHandler(int exitcode, int bysignal) { * int masterport; // master instance port for NOW_REPLICA * char *replid1; // Main replication ID * char *replid2; // Secondary replication ID + * uint64_t repl1_offset; // Main replication offset * uint64_t repl2_offset; // Offset of replid2 validity - * uint64_t main_repl_offset; // Replication offset * * RedisModuleEvent_Persistence * * This event is called when RDB saving or AOF rewriting starts * and ends. The following sub events are available: * - * REDISMODULE_EVENT_LOADING_RDB_START // BGSAVE start - * REDISMODULE_EVENT_LOADING_RDB_END // BGSAVE end - * REDISMODULE_EVENT_LOADING_SYNC_RDB_START // SAVE start - * REDISMODULE_EVENT_LOADING_SYNC_RDB_START // SAVE end - * REDISMODULE_EVENT_LOADING_AOF_START // AOF rewrite start - * REDISMODULE_EVENT_LOADING_AOF_END // AOF rewrite end + * REDISMODULE_SUBEVENT_PERSISTENCE_RDB_START + * REDISMODULE_SUBEVENT_PERSISTENCE_AOF_START + * REDISMODULE_SUBEVENT_PERSISTENCE_SYNC_RDB_START + * REDISMODULE_SUBEVENT_PERSISTENCE_ENDED + * REDISMODULE_SUBEVENT_PERSISTENCE_FAILED * * The above events are triggered not just when the user calls the * relevant commands like BGSAVE, but also when a saving operation * or AOF rewriting occurs because of internal server triggers. + * The SYNC_RDB_START sub events are happening in the forground due to + * SAVE command, FLUSHALL, or server shutdown, and the other RDB and + * AOF sub events are executed in a background fork child, so any + * action the module takes can only affect the generated AOF or RDB, + * but will not be reflected in the parent process and affect connected + * clients and commands. Also note that the AOF_START sub event may end + * up saving RDB content in case of an AOF with rdb-preamble. * * RedisModuleEvent_FlushDB * @@ -5816,8 +5843,8 @@ void ModuleForkDoneHandler(int exitcode, int bysignal) { * because of replication, after the replica synchronization) * happened. The following sub events are available: * - * REDISMODULE_EVENT_FLUSHDB_START - * REDISMODULE_EVENT_FLUSHDB_END + * REDISMODULE_SUBEVENT_FLUSHDB_START + * REDISMODULE_SUBEVENT_FLUSHDB_END * * The data pointer can be casted to a RedisModuleFlushInfo * structure with the following fields: @@ -5841,12 +5868,15 @@ void ModuleForkDoneHandler(int exitcode, int bysignal) { * replica is loading the RDB file from the master. * The following sub events are available: * - * REDISMODULE_EVENT_LOADING_RDB_START - * REDISMODULE_EVENT_LOADING_RDB_END - * REDISMODULE_EVENT_LOADING_MASTER_RDB_START - * REDISMODULE_EVENT_LOADING_MASTER_RDB_END - * REDISMODULE_EVENT_LOADING_AOF_START - * REDISMODULE_EVENT_LOADING_AOF_END + * REDISMODULE_SUBEVENT_LOADING_RDB_START + * REDISMODULE_SUBEVENT_LOADING_AOF_START + * REDISMODULE_SUBEVENT_LOADING_REPL_START + * REDISMODULE_SUBEVENT_LOADING_ENDED + * REDISMODULE_SUBEVENT_LOADING_FAILED + * + * Note that AOF loading may start with an RDB data in case of + * rdb-preamble, in which case you'll only recieve an AOF_START event. + * * * RedisModuleEvent_ClientChange * @@ -5855,8 +5885,8 @@ void ModuleForkDoneHandler(int exitcode, int bysignal) { * structure, documented in RedisModule_GetClientInfoById(). * The following sub events are available: * - * REDISMODULE_EVENT_CLIENT_CHANGE_CONNECTED - * REDISMODULE_EVENT_CLIENT_CHANGE_DISCONNECTED + * REDISMODULE_SUBEVENT_CLIENT_CHANGE_CONNECTED + * REDISMODULE_SUBEVENT_CLIENT_CHANGE_DISCONNECTED * * RedisModuleEvent_Shutdown * @@ -5869,8 +5899,8 @@ void ModuleForkDoneHandler(int exitcode, int bysignal) { * replica since it gets disconnected. * The following sub events are availble: * - * REDISMODULE_EVENT_REPLICA_CHANGE_ONLINE - * REDISMODULE_EVENT_REPLICA_CHANGE_OFFLINE + * REDISMODULE_SUBEVENT_REPLICA_CHANGE_ONLINE + * REDISMODULE_SUBEVENT_REPLICA_CHANGE_OFFLINE * * No additional information is available so far: future versions * of Redis will have an API in order to enumerate the replicas @@ -5885,6 +5915,11 @@ void ModuleForkDoneHandler(int exitcode, int bysignal) { * this changes depending on the "hz" configuration. * No sub events are available. * + * The data pointer can be casted to a RedisModuleCronLoop + * structure with the following fields: + * + * int32_t hz; // Approximate number of events per second. + * * RedisModuleEvent_MasterLinkChange * * This is called for replicas in order to notify when the @@ -5894,8 +5929,38 @@ void ModuleForkDoneHandler(int exitcode, int bysignal) { * replication is happening correctly. * The following sub events are available: * - * REDISMODULE_EVENT_MASTER_LINK_UP - * REDISMODULE_EVENT_MASTER_LINK_DOWN + * REDISMODULE_SUBEVENT_MASTER_LINK_UP + * REDISMODULE_SUBEVENT_MASTER_LINK_DOWN + * + * RedisModuleEvent_ModuleChange + * + * This event is called when a new module is loaded or one is unloaded. + * The following sub events are availble: + * + * REDISMODULE_SUBEVENT_MODULE_LOADED + * REDISMODULE_SUBEVENT_MODULE_UNLOADED + * + * The data pointer can be casted to a RedisModuleModuleChange + * structure with the following fields: + * + * const char* module_name; // Name of module loaded or unloaded. + * int32_t module_version; // Module version. + * + * RedisModuleEvent_LoadingProgress + * + * This event is called repeatedly called while an RDB or AOF file + * is being loaded. + * The following sub events are availble: + * + * REDISMODULE_SUBEVENT_LOADING_PROGRESS_RDB + * REDISMODULE_SUBEVENT_LOADING_PROGRESS_AOF + * + * The data pointer can be casted to a RedisModuleLoadingProgress + * structure with the following fields: + * + * int32_t hz; // Approximate number of events per second. + * int32_t progress; // Approximate progress between 0 and 1024, + * or -1 if unknown. * * The function returns REDISMODULE_OK if the module was successfully subscrived * for the specified event. If the API is called from a wrong context then @@ -5954,7 +6019,7 @@ void moduleFireServerEvent(uint64_t eid, int subid, void *data) { listRewind(RedisModule_EventListeners,&li); while((ln = listNext(&li))) { RedisModuleEventListener *el = ln->value; - if (el->event.id == eid && !el->module->in_hook) { + if (el->event.id == eid) { RedisModuleCtx ctx = REDISMODULE_CTX_INIT; ctx.module = el->module; @@ -5968,6 +6033,8 @@ void moduleFireServerEvent(uint64_t eid, int subid, void *data) { void *moduledata = NULL; RedisModuleClientInfoV1 civ1; + RedisModuleReplicationInfoV1 riv1; + RedisModuleModuleChangeV1 mcv1; /* Start at DB zero by default when calling the handler. It's * up to the specific event setup to change it when it makes * sense. For instance for FLUSHDB events we select the correct @@ -5979,11 +6046,26 @@ void moduleFireServerEvent(uint64_t eid, int subid, void *data) { modulePopulateClientInfoStructure(&civ1,data, el->event.dataver); moduledata = &civ1; + } else if (eid == REDISMODULE_EVENT_REPLICATION_ROLE_CHANGED) { + modulePopulateReplicationInfoStructure(&riv1,el->event.dataver); + moduledata = &riv1; } else if (eid == REDISMODULE_EVENT_FLUSHDB) { moduledata = data; RedisModuleFlushInfoV1 *fi = data; if (fi->dbnum != -1) selectDb(ctx.client, fi->dbnum); + } else if (eid == REDISMODULE_EVENT_MODULE_CHANGE) { + RedisModule *m = data; + if (m == el->module) + continue; + mcv1.version = REDISMODULE_MODULE_CHANGE_VERSION; + mcv1.module_name = m->name; + mcv1.module_version = m->ver; + moduledata = &mcv1; + } else if (eid == REDISMODULE_EVENT_LOADING_PROGRESS) { + moduledata = data; + } else if (eid == REDISMODULE_EVENT_CRON_LOOP) { + moduledata = data; } ModulesInHooks++; @@ -6015,6 +6097,27 @@ void moduleUnsubscribeAllServerEvents(RedisModule *module) { } } +void processModuleLoadingProgressEvent(int is_aof) { + long long now = ustime(); + static long long next_event = 0; + if (now >= next_event) { + /* Fire the loading progress modules end event. */ + int progress = -1; + if (server.loading_total_bytes) + progress = (server.loading_total_bytes<<10) / server.loading_total_bytes; + RedisModuleFlushInfoV1 fi = {REDISMODULE_LOADING_PROGRESS_VERSION, + server.hz, + progress}; + moduleFireServerEvent(REDISMODULE_EVENT_LOADING_PROGRESS, + is_aof? + REDISMODULE_SUBEVENT_LOADING_PROGRESS_AOF: + REDISMODULE_SUBEVENT_LOADING_PROGRESS_RDB, + &fi); + /* decide when the next event should fire. */ + next_event = now + 1000000 / server.hz; + } +} + /* -------------------------------------------------------------------------- * Modules API internals * -------------------------------------------------------------------------- */ @@ -6183,6 +6286,11 @@ int moduleLoad(const char *path, void **module_argv, int module_argc) { ctx.module->blocked_clients = 0; ctx.module->handle = handle; serverLog(LL_NOTICE,"Module '%s' loaded from %s",ctx.module->name,path); + /* Fire the loaded modules event. */ + moduleFireServerEvent(REDISMODULE_EVENT_MODULE_CHANGE, + REDISMODULE_SUBEVENT_MODULE_LOADED, + ctx.module); + moduleFreeContext(&ctx); return C_OK; } @@ -6245,6 +6353,11 @@ int moduleUnload(sds name) { module->name, error); } + /* Fire the unloaded modules event. */ + moduleFireServerEvent(REDISMODULE_EVENT_MODULE_CHANGE, + REDISMODULE_SUBEVENT_MODULE_UNLOADED, + module); + /* Remove from list of modules. */ serverLog(LL_NOTICE,"Module %s unloaded",module->name); dictDelete(modules,module->name); diff --git a/src/networking.c b/src/networking.c index e7cc561fa..9336c177c 100644 --- a/src/networking.c +++ b/src/networking.c @@ -1118,6 +1118,11 @@ void freeClient(client *c) { if (c->flags & CLIENT_SLAVE && listLength(server.slaves) == 0) server.repl_no_slaves_since = server.unixtime; refreshGoodSlavesCount(); + /* Fire the replica change modules event. */ + if (c->replstate == SLAVE_STATE_ONLINE) + moduleFireServerEvent(REDISMODULE_EVENT_REPLICA_CHANGE, + REDISMODULE_SUBEVENT_REPLICA_CHANGE_OFFLINE, + NULL); } /* Master/slave cleanup Case 2: diff --git a/src/rdb.c b/src/rdb.c index f530219a4..b569edfea 100644 --- a/src/rdb.c +++ b/src/rdb.c @@ -1080,9 +1080,9 @@ ssize_t rdbSaveAuxFieldStrInt(rio *rdb, char *key, long long val) { } /* Save a few default AUX fields with information about the RDB generated. */ -int rdbSaveInfoAuxFields(rio *rdb, int flags, rdbSaveInfo *rsi) { +int rdbSaveInfoAuxFields(rio *rdb, int rdbflags, rdbSaveInfo *rsi) { int redis_bits = (sizeof(void*) == 8) ? 64 : 32; - int aof_preamble = (flags & RDB_SAVE_AOF_PREAMBLE) != 0; + int aof_preamble = (rdbflags & RDBFLAGS_AOF_PREAMBLE) != 0; /* Add a few fields about the state when the RDB was created. */ if (rdbSaveAuxFieldStrStr(rdb,"redis-ver",REDIS_VERSION) == -1) return -1; @@ -1150,7 +1150,7 @@ ssize_t rdbSaveSingleModuleAux(rio *rdb, int when, moduleType *mt) { * When the function returns C_ERR and if 'error' is not NULL, the * integer pointed by 'error' is set to the value of errno just after the I/O * error. */ -int rdbSaveRio(rio *rdb, int *error, int flags, rdbSaveInfo *rsi) { +int rdbSaveRio(rio *rdb, int *error, int rdbflags, rdbSaveInfo *rsi) { dictIterator *di = NULL; dictEntry *de; char magic[10]; @@ -1162,7 +1162,7 @@ int rdbSaveRio(rio *rdb, int *error, int flags, rdbSaveInfo *rsi) { rdb->update_cksum = rioGenericUpdateChecksum; snprintf(magic,sizeof(magic),"REDIS%04d",RDB_VERSION); if (rdbWriteRaw(rdb,magic,9) == -1) goto werr; - if (rdbSaveInfoAuxFields(rdb,flags,rsi) == -1) goto werr; + if (rdbSaveInfoAuxFields(rdb,rdbflags,rsi) == -1) goto werr; if (rdbSaveModulesAux(rdb, REDISMODULE_AUX_BEFORE_RDB) == -1) goto werr; for (j = 0; j < server.dbnum; j++) { @@ -1199,7 +1199,7 @@ int rdbSaveRio(rio *rdb, int *error, int flags, rdbSaveInfo *rsi) { /* When this RDB is produced as part of an AOF rewrite, move * accumulated diff from parent to child while rewriting in * order to have a smaller final write. */ - if (flags & RDB_SAVE_AOF_PREAMBLE && + if (rdbflags & RDBFLAGS_AOF_PREAMBLE && rdb->processed_bytes > processed+AOF_READ_DIFF_INTERVAL_BYTES) { processed = rdb->processed_bytes; @@ -1254,18 +1254,21 @@ werr: int rdbSaveRioWithEOFMark(rio *rdb, int *error, rdbSaveInfo *rsi) { char eofmark[RDB_EOF_MARK_SIZE]; + startSaving(RDBFLAGS_REPLICATION); getRandomHexChars(eofmark,RDB_EOF_MARK_SIZE); if (error) *error = 0; if (rioWrite(rdb,"$EOF:",5) == 0) goto werr; if (rioWrite(rdb,eofmark,RDB_EOF_MARK_SIZE) == 0) goto werr; if (rioWrite(rdb,"\r\n",2) == 0) goto werr; - if (rdbSaveRio(rdb,error,RDB_SAVE_NONE,rsi) == C_ERR) goto werr; + if (rdbSaveRio(rdb,error,RDBFLAGS_NONE,rsi) == C_ERR) goto werr; if (rioWrite(rdb,eofmark,RDB_EOF_MARK_SIZE) == 0) goto werr; + stopSaving(1); return C_OK; werr: /* Write error. */ /* Set 'error' only if not already set by rdbSaveRio() call. */ if (error && *error == 0) *error = errno; + stopSaving(0); return C_ERR; } @@ -1291,11 +1294,12 @@ int rdbSave(char *filename, rdbSaveInfo *rsi) { } rioInitWithFile(&rdb,fp); + startSaving(RDBFLAGS_NONE); if (server.rdb_save_incremental_fsync) rioSetAutoSync(&rdb,REDIS_AUTOSYNC_BYTES); - if (rdbSaveRio(&rdb,&error,RDB_SAVE_NONE,rsi) == C_ERR) { + if (rdbSaveRio(&rdb,&error,RDBFLAGS_NONE,rsi) == C_ERR) { errno = error; goto werr; } @@ -1317,6 +1321,7 @@ int rdbSave(char *filename, rdbSaveInfo *rsi) { cwdp ? cwdp : "unknown", strerror(errno)); unlink(tmpfile); + stopSaving(0); return C_ERR; } @@ -1324,12 +1329,14 @@ int rdbSave(char *filename, rdbSaveInfo *rsi) { server.dirty = 0; server.lastsave = time(NULL); server.lastbgsave_status = C_OK; + stopSaving(1); return C_OK; werr: serverLog(LL_WARNING,"Write error saving DB on disk: %s", strerror(errno)); fclose(fp); unlink(tmpfile); + stopSaving(0); return C_ERR; } @@ -1918,23 +1925,33 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, robj *key) { /* Mark that we are loading in the global state and setup the fields * needed to provide loading stats. */ -void startLoading(size_t size) { +void startLoading(size_t size, int rdbflags) { /* Load the DB */ server.loading = 1; server.loading_start_time = time(NULL); server.loading_loaded_bytes = 0; server.loading_total_bytes = size; + + /* Fire the loading modules start event. */ + int subevent; + if (rdbflags & RDBFLAGS_AOF_PREAMBLE) + subevent = REDISMODULE_SUBEVENT_LOADING_AOF_START; + else if(rdbflags & RDBFLAGS_REPLICATION) + subevent = REDISMODULE_SUBEVENT_LOADING_REPL_START; + else + subevent = REDISMODULE_SUBEVENT_LOADING_RDB_START; + moduleFireServerEvent(REDISMODULE_EVENT_LOADING,subevent,NULL); } /* Mark that we are loading in the global state and setup the fields * needed to provide loading stats. * 'filename' is optional and used for rdb-check on error */ -void startLoadingFile(FILE *fp, char* filename) { +void startLoadingFile(FILE *fp, char* filename, int rdbflags) { struct stat sb; if (fstat(fileno(fp), &sb) == -1) sb.st_size = 0; rdbFileBeingLoaded = filename; - startLoading(sb.st_size); + startLoading(sb.st_size, rdbflags); } /* Refresh the loading progress info */ @@ -1945,9 +1962,37 @@ void loadingProgress(off_t pos) { } /* Loading finished */ -void stopLoading(void) { +void stopLoading(int success) { server.loading = 0; rdbFileBeingLoaded = NULL; + + /* Fire the loading modules end event. */ + moduleFireServerEvent(REDISMODULE_EVENT_LOADING, + success? + REDISMODULE_SUBEVENT_LOADING_ENDED: + REDISMODULE_SUBEVENT_LOADING_FAILED, + NULL); +} + +void startSaving(int rdbflags) { + /* Fire the persistence modules end event. */ + int subevent; + if (rdbflags & RDBFLAGS_AOF_PREAMBLE) + subevent = REDISMODULE_SUBEVENT_PERSISTENCE_AOF_START; + else if (getpid()!=server.pid) + subevent = REDISMODULE_SUBEVENT_PERSISTENCE_RDB_START; + else + subevent = REDISMODULE_SUBEVENT_PERSISTENCE_SYNC_RDB_START; + moduleFireServerEvent(REDISMODULE_EVENT_PERSISTENCE,subevent,NULL); +} + +void stopSaving(int success) { + /* Fire the persistence modules end event. */ + moduleFireServerEvent(REDISMODULE_EVENT_PERSISTENCE, + success? + REDISMODULE_SUBEVENT_PERSISTENCE_ENDED: + REDISMODULE_SUBEVENT_PERSISTENCE_FAILED, + NULL); } /* Track loading progress in order to serve client's from time to time @@ -1966,12 +2011,13 @@ void rdbLoadProgressCallback(rio *r, const void *buf, size_t len) { replicationSendNewlineToMaster(); loadingProgress(r->processed_bytes); processEventsWhileBlocked(); + processModuleLoadingProgressEvent(0); } } /* Load an RDB file from the rio stream 'rdb'. On success C_OK is returned, * otherwise C_ERR is returned and 'errno' is set accordingly. */ -int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi, int loading_aof) { +int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) { uint64_t dbid; int type, rdbver; redisDb *db = server.db+0; @@ -2182,7 +2228,7 @@ int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi, int loading_aof) { * received from the master. In the latter case, the master is * responsible for key expiry. If we would expire keys here, the * snapshot taken by the master may not be reflected on the slave. */ - if (server.masterhost == NULL && !loading_aof && expiretime != -1 && expiretime < now) { + if (server.masterhost == NULL && !(rdbflags&RDBFLAGS_AOF_PREAMBLE) && expiretime != -1 && expiretime < now) { decrRefCount(key); decrRefCount(val); } else { @@ -2243,17 +2289,17 @@ eoferr: * * If you pass an 'rsi' structure initialied with RDB_SAVE_OPTION_INIT, the * loading code will fiil the information fields in the structure. */ -int rdbLoad(char *filename, rdbSaveInfo *rsi) { +int rdbLoad(char *filename, rdbSaveInfo *rsi, int rdbflags) { FILE *fp; rio rdb; int retval; if ((fp = fopen(filename,"r")) == NULL) return C_ERR; - startLoadingFile(fp, filename); + startLoadingFile(fp, filename,rdbflags); rioInitWithFile(&rdb,fp); - retval = rdbLoadRio(&rdb,rsi,0); + retval = rdbLoadRio(&rdb,rdbflags,rsi); fclose(fp); - stopLoading(); + stopLoading(retval==C_OK); return retval; } diff --git a/src/rdb.h b/src/rdb.h index 40a50f7ba..4229beea8 100644 --- a/src/rdb.h +++ b/src/rdb.h @@ -121,8 +121,10 @@ #define RDB_LOAD_PLAIN (1<<1) #define RDB_LOAD_SDS (1<<2) -#define RDB_SAVE_NONE 0 -#define RDB_SAVE_AOF_PREAMBLE (1<<0) +/* flags on the purpose of rdb save or load */ +#define RDBFLAGS_NONE 0 +#define RDBFLAGS_AOF_PREAMBLE (1<<0) +#define RDBFLAGS_REPLICATION (1<<1) int rdbSaveType(rio *rdb, unsigned char type); int rdbLoadType(rio *rdb); @@ -135,7 +137,7 @@ uint64_t rdbLoadLen(rio *rdb, int *isencoded); int rdbLoadLenByRef(rio *rdb, int *isencoded, uint64_t *lenptr); int rdbSaveObjectType(rio *rdb, robj *o); int rdbLoadObjectType(rio *rdb); -int rdbLoad(char *filename, rdbSaveInfo *rsi); +int rdbLoad(char *filename, rdbSaveInfo *rsi, int rdbflags); int rdbSaveBackground(char *filename, rdbSaveInfo *rsi); int rdbSaveToSlavesSockets(rdbSaveInfo *rsi); void rdbRemoveTempFile(pid_t childpid); @@ -154,7 +156,8 @@ int rdbSaveBinaryDoubleValue(rio *rdb, double val); int rdbLoadBinaryDoubleValue(rio *rdb, double *val); int rdbSaveBinaryFloatValue(rio *rdb, float val); int rdbLoadBinaryFloatValue(rio *rdb, float *val); -int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi, int loading_aof); +int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi); +int rdbSaveRio(rio *rdb, int *error, int rdbflags, rdbSaveInfo *rsi); rdbSaveInfo *rdbPopulateSaveInfo(rdbSaveInfo *rsi); #endif diff --git a/src/redis-check-rdb.c b/src/redis-check-rdb.c index 5e7415046..1210d49b4 100644 --- a/src/redis-check-rdb.c +++ b/src/redis-check-rdb.c @@ -202,7 +202,7 @@ int redis_check_rdb(char *rdbfilename, FILE *fp) { } expiretime = -1; - startLoadingFile(fp, rdbfilename); + startLoadingFile(fp, rdbfilename, RDBFLAGS_NONE); while(1) { robj *key, *val; @@ -316,7 +316,7 @@ int redis_check_rdb(char *rdbfilename, FILE *fp) { } if (closefile) fclose(fp); - stopLoading(); + stopLoading(1); return 0; eoferr: /* unexpected end of file is handled here with a fatal exit */ @@ -327,7 +327,7 @@ eoferr: /* unexpected end of file is handled here with a fatal exit */ } err: if (closefile) fclose(fp); - stopLoading(); + stopLoading(0); return 1; } diff --git a/src/redismodule.h b/src/redismodule.h index 7053840b2..7ae1ad692 100644 --- a/src/redismodule.h +++ b/src/redismodule.h @@ -181,6 +181,8 @@ typedef uint64_t RedisModuleTimerID; #define REDISMODULE_EVENT_REPLICA_CHANGE 6 #define REDISMODULE_EVENT_MASTER_LINK_CHANGE 7 #define REDISMODULE_EVENT_CRON_LOOP 8 +#define REDISMODULE_EVENT_MODULE_CHANGE 9 +#define REDISMODULE_EVENT_LOADING_PROGRESS 10 typedef struct RedisModuleEvent { uint64_t id; /* REDISMODULE_EVENT_... defines. */ @@ -226,18 +228,28 @@ static const RedisModuleEvent RedisModuleEvent_MasterLinkChange = { REDISMODULE_EVENT_MASTER_LINK_CHANGE, 1 + }, + RedisModuleEvent_ModuleChange = { + REDISMODULE_EVENT_MODULE_CHANGE, + 1 + }, + RedisModuleEvent_LoadingProgress = { + REDISMODULE_EVENT_LOADING_PROGRESS, + 1 }; /* Those are values that are used for the 'subevent' callback argument. */ #define REDISMODULE_SUBEVENT_PERSISTENCE_RDB_START 0 -#define REDISMODULE_SUBEVENT_PERSISTENCE_RDB_END 1 -#define REDISMODULE_SUBEVENT_PERSISTENCE_AOF_START 2 -#define REDISMODULE_SUBEVENT_PERSISTENCE_AOF_END 3 +#define REDISMODULE_SUBEVENT_PERSISTENCE_AOF_START 1 +#define REDISMODULE_SUBEVENT_PERSISTENCE_SYNC_RDB_START 2 +#define REDISMODULE_SUBEVENT_PERSISTENCE_ENDED 3 +#define REDISMODULE_SUBEVENT_PERSISTENCE_FAILED 4 #define REDISMODULE_SUBEVENT_LOADING_RDB_START 0 -#define REDISMODULE_SUBEVENT_LOADING_RDB_END 1 -#define REDISMODULE_SUBEVENT_LOADING_AOF_START 2 -#define REDISMODULE_SUBEVENT_LOADING_AOF_END 3 +#define REDISMODULE_SUBEVENT_LOADING_AOF_START 1 +#define REDISMODULE_SUBEVENT_LOADING_REPL_START 2 +#define REDISMODULE_SUBEVENT_LOADING_ENDED 3 +#define REDISMODULE_SUBEVENT_LOADING_FAILED 4 #define REDISMODULE_SUBEVENT_CLIENT_CHANGE_CONNECTED 0 #define REDISMODULE_SUBEVENT_CLIENT_CHANGE_DISCONNECTED 1 @@ -245,12 +257,21 @@ static const RedisModuleEvent #define REDISMODULE_SUBEVENT_MASTER_LINK_UP 0 #define REDISMODULE_SUBEVENT_MASTER_LINK_DOWN 1 -#define REDISMODULE_SUBEVENT_REPLICA_CHANGE_CONNECTED 0 -#define REDISMODULE_SUBEVENT_REPLICA_CHANGE_DISCONNECTED 1 +#define REDISMODULE_SUBEVENT_REPLICA_CHANGE_ONLINE 0 +#define REDISMODULE_SUBEVENT_REPLICA_CHANGE_OFFLINE 1 + +#define REDISMODULE_EVENT_REPLROLECHANGED_NOW_MASTER 0 +#define REDISMODULE_EVENT_REPLROLECHANGED_NOW_REPLICA 1 #define REDISMODULE_SUBEVENT_FLUSHDB_START 0 #define REDISMODULE_SUBEVENT_FLUSHDB_END 1 +#define REDISMODULE_SUBEVENT_MODULE_LOADED 0 +#define REDISMODULE_SUBEVENT_MODULE_UNLOADED 1 + +#define REDISMODULE_SUBEVENT_LOADING_PROGRESS_RDB 0 +#define REDISMODULE_SUBEVENT_LOADING_PROGRESS_AOF 1 + /* RedisModuleClientInfo flags. */ #define REDISMODULE_CLIENTINFO_FLAG_SSL (1<<0) #define REDISMODULE_CLIENTINFO_FLAG_PUBSUB (1<<1) @@ -286,6 +307,22 @@ typedef struct RedisModuleClientInfo { #define RedisModuleClientInfo RedisModuleClientInfoV1 +#define REDISMODULE_REPLICATIONINFO_VERSION 1 +typedef struct RedisModuleReplicationInfo { + uint64_t version; /* Not used since this structure is never passed + from the module to the core right now. Here + for future compatibility. */ + int master; /* true if master, false if replica */ + char *masterhost; /* master instance hostname for NOW_REPLICA */ + int masterport; /* master instance port for NOW_REPLICA */ + char *replid1; /* Main replication ID */ + char *replid2; /* Secondary replication ID */ + uint64_t repl1_offset; /* Main replication offset */ + uint64_t repl2_offset; /* Offset of replid2 validity */ +} RedisModuleReplicationInfoV1; + +#define RedisModuleReplicationInfo RedisModuleReplicationInfoV1 + #define REDISMODULE_FLUSHINFO_VERSION 1 typedef struct RedisModuleFlushInfo { uint64_t version; /* Not used since this structure is never passed @@ -297,6 +334,39 @@ typedef struct RedisModuleFlushInfo { #define RedisModuleFlushInfo RedisModuleFlushInfoV1 +#define REDISMODULE_MODULE_CHANGE_VERSION 1 +typedef struct RedisModuleModuleChange { + uint64_t version; /* Not used since this structure is never passed + from the module to the core right now. Here + for future compatibility. */ + const char* module_name;/* Name of module loaded or unloaded. */ + int32_t module_version; /* Module version. */ +} RedisModuleModuleChangeV1; + +#define RedisModuleModuleChange RedisModuleModuleChangeV1 + +#define REDISMODULE_CRON_LOOP_VERSION 1 +typedef struct RedisModuleCronLoopInfo { + uint64_t version; /* Not used since this structure is never passed + from the module to the core right now. Here + for future compatibility. */ + int32_t hz; /* Approximate number of events per second. */ +} RedisModuleCronLoopV1; + +#define RedisModuleCronLoop RedisModuleCronLoopV1 + +#define REDISMODULE_LOADING_PROGRESS_VERSION 1 +typedef struct RedisModuleLoadingProgressInfo { + uint64_t version; /* Not used since this structure is never passed + from the module to the core right now. Here + for future compatibility. */ + int32_t hz; /* Approximate number of events per second. */ + int32_t progress; /* Approximate progress between 0 and 1024, or -1 + * if unknown. */ +} RedisModuleLoadingProgressV1; + +#define RedisModuleLoadingProgress RedisModuleLoadingProgressV1 + /* ------------------------- End of common defines ------------------------ */ #ifndef REDISMODULE_CORE diff --git a/src/replication.c b/src/replication.c index 4550e6a83..c9a2e0fe1 100644 --- a/src/replication.c +++ b/src/replication.c @@ -533,6 +533,12 @@ int masterTryPartialResynchronization(client *c) { * has this state from the previous connection with the master. */ refreshGoodSlavesCount(); + + /* Fire the replica change modules event. */ + moduleFireServerEvent(REDISMODULE_EVENT_REPLICA_CHANGE, + REDISMODULE_SUBEVENT_REPLICA_CHANGE_ONLINE, + NULL); + return C_OK; /* The caller can return, no full resync needed. */ need_full_resync: @@ -868,6 +874,10 @@ void putSlaveOnline(client *slave) { return; } refreshGoodSlavesCount(); + /* Fire the replica change modules event. */ + moduleFireServerEvent(REDISMODULE_EVENT_REPLICA_CHANGE, + REDISMODULE_SUBEVENT_REPLICA_CHANGE_ONLINE, + NULL); serverLog(LL_NOTICE,"Synchronization with replica %s succeeded", replicationGetSlaveName(slave)); } @@ -1542,11 +1552,11 @@ void readSyncBulkPayload(connection *conn) { * We'll restore it when the RDB is received. */ connBlock(conn); connRecvTimeout(conn, server.repl_timeout*1000); - startLoading(server.repl_transfer_size); + startLoading(server.repl_transfer_size, RDBFLAGS_REPLICATION); - if (rdbLoadRio(&rdb,&rsi,0) != C_OK) { + if (rdbLoadRio(&rdb,RDBFLAGS_REPLICATION,&rsi) != C_OK) { /* RDB loading failed. */ - stopLoading(); + stopLoading(0); serverLog(LL_WARNING, "Failed trying to load the MASTER synchronization DB " "from socket"); @@ -1567,7 +1577,7 @@ void readSyncBulkPayload(connection *conn) { * gets promoted. */ return; } - stopLoading(); + stopLoading(1); /* RDB loading succeeded if we reach this point. */ if (server.repl_diskless_load == REPL_DISKLESS_LOAD_SWAPDB) { @@ -1614,7 +1624,7 @@ void readSyncBulkPayload(connection *conn) { cancelReplicationHandshake(); return; } - if (rdbLoad(server.rdb_filename,&rsi) != C_OK) { + if (rdbLoad(server.rdb_filename,&rsi,RDBFLAGS_REPLICATION) != C_OK) { serverLog(LL_WARNING, "Failed trying to load the MASTER synchronization " "DB from disk"); @@ -1636,6 +1646,11 @@ void readSyncBulkPayload(connection *conn) { server.repl_state = REPL_STATE_CONNECTED; server.repl_down_since = 0; + /* Fire the master link modules event. */ + moduleFireServerEvent(REDISMODULE_EVENT_MASTER_LINK_CHANGE, + REDISMODULE_SUBEVENT_MASTER_LINK_UP, + NULL); + /* After a full resynchroniziation we use the replication ID and * offset of the master. The secondary ID / offset are cleared since * we are starting a new history. */ @@ -2314,12 +2329,31 @@ void replicationSetMaster(char *ip, int port) { replicationDiscardCachedMaster(); replicationCacheMasterUsingMyself(); } + + /* Fire the role change modules event. */ + moduleFireServerEvent(REDISMODULE_EVENT_REPLICATION_ROLE_CHANGED, + REDISMODULE_EVENT_REPLROLECHANGED_NOW_REPLICA, + NULL); + + /* Fire the master link modules event. */ + if (server.repl_state == REPL_STATE_CONNECTED) + moduleFireServerEvent(REDISMODULE_EVENT_MASTER_LINK_CHANGE, + REDISMODULE_SUBEVENT_MASTER_LINK_DOWN, + NULL); + server.repl_state = REPL_STATE_CONNECT; } /* Cancel replication, setting the instance as a master itself. */ void replicationUnsetMaster(void) { if (server.masterhost == NULL) return; /* Nothing to do. */ + + /* Fire the master link modules event. */ + if (server.repl_state == REPL_STATE_CONNECTED) + moduleFireServerEvent(REDISMODULE_EVENT_MASTER_LINK_CHANGE, + REDISMODULE_SUBEVENT_MASTER_LINK_DOWN, + NULL); + sdsfree(server.masterhost); server.masterhost = NULL; /* When a slave is turned into a master, the current replication ID @@ -2348,11 +2382,22 @@ void replicationUnsetMaster(void) { * starting from now. Otherwise the backlog will be freed after a * failover if slaves do not connect immediately. */ server.repl_no_slaves_since = server.unixtime; + + /* Fire the role change modules event. */ + moduleFireServerEvent(REDISMODULE_EVENT_REPLICATION_ROLE_CHANGED, + REDISMODULE_EVENT_REPLROLECHANGED_NOW_MASTER, + NULL); } /* This function is called when the slave lose the connection with the * master into an unexpected way. */ void replicationHandleMasterDisconnection(void) { + /* Fire the master link modules event. */ + if (server.repl_state == REPL_STATE_CONNECTED) + moduleFireServerEvent(REDISMODULE_EVENT_MASTER_LINK_CHANGE, + REDISMODULE_SUBEVENT_MASTER_LINK_DOWN, + NULL); + server.master = NULL; server.repl_state = REPL_STATE_CONNECT; server.repl_down_since = server.unixtime; diff --git a/src/server.c b/src/server.c index 8f165113d..f42924764 100644 --- a/src/server.c +++ b/src/server.c @@ -2056,6 +2056,12 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { server.rdb_bgsave_scheduled = 0; } + /* Fire the cron loop modules event. */ + RedisModuleCronLoopV1 ei = {REDISMODULE_CRON_LOOP_VERSION,server.hz}; + moduleFireServerEvent(REDISMODULE_EVENT_CRON_LOOP, + 0, + &ei); + server.cronloops++; return 1000/server.hz; } @@ -3682,6 +3688,9 @@ int prepareForShutdown(int flags) { } } + /* Fire the shutdown modules event. */ + moduleFireServerEvent(REDISMODULE_EVENT_SHUTDOWN,0,NULL); + /* Remove the pid file if possible and needed. */ if (server.daemonize || server.pidfile) { serverLog(LL_NOTICE,"Removing the pid file."); @@ -4767,7 +4776,7 @@ void loadDataFromDisk(void) { serverLog(LL_NOTICE,"DB loaded from append only file: %.3f seconds",(float)(ustime()-start)/1000000); } else { rdbSaveInfo rsi = RDB_SAVE_INFO_INIT; - if (rdbLoad(server.rdb_filename,&rsi) == C_OK) { + if (rdbLoad(server.rdb_filename,&rsi,RDBFLAGS_NONE) == C_OK) { serverLog(LL_NOTICE,"DB loaded from disk: %.3f seconds", (float)(ustime()-start)/1000000); diff --git a/src/server.h b/src/server.h index 97672d727..b52d57a05 100644 --- a/src/server.h +++ b/src/server.h @@ -1602,6 +1602,7 @@ ssize_t rdbSaveModulesAux(rio *rdb, int when); int moduleAllDatatypesHandleErrors(); sds modulesCollectInfo(sds info, sds section, int for_crash_report, int sections); void moduleFireServerEvent(uint64_t eid, int subid, void *data); +void processModuleLoadingProgressEvent(int is_aof); /* Utils */ long long ustime(void); @@ -1831,10 +1832,12 @@ void rdbPipeReadHandler(struct aeEventLoop *eventLoop, int fd, void *clientData, void rdbPipeWriteHandlerConnRemoved(struct connection *conn); /* Generic persistence functions */ -void startLoadingFile(FILE* fp, char* filename); -void startLoading(size_t size); +void startLoadingFile(FILE* fp, char* filename, int rdbflags); +void startLoading(size_t size, int rdbflags); void loadingProgress(off_t pos); -void stopLoading(void); +void stopLoading(int success); +void startSaving(int rdbflags); +void stopSaving(int success); #define DISK_ERROR_TYPE_AOF 1 /* Don't accept writes: AOF errors. */ #define DISK_ERROR_TYPE_RDB 2 /* Don't accept writes: RDB errors. */ @@ -1843,7 +1846,6 @@ int writeCommandsDeniedByDiskError(void); /* RDB persistence */ #include "rdb.h" -int rdbSaveRio(rio *rdb, int *error, int flags, rdbSaveInfo *rsi); void killRDBChild(void); /* AOF persistence */ diff --git a/tests/modules/hooks.c b/tests/modules/hooks.c index 33b690b2f..665a20481 100644 --- a/tests/modules/hooks.c +++ b/tests/modules/hooks.c @@ -30,36 +30,227 @@ * POSSIBILITY OF SUCH DAMAGE. */ -#define REDISMODULE_EXPERIMENTAL_API #include "redismodule.h" +#include +#include + +/* We need to store events to be able to test and see what we got, and we can't + * store them in the key-space since that would mess up rdb loading (duplicates) + * and be lost of flushdb. */ +RedisModuleDict *event_log = NULL; + +typedef struct EventElement { + long count; + RedisModuleString *last_val_string; + long last_val_int; +} EventElement; + +void LogStringEvent(RedisModuleCtx *ctx, const char* keyname, const char* data) { + EventElement *event = RedisModule_DictGetC(event_log, (void*)keyname, strlen(keyname), NULL); + if (!event) { + event = RedisModule_Alloc(sizeof(EventElement)); + memset(event, 0, sizeof(EventElement)); + RedisModule_DictSetC(event_log, (void*)keyname, strlen(keyname), event); + } + if (event->last_val_string) RedisModule_FreeString(ctx, event->last_val_string); + event->last_val_string = RedisModule_CreateString(ctx, data, strlen(data)); + event->count++; +} + +void LogNumericEvent(RedisModuleCtx *ctx, const char* keyname, long data) { + REDISMODULE_NOT_USED(ctx); + EventElement *event = RedisModule_DictGetC(event_log, (void*)keyname, strlen(keyname), NULL); + if (!event) { + event = RedisModule_Alloc(sizeof(EventElement)); + memset(event, 0, sizeof(EventElement)); + RedisModule_DictSetC(event_log, (void*)keyname, strlen(keyname), event); + } + event->last_val_int = data; + event->count++; +} + +void FreeEvent(RedisModuleCtx *ctx, EventElement *event) { + if (event->last_val_string) + RedisModule_FreeString(ctx, event->last_val_string); + RedisModule_Free(event); +} + +int cmdEventCount(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) +{ + if (argc != 2){ + RedisModule_WrongArity(ctx); + return REDISMODULE_OK; + } + + EventElement *event = RedisModule_DictGet(event_log, argv[1], NULL); + RedisModule_ReplyWithLongLong(ctx, event? event->count: 0); + return REDISMODULE_OK; +} + +int cmdEventLast(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) +{ + if (argc != 2){ + RedisModule_WrongArity(ctx); + return REDISMODULE_OK; + } + + EventElement *event = RedisModule_DictGet(event_log, argv[1], NULL); + if (event && event->last_val_string) + RedisModule_ReplyWithString(ctx, event->last_val_string); + else if (event) + RedisModule_ReplyWithLongLong(ctx, event->last_val_int); + else + RedisModule_ReplyWithNull(ctx); + return REDISMODULE_OK; +} + +void clearEvents(RedisModuleCtx *ctx) +{ + RedisModuleString *key; + EventElement *event; + RedisModuleDictIter *iter = RedisModule_DictIteratorStart(event_log, "^", NULL); + while((key = RedisModule_DictNext(ctx, iter, (void**)&event)) != NULL) { + event->count = 0; + event->last_val_int = 0; + if (event->last_val_string) RedisModule_FreeString(ctx, event->last_val_string); + event->last_val_string = NULL; + RedisModule_DictDel(event_log, key, NULL); + RedisModule_Free(event); + } + RedisModule_DictIteratorStop(iter); +} + +int cmdEventsClear(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) +{ + REDISMODULE_NOT_USED(argc); + REDISMODULE_NOT_USED(argv); + clearEvents(ctx); + return REDISMODULE_OK; +} /* Client state change callback. */ void clientChangeCallback(RedisModuleCtx *ctx, RedisModuleEvent e, uint64_t sub, void *data) { - REDISMODULE_NOT_USED(ctx); REDISMODULE_NOT_USED(e); RedisModuleClientInfo *ci = data; char *keyname = (sub == REDISMODULE_SUBEVENT_CLIENT_CHANGE_CONNECTED) ? - "connected" : "disconnected"; - RedisModuleCallReply *reply; - RedisModule_SelectDb(ctx,9); - reply = RedisModule_Call(ctx,"RPUSH","cl",keyname,(long)ci->id); - RedisModule_FreeCallReply(reply); + "client-connected" : "client-disconnected"; + LogNumericEvent(ctx, keyname, ci->id); } void flushdbCallback(RedisModuleCtx *ctx, RedisModuleEvent e, uint64_t sub, void *data) { - REDISMODULE_NOT_USED(ctx); REDISMODULE_NOT_USED(e); RedisModuleFlushInfo *fi = data; char *keyname = (sub == REDISMODULE_SUBEVENT_FLUSHDB_START) ? "flush-start" : "flush-end"; - RedisModuleCallReply *reply; - RedisModule_SelectDb(ctx,9); - reply = RedisModule_Call(ctx,"RPUSH","cl",keyname,(long)fi->dbnum); - RedisModule_FreeCallReply(reply); + LogNumericEvent(ctx, keyname, fi->dbnum); +} + +void roleChangeCallback(RedisModuleCtx *ctx, RedisModuleEvent e, uint64_t sub, void *data) +{ + REDISMODULE_NOT_USED(e); + REDISMODULE_NOT_USED(data); + + RedisModuleReplicationInfo *ri = data; + char *keyname = (sub == REDISMODULE_EVENT_REPLROLECHANGED_NOW_MASTER) ? + "role-master" : "role-replica"; + LogStringEvent(ctx, keyname, ri->masterhost); +} + +void replicationChangeCallback(RedisModuleCtx *ctx, RedisModuleEvent e, uint64_t sub, void *data) +{ + REDISMODULE_NOT_USED(e); + REDISMODULE_NOT_USED(data); + + char *keyname = (sub == REDISMODULE_SUBEVENT_REPLICA_CHANGE_ONLINE) ? + "replica-online" : "replica-offline"; + LogNumericEvent(ctx, keyname, 0); +} + +void rasterLinkChangeCallback(RedisModuleCtx *ctx, RedisModuleEvent e, uint64_t sub, void *data) +{ + REDISMODULE_NOT_USED(e); + REDISMODULE_NOT_USED(data); + + char *keyname = (sub == REDISMODULE_SUBEVENT_MASTER_LINK_UP) ? + "masterlink-up" : "masterlink-down"; + LogNumericEvent(ctx, keyname, 0); +} + +void persistenceCallback(RedisModuleCtx *ctx, RedisModuleEvent e, uint64_t sub, void *data) +{ + REDISMODULE_NOT_USED(e); + REDISMODULE_NOT_USED(data); + + char *keyname = NULL; + switch (sub) { + case REDISMODULE_SUBEVENT_PERSISTENCE_RDB_START: keyname = "persistence-rdb-start"; break; + case REDISMODULE_SUBEVENT_PERSISTENCE_AOF_START: keyname = "persistence-aof-start"; break; + case REDISMODULE_SUBEVENT_PERSISTENCE_SYNC_RDB_START: keyname = "persistence-syncrdb-start"; break; + case REDISMODULE_SUBEVENT_PERSISTENCE_ENDED: keyname = "persistence-end"; break; + case REDISMODULE_SUBEVENT_PERSISTENCE_FAILED: keyname = "persistence-failed"; break; + } + /* modifying the keyspace from the fork child is not an option, using log instead */ + RedisModule_Log(ctx, "warning", "module-event-%s", keyname); + if (sub == REDISMODULE_SUBEVENT_PERSISTENCE_SYNC_RDB_START) + LogNumericEvent(ctx, keyname, 0); +} + +void loadingCallback(RedisModuleCtx *ctx, RedisModuleEvent e, uint64_t sub, void *data) +{ + REDISMODULE_NOT_USED(e); + REDISMODULE_NOT_USED(data); + + char *keyname = NULL; + switch (sub) { + case REDISMODULE_SUBEVENT_LOADING_RDB_START: keyname = "loading-rdb-start"; break; + case REDISMODULE_SUBEVENT_LOADING_AOF_START: keyname = "loading-aof-start"; break; + case REDISMODULE_SUBEVENT_LOADING_REPL_START: keyname = "loading-repl-start"; break; + case REDISMODULE_SUBEVENT_LOADING_ENDED: keyname = "loading-end"; break; + case REDISMODULE_SUBEVENT_LOADING_FAILED: keyname = "loading-failed"; break; + } + LogNumericEvent(ctx, keyname, 0); +} + +void loadingProgressCallback(RedisModuleCtx *ctx, RedisModuleEvent e, uint64_t sub, void *data) +{ + REDISMODULE_NOT_USED(e); + + RedisModuleLoadingProgress *ei = data; + char *keyname = (sub == REDISMODULE_SUBEVENT_LOADING_PROGRESS_RDB) ? + "loading-progress-rdb" : "loading-progress-aof"; + LogNumericEvent(ctx, keyname, ei->progress); +} + +void shutdownCallback(RedisModuleCtx *ctx, RedisModuleEvent e, uint64_t sub, void *data) +{ + REDISMODULE_NOT_USED(e); + REDISMODULE_NOT_USED(data); + REDISMODULE_NOT_USED(sub); + + RedisModule_Log(ctx, "warning", "module-event-%s", "shutdown"); +} + +void cronLoopCallback(RedisModuleCtx *ctx, RedisModuleEvent e, uint64_t sub, void *data) +{ + REDISMODULE_NOT_USED(e); + REDISMODULE_NOT_USED(sub); + + RedisModuleCronLoop *ei = data; + LogNumericEvent(ctx, "cron-loop", ei->hz); +} + +void moduleChangeCallback(RedisModuleCtx *ctx, RedisModuleEvent e, uint64_t sub, void *data) +{ + REDISMODULE_NOT_USED(e); + + RedisModuleModuleChange *ei = data; + char *keyname = (sub == REDISMODULE_SUBEVENT_MODULE_LOADED) ? + "module-loaded" : "module-unloaded"; + LogStringEvent(ctx, keyname, ei->module_name); } /* This function must be present on each Redis module. It is used in order to @@ -71,9 +262,50 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) if (RedisModule_Init(ctx,"testhook",1,REDISMODULE_APIVER_1) == REDISMODULE_ERR) return REDISMODULE_ERR; + /* replication related hooks */ + RedisModule_SubscribeToServerEvent(ctx, + RedisModuleEvent_ReplicationRoleChanged, roleChangeCallback); + RedisModule_SubscribeToServerEvent(ctx, + RedisModuleEvent_ReplicaChange, replicationChangeCallback); + RedisModule_SubscribeToServerEvent(ctx, + RedisModuleEvent_MasterLinkChange, rasterLinkChangeCallback); + + /* persistence related hooks */ + RedisModule_SubscribeToServerEvent(ctx, + RedisModuleEvent_Persistence, persistenceCallback); + RedisModule_SubscribeToServerEvent(ctx, + RedisModuleEvent_Loading, loadingCallback); + RedisModule_SubscribeToServerEvent(ctx, + RedisModuleEvent_LoadingProgress, loadingProgressCallback); + + /* other hooks */ RedisModule_SubscribeToServerEvent(ctx, RedisModuleEvent_ClientChange, clientChangeCallback); RedisModule_SubscribeToServerEvent(ctx, RedisModuleEvent_FlushDB, flushdbCallback); + RedisModule_SubscribeToServerEvent(ctx, + RedisModuleEvent_Shutdown, shutdownCallback); + RedisModule_SubscribeToServerEvent(ctx, + RedisModuleEvent_CronLoop, cronLoopCallback); + RedisModule_SubscribeToServerEvent(ctx, + RedisModuleEvent_ModuleChange, moduleChangeCallback); + + event_log = RedisModule_CreateDict(ctx); + + if (RedisModule_CreateCommand(ctx,"hooks.event_count", cmdEventCount,"",0,0,0) == REDISMODULE_ERR) + return REDISMODULE_ERR; + if (RedisModule_CreateCommand(ctx,"hooks.event_last", cmdEventLast,"",0,0,0) == REDISMODULE_ERR) + return REDISMODULE_ERR; + if (RedisModule_CreateCommand(ctx,"hooks.clear", cmdEventsClear,"",0,0,0) == REDISMODULE_ERR) + return REDISMODULE_ERR; + return REDISMODULE_OK; } + +int RedisModule_OnUnload(RedisModuleCtx *ctx) { + clearEvents(ctx); + RedisModule_FreeDict(ctx, event_log); + event_log = NULL; + return REDISMODULE_OK; +} + diff --git a/tests/unit/moduleapi/hooks.tcl b/tests/unit/moduleapi/hooks.tcl index 7a727902d..cbca9c3eb 100644 --- a/tests/unit/moduleapi/hooks.tcl +++ b/tests/unit/moduleapi/hooks.tcl @@ -3,26 +3,138 @@ set testmodule [file normalize tests/modules/hooks.so] tags "modules" { start_server {} { r module load $testmodule + r config set appendonly yes + test {Test clients connection / disconnection hooks} { for {set j 0} {$j < 2} {incr j} { set rd1 [redis_deferring_client] $rd1 close } - assert {[r llen connected] > 1} - assert {[r llen disconnected] > 1} + assert {[r hooks.event_count client-connected] > 1} + assert {[r hooks.event_count client-disconnected] > 1} + } + + test {Test module cron hook} { + after 100 + assert {[r hooks.event_count cron-loop] > 0} + set hz [r hooks.event_last cron-loop] + assert_equal $hz 10 + } + + test {Test module loaded / unloaded hooks} { + set othermodule [file normalize tests/modules/infotest.so] + r module load $othermodule + r module unload infotest + assert_equal [r hooks.event_last module-loaded] "infotest" + assert_equal [r hooks.event_last module-unloaded] "infotest" + } + + test {Test module aofrw hook} { + r debug populate 1000 foo 10000 ;# 10mb worth of data + r config set rdbcompression no ;# rdb progress is only checked once in 2mb + r BGREWRITEAOF + waitForBgrewriteaof r + assert_equal [string match {*module-event-persistence-aof-start*} [exec tail -20 < [srv 0 stdout]]] 1 + assert_equal [string match {*module-event-persistence-end*} [exec tail -20 < [srv 0 stdout]]] 1 + } + + test {Test module aof load and rdb/aof progress hooks} { + # create some aof tail (progress is checked only once in 1000 commands) + for {set j 0} {$j < 4000} {incr j} { + r set "bar$j" x + } + # set some configs that will cause many loading progress events during aof loading + r config set key-load-delay 1 + r config set dynamic-hz no + r config set hz 500 + r DEBUG LOADAOF + assert_equal [r hooks.event_last loading-aof-start] 0 + assert_equal [r hooks.event_last loading-end] 0 + assert {[r hooks.event_count loading-rdb-start] == 0} + assert {[r hooks.event_count loading-progress-rdb] >= 2} ;# comes from the preamble section + assert {[r hooks.event_count loading-progress-aof] >= 2} + } + # undo configs before next test + r config set dynamic-hz yes + r config set key-load-delay 0 + + test {Test module rdb save hook} { + # debug reload does: save, flush, load: + assert {[r hooks.event_count persistence-syncrdb-start] == 0} + assert {[r hooks.event_count loading-rdb-start] == 0} + r debug reload + assert {[r hooks.event_count persistence-syncrdb-start] == 1} + assert {[r hooks.event_count loading-rdb-start] == 1} } test {Test flushdb hooks} { - r flushall ;# Note: only the "end" RPUSH will survive - r select 1 - r flushdb - r select 2 r flushdb - r select 9 - assert {[r llen flush-start] == 2} - assert {[r llen flush-end] == 3} - assert {[r lrange flush-start 0 -1] eq {1 2}} - assert {[r lrange flush-end 0 -1] eq {-1 1 2}} + assert_equal [r hooks.event_last flush-start] 9 + assert_equal [r hooks.event_last flush-end] 9 + r flushall + assert_equal [r hooks.event_last flush-start] -1 + assert_equal [r hooks.event_last flush-end] -1 + } + + # replication related tests + set master [srv 0 client] + set master_host [srv 0 host] + set master_port [srv 0 port] + start_server {} { + r module load $testmodule + set replica [srv 0 client] + set replica_host [srv 0 host] + set replica_port [srv 0 port] + $replica replicaof $master_host $master_port + + wait_for_condition 50 100 { + [string match {*master_link_status:up*} [r info replication]] + } else { + fail "Can't turn the instance into a replica" + } + + test {Test master link up hook} { + assert_equal [r hooks.event_count masterlink-up] 1 + assert_equal [r hooks.event_count masterlink-down] 0 + } + + test {Test role-replica hook} { + assert_equal [r hooks.event_count role-replica] 1 + assert_equal [r hooks.event_count role-master] 0 + assert_equal [r hooks.event_last role-replica] [s 0 master_host] + } + + test {Test replica-online hook} { + assert_equal [r -1 hooks.event_count replica-online] 1 + assert_equal [r -1 hooks.event_count replica-offline] 0 + } + + test {Test master link down hook} { + r client kill type master + assert_equal [r hooks.event_count masterlink-down] 1 + } + + $replica replicaof no one + + test {Test role-master hook} { + assert_equal [r hooks.event_count role-replica] 1 + assert_equal [r hooks.event_count role-master] 1 + assert_equal [r hooks.event_last role-master] {} + } + + test {Test replica-offline hook} { + assert_equal [r -1 hooks.event_count replica-online] 1 + assert_equal [r -1 hooks.event_count replica-offline] 1 + } + # get the replica stdout, to be used by the next test + set replica_stdout [srv 0 stdout] } + + + # look into the log file of the server that just exited + test {Test shutdown hook} { + assert_equal [string match {*module-event-shutdown*} [exec tail -5 < $replica_stdout]] 1 + } + } } -- cgit v1.2.1 From 779aebc91cd8d3043ab172e0bc5b8c988df88e33 Mon Sep 17 00:00:00 2001 From: Oran Agra Date: Sun, 3 Nov 2019 16:42:31 +0200 Subject: Module API for loading and saving long double looks like each platform implements long double differently (different bit count) so we can't save them as binary, and we also want to avoid creating a new RDB format version, so we save these are hex strings using "%La". This commit includes a change in the arguments of ld2string to support this. as well as tests for coverage and short reads. coded by @guybe7 --- src/module.c | 27 ++++++++++++++++++++++ src/networking.c | 2 +- src/object.c | 2 +- src/redismodule.h | 4 ++++ src/t_hash.c | 2 +- src/util.c | 60 +++++++++++++++++++++++++++++-------------------- src/util.h | 9 +++++++- tests/modules/testrdb.c | 9 +++++++- 8 files changed, 86 insertions(+), 29 deletions(-) diff --git a/src/module.c b/src/module.c index f9f654b42..68d1d023d 100644 --- a/src/module.c +++ b/src/module.c @@ -3716,6 +3716,31 @@ loaderr: return 0; } +/* In the context of the rdb_save method of a module data type, saves a long double + * value to the RDB file. The double can be a valid number, a NaN or infinity. + * It is possible to load back the value with RedisModule_LoadLongDouble(). */ +void RM_SaveLongDouble(RedisModuleIO *io, long double value) { + if (io->error) return; + char buf[MAX_LONG_DOUBLE_CHARS]; + /* Long double has different number of bits in different platforms, so we + * save it as a string type. */ + size_t len = ld2string(buf,sizeof(buf),value,LD_STR_HEX); + RM_SaveStringBuffer(io,buf,len+1); /* len+1 for '\0' */ +} + +/* In the context of the rdb_save method of a module data type, loads back the + * long double value saved by RedisModule_SaveLongDouble(). */ +long double RM_LoadLongDouble(RedisModuleIO *io) { + if (io->error) return 0; + long double value; + size_t len; + char* str = RM_LoadStringBuffer(io,&len); + if (!str) return 0; + string2ld(str,len,&value); + RM_Free(str); + return value; +} + /* Iterate over modules, and trigger rdb aux saving for the ones modules types * who asked for it. */ ssize_t rdbSaveModulesAux(rio *rdb, int when) { @@ -6669,6 +6694,8 @@ void moduleRegisterCoreAPI(void) { REGISTER_API(LoadDouble); REGISTER_API(SaveFloat); REGISTER_API(LoadFloat); + REGISTER_API(SaveLongDouble); + REGISTER_API(LoadLongDouble); REGISTER_API(EmitAOF); REGISTER_API(Log); REGISTER_API(LogIOError); diff --git a/src/networking.c b/src/networking.c index e7cc561fa..428ab14ce 100644 --- a/src/networking.c +++ b/src/networking.c @@ -530,7 +530,7 @@ void addReplyHumanLongDouble(client *c, long double d) { decrRefCount(o); } else { char buf[MAX_LONG_DOUBLE_CHARS]; - int len = ld2string(buf,sizeof(buf),d,1); + int len = ld2string(buf,sizeof(buf),d,LD_STR_HUMAN); addReplyProto(c,",",1); addReplyProto(c,buf,len); addReplyProto(c,"\r\n",2); diff --git a/src/object.c b/src/object.c index 70022f897..53ad518a9 100644 --- a/src/object.c +++ b/src/object.c @@ -178,7 +178,7 @@ robj *createStringObjectFromLongLongForValue(long long value) { * The 'humanfriendly' option is used for INCRBYFLOAT and HINCRBYFLOAT. */ robj *createStringObjectFromLongDouble(long double value, int humanfriendly) { char buf[MAX_LONG_DOUBLE_CHARS]; - int len = ld2string(buf,sizeof(buf),value,humanfriendly); + int len = ld2string(buf,sizeof(buf),value,humanfriendly? LD_STR_HUMAN: LD_STR_AUTO); return createStringObject(buf,len); } diff --git a/src/redismodule.h b/src/redismodule.h index ea0d6a139..54d198592 100644 --- a/src/redismodule.h +++ b/src/redismodule.h @@ -457,6 +457,8 @@ void REDISMODULE_API_FUNC(RedisModule_SaveDouble)(RedisModuleIO *io, double valu double REDISMODULE_API_FUNC(RedisModule_LoadDouble)(RedisModuleIO *io); void REDISMODULE_API_FUNC(RedisModule_SaveFloat)(RedisModuleIO *io, float value); float REDISMODULE_API_FUNC(RedisModule_LoadFloat)(RedisModuleIO *io); +void REDISMODULE_API_FUNC(RedisModule_SaveLongDouble)(RedisModuleIO *io, long double value); +long double REDISMODULE_API_FUNC(RedisModule_LoadLongDouble)(RedisModuleIO *io); void REDISMODULE_API_FUNC(RedisModule_Log)(RedisModuleCtx *ctx, const char *level, const char *fmt, ...); void REDISMODULE_API_FUNC(RedisModule_LogIOError)(RedisModuleIO *io, const char *levelstr, const char *fmt, ...); void REDISMODULE_API_FUNC(RedisModule__Assert)(const char *estr, const char *file, int line); @@ -658,6 +660,8 @@ static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int REDISMODULE_GET_API(LoadDouble); REDISMODULE_GET_API(SaveFloat); REDISMODULE_GET_API(LoadFloat); + REDISMODULE_GET_API(SaveLongDouble); + REDISMODULE_GET_API(LoadLongDouble); REDISMODULE_GET_API(EmitAOF); REDISMODULE_GET_API(Log); REDISMODULE_GET_API(LogIOError); diff --git a/src/t_hash.c b/src/t_hash.c index e6ed33819..b9f0db7fc 100644 --- a/src/t_hash.c +++ b/src/t_hash.c @@ -621,7 +621,7 @@ void hincrbyfloatCommand(client *c) { } char buf[MAX_LONG_DOUBLE_CHARS]; - int len = ld2string(buf,sizeof(buf),value,1); + int len = ld2string(buf,sizeof(buf),value,LD_STR_HUMAN); new = sdsnewlen(buf,len); hashTypeSet(o,c->argv[2]->ptr,new,HASH_SET_TAKE_VALUE); addReplyBulkCBuffer(c,buf,len); diff --git a/src/util.c b/src/util.c index 783bcf83b..062a572d4 100644 --- a/src/util.c +++ b/src/util.c @@ -510,15 +510,17 @@ int d2string(char *buf, size_t len, double value) { return len; } -/* Convert a long double into a string. If humanfriendly is non-zero - * it does not use exponential format and trims trailing zeroes at the end, - * however this results in loss of precision. Otherwise exp format is used - * and the output of snprintf() is not modified. +/* Create a string object from a long double. + * If mode is humanfriendly it does not use exponential format and trims trailing + * zeroes at the end (may result in loss of precision). + * If mode is default exp format is used and the output of snprintf() + * is not modified (may result in loss of precision). + * If mode is hex hexadecimal format is used (no loss of precision) * * The function returns the length of the string or zero if there was not * enough buffer room to store it. */ -int ld2string(char *buf, size_t len, long double value, int humanfriendly) { - size_t l; +int ld2string(char *buf, size_t len, long double value, ld2string_mode mode) { + size_t l = 0; if (isinf(value)) { /* Libc in odd systems (Hi Solaris!) will format infinite in a @@ -531,26 +533,36 @@ int ld2string(char *buf, size_t len, long double value, int humanfriendly) { memcpy(buf,"-inf",4); l = 4; } - } else if (humanfriendly) { - /* We use 17 digits precision since with 128 bit floats that precision - * after rounding is able to represent most small decimal numbers in a - * way that is "non surprising" for the user (that is, most small - * decimal numbers will be represented in a way that when converted - * back into a string are exactly the same as what the user typed.) */ - l = snprintf(buf,len,"%.17Lf", value); - if (l+1 > len) return 0; /* No room. */ - /* Now remove trailing zeroes after the '.' */ - if (strchr(buf,'.') != NULL) { - char *p = buf+l-1; - while(*p == '0') { - p--; - l--; + } else { + switch (mode) { + case LD_STR_AUTO: + l = snprintf(buf,len,"%.17Lg",value); + if (l+1 > len) return 0; /* No room. */ + break; + case LD_STR_HEX: + l = snprintf(buf,len,"%La",value); + if (l+1 > len) return 0; /* No room. */ + break; + case LD_STR_HUMAN: + /* We use 17 digits precision since with 128 bit floats that precision + * after rounding is able to represent most small decimal numbers in a + * way that is "non surprising" for the user (that is, most small + * decimal numbers will be represented in a way that when converted + * back into a string are exactly the same as what the user typed.) */ + l = snprintf(buf,len,"%.17Lf",value); + if (l+1 > len) return 0; /* No room. */ + /* Now remove trailing zeroes after the '.' */ + if (strchr(buf,'.') != NULL) { + char *p = buf+l-1; + while(*p == '0') { + p--; + l--; + } + if (*p == '.') l--; } - if (*p == '.') l--; + break; + default: return 0; /* Invalid mode. */ } - } else { - l = snprintf(buf,len,"%.17Lg", value); - if (l+1 > len) return 0; /* No room. */ } buf[l] = '\0'; return l; diff --git a/src/util.h b/src/util.h index b6c01aa59..a91addb80 100644 --- a/src/util.h +++ b/src/util.h @@ -38,6 +38,13 @@ * This should be the size of the buffer given to ld2string */ #define MAX_LONG_DOUBLE_CHARS 5*1024 +/* long double to string convertion options */ +typedef enum { + LD_STR_AUTO, /* %.17Lg */ + LD_STR_HUMAN, /* %.17Lf + Trimming of trailing zeros */ + LD_STR_HEX /* %La */ +} ld2string_mode; + int stringmatchlen(const char *p, int plen, const char *s, int slen, int nocase); int stringmatch(const char *p, const char *s, int nocase); int stringmatchlen_fuzz_test(void); @@ -49,7 +56,7 @@ int string2ll(const char *s, size_t slen, long long *value); int string2l(const char *s, size_t slen, long *value); int string2ld(const char *s, size_t slen, long double *dp); int d2string(char *buf, size_t len, double value); -int ld2string(char *buf, size_t len, long double value, int humanfriendly); +int ld2string(char *buf, size_t len, long double value, ld2string_mode mode); sds getAbsolutePath(char *filename); unsigned long getTimeZone(void); int pathIsBaseName(char *path); diff --git a/tests/modules/testrdb.c b/tests/modules/testrdb.c index eb8d1a999..8a262e8a7 100644 --- a/tests/modules/testrdb.c +++ b/tests/modules/testrdb.c @@ -15,11 +15,16 @@ RedisModuleString *after_str = NULL; void *testrdb_type_load(RedisModuleIO *rdb, int encver) { int count = RedisModule_LoadSigned(rdb); + RedisModuleString *str = RedisModule_LoadString(rdb); + float f = RedisModule_LoadFloat(rdb); + long double ld = RedisModule_LoadLongDouble(rdb); if (RedisModule_IsIOError(rdb)) return NULL; + /* Using the values only after checking for io errors. */ assert(count==1); assert(encver==1); - RedisModuleString *str = RedisModule_LoadString(rdb); + assert(f==1.5f); + assert(ld==0.333333333333333333L); return str; } @@ -27,6 +32,8 @@ void testrdb_type_save(RedisModuleIO *rdb, void *value) { RedisModuleString *str = (RedisModuleString*)value; RedisModule_SaveSigned(rdb, 1); RedisModule_SaveString(rdb, str); + RedisModule_SaveFloat(rdb, 1.5); + RedisModule_SaveLongDouble(rdb, 0.333333333333333333L); } void testrdb_aux_save(RedisModuleIO *rdb, int when) { -- cgit v1.2.1 From 87332ce524e30d4949d2144a4da15b5ae17e5051 Mon Sep 17 00:00:00 2001 From: Oran Agra Date: Sun, 3 Nov 2019 17:35:35 +0200 Subject: Module API for PUBLISH, FLUSHALL, RANDOMKEY, DBSIZE --- src/db.c | 44 ++++++++++++++++++++++++-------------------- src/module.c | 35 +++++++++++++++++++++++++++++++++++ src/redismodule.h | 8 ++++++++ src/server.h | 2 ++ 4 files changed, 69 insertions(+), 20 deletions(-) diff --git a/src/db.c b/src/db.c index 2c0a0cdd3..ad19b42dd 100644 --- a/src/db.c +++ b/src/db.c @@ -461,6 +461,29 @@ int getFlushCommandFlags(client *c, int *flags) { return C_OK; } +/* Flushes the whole server data set. */ +void flushAllDataAndResetRDB(int flags) { + server.dirty += emptyDb(-1,flags,NULL); + if (server.rdb_child_pid != -1) killRDBChild(); + if (server.saveparamslen > 0) { + /* Normally rdbSave() will reset dirty, but we don't want this here + * as otherwise FLUSHALL will not be replicated nor put into the AOF. */ + int saved_dirty = server.dirty; + rdbSaveInfo rsi, *rsiptr; + rsiptr = rdbPopulateSaveInfo(&rsi); + rdbSave(server.rdb_filename,rsiptr); + server.dirty = saved_dirty; + } + server.dirty++; +#if defined(USE_JEMALLOC) + /* jemalloc 5 doesn't release pages back to the OS when there's no traffic. + * for large databases, flushdb blocks for long anyway, so a bit more won't + * harm and this way the flush and purge will be synchroneus. */ + if (!(flags & EMPTYDB_ASYNC)) + jemalloc_purge(); +#endif +} + /* FLUSHDB [ASYNC] * * Flushes the currently SELECTed Redis DB. */ @@ -484,28 +507,9 @@ void flushdbCommand(client *c) { * Flushes the whole server data set. */ void flushallCommand(client *c) { int flags; - if (getFlushCommandFlags(c,&flags) == C_ERR) return; - server.dirty += emptyDb(-1,flags,NULL); + flushAllDataAndResetRDB(flags); addReply(c,shared.ok); - if (server.rdb_child_pid != -1) killRDBChild(); - if (server.saveparamslen > 0) { - /* Normally rdbSave() will reset dirty, but we don't want this here - * as otherwise FLUSHALL will not be replicated nor put into the AOF. */ - int saved_dirty = server.dirty; - rdbSaveInfo rsi, *rsiptr; - rsiptr = rdbPopulateSaveInfo(&rsi); - rdbSave(server.rdb_filename,rsiptr); - server.dirty = saved_dirty; - } - server.dirty++; -#if defined(USE_JEMALLOC) - /* jemalloc 5 doesn't release pages back to the OS when there's no traffic. - * for large databases, flushdb blocks for long anyway, so a bit more won't - * harm and this way the flush and purge will be synchroneus. */ - if (!(flags & EMPTYDB_ASYNC)) - jemalloc_purge(); -#endif } /* This command implements DEL and LAZYDEL. */ diff --git a/src/module.c b/src/module.c index f9f654b42..5453aed47 100644 --- a/src/module.c +++ b/src/module.c @@ -1677,6 +1677,15 @@ int RM_GetClientInfoById(void *ci, uint64_t id) { return modulePopulateClientInfoStructure(ci,client,structver); } +/* Publish a message to subscribers (see PUBLISH command). */ +int RM_PublishMessage(RedisModuleCtx *ctx, RedisModuleString *channel, RedisModuleString *message) { + UNUSED(ctx); + int receivers = pubsubPublishMessage(channel, message); + if (server.cluster_enabled) + clusterPropagatePublish(channel, message); + return receivers; +} + /* Return the currently selected DB. */ int RM_GetSelectedDb(RedisModuleCtx *ctx) { return ctx->client->db->id; @@ -1964,6 +1973,28 @@ int RM_SetExpire(RedisModuleKey *key, mstime_t expire) { return REDISMODULE_OK; } +/* Performs similar operation to FLUSHALL, and optionally start a new AOF file (if enabled) + * If restart_aof is true, you must make sure the command that triggered this call is not + * propagated to the AOF file. + * When async is set to true, db contents will be freed by a background thread. */ +void RM_ResetDataset(int restart_aof, int async) { + if (restart_aof && server.aof_state != AOF_OFF) stopAppendOnly(); + flushAllDataAndResetRDB(async? EMPTYDB_ASYNC: EMPTYDB_NO_FLAGS); + if (server.aof_enabled && restart_aof) restartAOFAfterSYNC(); +} + +/* Returns the number of keys in the current db. */ +unsigned long long RM_DbSize(RedisModuleCtx *ctx) { + return dictSize(ctx->client->db->dict); +} + +/* Returns a name of a random key, or NULL if current db is empty. */ +RedisModuleString *RM_RandomKey(RedisModuleCtx *ctx) { + robj *key = dbRandomKey(ctx->client->db); + autoMemoryAdd(ctx,REDISMODULE_AM_STRING,key); + return key; +} + /* -------------------------------------------------------------------------- * Key API for String type * -------------------------------------------------------------------------- */ @@ -6630,6 +6661,9 @@ void moduleRegisterCoreAPI(void) { REGISTER_API(StringTruncate); REGISTER_API(SetExpire); REGISTER_API(GetExpire); + REGISTER_API(ResetDataset); + REGISTER_API(DbSize); + REGISTER_API(RandomKey); REGISTER_API(ZsetAdd); REGISTER_API(ZsetIncrby); REGISTER_API(ZsetScore); @@ -6757,6 +6791,7 @@ void moduleRegisterCoreAPI(void) { REGISTER_API(InfoAddFieldLongLong); REGISTER_API(InfoAddFieldULongLong); REGISTER_API(GetClientInfoById); + REGISTER_API(PublishMessage); REGISTER_API(SubscribeToServerEvent); REGISTER_API(BlockClientOnKeys); REGISTER_API(SignalKeyAsReady); diff --git a/src/redismodule.h b/src/redismodule.h index ea0d6a139..77019f89e 100644 --- a/src/redismodule.h +++ b/src/redismodule.h @@ -416,6 +416,9 @@ char *REDISMODULE_API_FUNC(RedisModule_StringDMA)(RedisModuleKey *key, size_t *l int REDISMODULE_API_FUNC(RedisModule_StringTruncate)(RedisModuleKey *key, size_t newlen); mstime_t REDISMODULE_API_FUNC(RedisModule_GetExpire)(RedisModuleKey *key); int REDISMODULE_API_FUNC(RedisModule_SetExpire)(RedisModuleKey *key, mstime_t expire); +void REDISMODULE_API_FUNC(RedisModule_ResetDataset)(int restart_aof, int async); +unsigned long long REDISMODULE_API_FUNC(RedisModule_DbSize)(RedisModuleCtx *ctx); +RedisModuleString *REDISMODULE_API_FUNC(RedisModule_RandomKey)(RedisModuleCtx *ctx); int REDISMODULE_API_FUNC(RedisModule_ZsetAdd)(RedisModuleKey *key, double score, RedisModuleString *ele, int *flagsptr); int REDISMODULE_API_FUNC(RedisModule_ZsetIncrby)(RedisModuleKey *key, double score, RedisModuleString *ele, int *flagsptr, double *newscore); int REDISMODULE_API_FUNC(RedisModule_ZsetScore)(RedisModuleKey *key, RedisModuleString *ele, double *score); @@ -435,6 +438,7 @@ int REDISMODULE_API_FUNC(RedisModule_IsKeysPositionRequest)(RedisModuleCtx *ctx) void REDISMODULE_API_FUNC(RedisModule_KeyAtPos)(RedisModuleCtx *ctx, int pos); unsigned long long REDISMODULE_API_FUNC(RedisModule_GetClientId)(RedisModuleCtx *ctx); int REDISMODULE_API_FUNC(RedisModule_GetClientInfoById)(void *ci, uint64_t id); +int REDISMODULE_API_FUNC(RedisModule_PublishMessage)(RedisModuleCtx *ctx, RedisModuleString *channel, RedisModuleString *message); int REDISMODULE_API_FUNC(RedisModule_GetContextFlags)(RedisModuleCtx *ctx); void *REDISMODULE_API_FUNC(RedisModule_PoolAlloc)(RedisModuleCtx *ctx, size_t bytes); RedisModuleType *REDISMODULE_API_FUNC(RedisModule_CreateDataType)(RedisModuleCtx *ctx, const char *name, int encver, RedisModuleTypeMethods *typemethods); @@ -619,6 +623,9 @@ static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int REDISMODULE_GET_API(StringTruncate); REDISMODULE_GET_API(GetExpire); REDISMODULE_GET_API(SetExpire); + REDISMODULE_GET_API(ResetDataset); + REDISMODULE_GET_API(DbSize); + REDISMODULE_GET_API(RandomKey); REDISMODULE_GET_API(ZsetAdd); REDISMODULE_GET_API(ZsetIncrby); REDISMODULE_GET_API(ZsetScore); @@ -705,6 +712,7 @@ static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int REDISMODULE_GET_API(InfoAddFieldLongLong); REDISMODULE_GET_API(InfoAddFieldULongLong); REDISMODULE_GET_API(GetClientInfoById); + REDISMODULE_GET_API(PublishMessage); REDISMODULE_GET_API(SubscribeToServerEvent); REDISMODULE_GET_API(BlockClientOnKeys); REDISMODULE_GET_API(SignalKeyAsReady); diff --git a/src/server.h b/src/server.h index f724f7d64..4287d1adf 100644 --- a/src/server.h +++ b/src/server.h @@ -1862,6 +1862,7 @@ void aofRewriteBufferReset(void); unsigned long aofRewriteBufferSize(void); ssize_t aofReadDiffFromParent(void); void killAppendOnlyChild(void); +void restartAOFAfterSYNC(); /* Child info */ void openChildInfoPipe(void); @@ -2101,6 +2102,7 @@ robj *dbUnshareStringValue(redisDb *db, robj *key, robj *o); #define EMPTYDB_ASYNC (1<<0) /* Reclaim memory in another thread. */ long long emptyDb(int dbnum, int flags, void(callback)(void*)); long long emptyDbGeneric(redisDb *dbarray, int dbnum, int flags, void(callback)(void*)); +void flushAllDataAndResetRDB(int flags); long long dbTotalServerKeyCount(); int selectDb(client *c, int id); -- cgit v1.2.1 From b81f486c2f0d9f8ae14fc7c0568ba59e629995d6 Mon Sep 17 00:00:00 2001 From: Guy Benoish Date: Thu, 29 Mar 2018 17:46:13 +0700 Subject: Modules: Test RedisModule_BlockClientOnKeys --- runtest-moduleapi | 1 + tests/modules/Makefile | 3 +- tests/modules/blockonkeys.c | 261 +++++++++++++++++++++++++++++++++++ tests/unit/moduleapi/blockonkeys.tcl | 85 ++++++++++++ 4 files changed, 349 insertions(+), 1 deletion(-) create mode 100644 tests/modules/blockonkeys.c create mode 100644 tests/unit/moduleapi/blockonkeys.tcl diff --git a/runtest-moduleapi b/runtest-moduleapi index 9301002c9..e48535126 100755 --- a/runtest-moduleapi +++ b/runtest-moduleapi @@ -21,4 +21,5 @@ $TCLSH tests/test_helper.tcl \ --single unit/moduleapi/propagate \ --single unit/moduleapi/hooks \ --single unit/moduleapi/misc \ +--single unit/moduleapi/blockonkeys \ "${@}" diff --git a/tests/modules/Makefile b/tests/modules/Makefile index 71c0b5ef8..9e27758a2 100644 --- a/tests/modules/Makefile +++ b/tests/modules/Makefile @@ -18,7 +18,8 @@ TEST_MODULES = \ infotest.so \ propagate.so \ misc.so \ - hooks.so + hooks.so \ + blockonkeys.so .PHONY: all diff --git a/tests/modules/blockonkeys.c b/tests/modules/blockonkeys.c new file mode 100644 index 000000000..959918b1c --- /dev/null +++ b/tests/modules/blockonkeys.c @@ -0,0 +1,261 @@ +#define REDISMODULE_EXPERIMENTAL_API +#include "redismodule.h" + +#include +#include +#include + +#define LIST_SIZE 1024 + +typedef struct { + long long list[LIST_SIZE]; + long long length; +} fsl_t; /* Fixed-size list */ + +static RedisModuleType *fsltype = NULL; + +fsl_t *fsl_type_create() { + fsl_t *o; + o = RedisModule_Alloc(sizeof(*o)); + o->length = 0; + return o; +} + +void fsl_type_free(fsl_t *o) { + RedisModule_Free(o); +} + +/* ========================== "fsltype" type methods ======================= */ + +void *fsl_rdb_load(RedisModuleIO *rdb, int encver) { + if (encver != 0) { + return NULL; + } + fsl_t *fsl = fsl_type_create(); + fsl->length = RedisModule_LoadUnsigned(rdb); + for (long long i = 0; i < fsl->length; i++) + fsl->list[i] = RedisModule_LoadSigned(rdb); + return fsl; +} + +void fsl_rdb_save(RedisModuleIO *rdb, void *value) { + fsl_t *fsl = value; + RedisModule_SaveUnsigned(rdb,fsl->length); + for (long long i = 0; i < fsl->length; i++) + RedisModule_SaveSigned(rdb, fsl->list[i]); +} + +void fsl_aofrw(RedisModuleIO *aof, RedisModuleString *key, void *value) { + fsl_t *fsl = value; + for (long long i = 0; i < fsl->length; i++) + RedisModule_EmitAOF(aof, "FSL.PUSH","sl", key, fsl->list[i]); +} + +void fsl_free(void *value) { + fsl_type_free(value); +} + +/* ========================== helper methods ======================= */ + +int get_fsl(RedisModuleCtx *ctx, RedisModuleString *keyname, int mode, int create, fsl_t **fsl, int reply_on_failure) { + RedisModuleKey *key = RedisModule_OpenKey(ctx, keyname, mode); + + int type = RedisModule_KeyType(key); + if (type != REDISMODULE_KEYTYPE_EMPTY && RedisModule_ModuleTypeGetType(key) != fsltype) { + RedisModule_CloseKey(key); + if (reply_on_failure) + RedisModule_ReplyWithError(ctx, REDISMODULE_ERRORMSG_WRONGTYPE); + return 0; + } + + /* Create an empty value object if the key is currently empty. */ + if (type == REDISMODULE_KEYTYPE_EMPTY) { + if (!create) { + /* Key is empty but we cannot create */ + RedisModule_CloseKey(key); + *fsl = NULL; + return 1; + } + *fsl = fsl_type_create(); + RedisModule_ModuleTypeSetValue(key, fsltype, *fsl); + } else { + *fsl = RedisModule_ModuleTypeGetValue(key); + } + + RedisModule_CloseKey(key); + return 1; +} + +/* ========================== commands ======================= */ + +/* FSL.PUSH - Push an integer to the fixed-size list (to the right). + * It must be greater than the element in the head of the list. */ +int fsl_push(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { + if (argc != 3) + return RedisModule_WrongArity(ctx); + + long long ele; + if (RedisModule_StringToLongLong(argv[2],&ele) != REDISMODULE_OK) + return RedisModule_ReplyWithError(ctx,"ERR invalid integer"); + + fsl_t *fsl; + if (!get_fsl(ctx, argv[1], REDISMODULE_WRITE, 1, &fsl, 1)) + return REDISMODULE_OK; + + if (fsl->length == LIST_SIZE) + return RedisModule_ReplyWithError(ctx,"ERR list is full"); + + if (fsl->length != 0 && fsl->list[fsl->length-1] >= ele) + return RedisModule_ReplyWithError(ctx,"ERR new element has to be greater than the head element"); + + fsl->list[fsl->length++] = ele; + + if (fsl->length >= 2) + RedisModule_SignalKeyAsReady(ctx, argv[1]); + + return RedisModule_ReplyWithSimpleString(ctx, "OK"); +} + +int bpop2_reply_callback(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { + REDISMODULE_NOT_USED(argv); + REDISMODULE_NOT_USED(argc); + RedisModuleString *keyname = RedisModule_GetBlockedClientReadyKey(ctx); + + fsl_t *fsl; + if (!get_fsl(ctx, keyname, REDISMODULE_READ, 0, &fsl, 0)) + return REDISMODULE_ERR; + + if (!fsl || fsl->length < 2) + return REDISMODULE_ERR; + + RedisModule_ReplyWithArray(ctx, 2); + RedisModule_ReplyWithLongLong(ctx, fsl->list[--fsl->length]); + RedisModule_ReplyWithLongLong(ctx, fsl->list[--fsl->length]); + return REDISMODULE_OK; +} + +int bpop2_timeout_callback(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { + REDISMODULE_NOT_USED(argv); + REDISMODULE_NOT_USED(argc); + return RedisModule_ReplyWithSimpleString(ctx, "Request timedout"); +} + + +/* FSL.BPOP2 - Block clients until list has two or more elements. + * When that happens, unblock client and pop the last two elements (from the right). */ +int fsl_bpop2(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { + if (argc != 3) + return RedisModule_WrongArity(ctx); + + long long timeout; + if (RedisModule_StringToLongLong(argv[2],&timeout) != REDISMODULE_OK || timeout < 0) + return RedisModule_ReplyWithError(ctx,"ERR invalid timeout"); + + fsl_t *fsl; + if (!get_fsl(ctx, argv[1], REDISMODULE_READ, 0, &fsl, 1)) + return REDISMODULE_OK; + + if (!fsl || fsl->length < 2) { + /* Key is empty or has <2 elements, we must block */ + RedisModule_BlockClientOnKeys(ctx, bpop2_reply_callback, bpop2_timeout_callback, + NULL, timeout, &argv[1], 1, NULL); + } else { + RedisModule_ReplyWithArray(ctx, 2); + RedisModule_ReplyWithLongLong(ctx, fsl->list[--fsl->length]); + RedisModule_ReplyWithLongLong(ctx, fsl->list[--fsl->length]); + } + + return REDISMODULE_OK; +} + +int bpopgt_reply_callback(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { + REDISMODULE_NOT_USED(argv); + REDISMODULE_NOT_USED(argc); + RedisModuleString *keyname = RedisModule_GetBlockedClientReadyKey(ctx); + long long gt = (long long)RedisModule_GetBlockedClientPrivateData(ctx); + + fsl_t *fsl; + if (!get_fsl(ctx, keyname, REDISMODULE_READ, 0, &fsl, 0)) + return REDISMODULE_ERR; + + if (!fsl || fsl->list[fsl->length-1] <= gt) + return REDISMODULE_ERR; + + RedisModule_ReplyWithLongLong(ctx, fsl->list[--fsl->length]); + return REDISMODULE_OK; +} + +int bpopgt_timeout_callback(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { + REDISMODULE_NOT_USED(argv); + REDISMODULE_NOT_USED(argc); + return RedisModule_ReplyWithSimpleString(ctx, "Request timedout"); +} + +void bpopgt_free_privdata(RedisModuleCtx *ctx, void *privdata) { + /* Nothing to do because privdata is actually a 'long long', + * not a pointer to the heap */ + REDISMODULE_NOT_USED(ctx); + REDISMODULE_NOT_USED(privdata); +} + +/* FSL.BPOPGT - Block clients until list has an element greater than . + * When that happens, unblock client and pop the last element (from the right). */ +int fsl_bpopgt(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { + if (argc != 4) + return RedisModule_WrongArity(ctx); + + long long gt; + if (RedisModule_StringToLongLong(argv[2],>) != REDISMODULE_OK) + return RedisModule_ReplyWithError(ctx,"ERR invalid integer"); + + long long timeout; + if (RedisModule_StringToLongLong(argv[3],&timeout) != REDISMODULE_OK || timeout < 0) + return RedisModule_ReplyWithError(ctx,"ERR invalid timeout"); + + fsl_t *fsl; + if (!get_fsl(ctx, argv[1], REDISMODULE_READ, 0, &fsl, 1)) + return REDISMODULE_OK; + + if (!fsl || fsl->list[fsl->length-1] <= gt) { + /* Key is empty or has <2 elements, we must block */ + RedisModule_BlockClientOnKeys(ctx, bpopgt_reply_callback, bpopgt_timeout_callback, + bpopgt_free_privdata, timeout, &argv[1], 1, (void*)gt); + } else { + RedisModule_ReplyWithLongLong(ctx, fsl->list[--fsl->length]); + } + + return REDISMODULE_OK; +} + +int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { + REDISMODULE_NOT_USED(argv); + REDISMODULE_NOT_USED(argc); + + if (RedisModule_Init(ctx, "blockonkeys", 1, REDISMODULE_APIVER_1)== REDISMODULE_ERR) + return REDISMODULE_ERR; + + RedisModuleTypeMethods tm = { + .version = REDISMODULE_TYPE_METHOD_VERSION, + .rdb_load = fsl_rdb_load, + .rdb_save = fsl_rdb_save, + .aof_rewrite = fsl_aofrw, + .mem_usage = NULL, + .free = fsl_free, + .digest = NULL + }; + + fsltype = RedisModule_CreateDataType(ctx, "fsltype_t", 0, &tm); + if (fsltype == NULL) + return REDISMODULE_ERR; + + if (RedisModule_CreateCommand(ctx,"fsl.push",fsl_push,"",0,0,0) == REDISMODULE_ERR) + return REDISMODULE_ERR; + + if (RedisModule_CreateCommand(ctx,"fsl.bpop2",fsl_bpop2,"",0,0,0) == REDISMODULE_ERR) + return REDISMODULE_ERR; + + if (RedisModule_CreateCommand(ctx,"fsl.bpopgt",fsl_bpopgt,"",0,0,0) == REDISMODULE_ERR) + return REDISMODULE_ERR; + + return REDISMODULE_OK; +} diff --git a/tests/unit/moduleapi/blockonkeys.tcl b/tests/unit/moduleapi/blockonkeys.tcl new file mode 100644 index 000000000..cb99ab1c9 --- /dev/null +++ b/tests/unit/moduleapi/blockonkeys.tcl @@ -0,0 +1,85 @@ +set testmodule [file normalize tests/modules/blockonkeys.so] + +start_server {tags {"modules"}} { + r module load $testmodule + + test {Module client blocked on keys (no metadata): No block} { + r del k + r fsl.push k 33 + r fsl.push k 34 + r fsl.bpop2 k 0 + } {34 33} + + test {Module client blocked on keys (no metadata): Timeout} { + r del k + set rd [redis_deferring_client] + r fsl.push k 33 + $rd fsl.bpop2 k 1 + assert_equal {Request timedout} [$rd read] + } + + test {Module client blocked on keys (no metadata): Blocked, case 1} { + r del k + set rd [redis_deferring_client] + r fsl.push k 33 + $rd fsl.bpop2 k 0 + r fsl.push k 34 + assert_equal {34 33} [$rd read] + } + + test {Module client blocked on keys (no metadata): Blocked, case 2} { + r del k + set rd [redis_deferring_client] + r fsl.push k 33 + r fsl.push k 34 + $rd fsl.bpop2 k 0 + assert_equal {34 33} [$rd read] + } + + test {Module client blocked on keys (with metadata): No block} { + r del k + r fsl.push k 34 + r fsl.bpopgt k 30 0 + } {34} + + test {Module client blocked on keys (with metadata): Timeout} { + r del k + set rd [redis_deferring_client] + r fsl.push k 33 + $rd fsl.bpopgt k 35 1 + assert_equal {Request timedout} [$rd read] + } + + test {Module client blocked on keys (with metadata): Blocked, case 1} { + r del k + set rd [redis_deferring_client] + r fsl.push k 33 + $rd fsl.bpopgt k 33 0 + r fsl.push k 34 + assert_equal {34} [$rd read] + } + + test {Module client blocked on keys (with metadata): Blocked, case 2} { + r del k + set rd [redis_deferring_client] + $rd fsl.bpopgt k 35 0 + r fsl.push k 33 + r fsl.push k 34 + r fsl.push k 35 + r fsl.push k 36 + assert_equal {36} [$rd read] + } + + test {Module client blocked on keys does not wake up on wrong type} { + r del k + set rd [redis_deferring_client] + $rd fsl.bpop2 k 0 + r lpush k 12 + r lpush k 13 + r lpush k 14 + r del k + r fsl.push k 33 + r fsl.push k 34 + assert_equal {34 33} [$rd read] + } +} -- cgit v1.2.1 From b12d2f65d660b4139b322af90b6ef60ed267210b Mon Sep 17 00:00:00 2001 From: Loris Cro Date: Mon, 4 Nov 2019 16:36:06 +0100 Subject: fix unreported overflow in autogerenared stream IDs --- src/t_stream.c | 23 +++++++++++++---------- tests/unit/type/stream.tcl | 6 ++++++ 2 files changed, 19 insertions(+), 10 deletions(-) diff --git a/src/t_stream.c b/src/t_stream.c index ea9a620f1..58b59f521 100644 --- a/src/t_stream.c +++ b/src/t_stream.c @@ -173,9 +173,19 @@ int streamCompareID(streamID *a, streamID *b) { * C_ERR if an ID was given via 'use_id', but adding it failed since the * current top ID is greater or equal. */ int streamAppendItem(stream *s, robj **argv, int64_t numfields, streamID *added_id, streamID *use_id) { - /* If an ID was given, check that it's greater than the last entry ID - * or return an error. */ - if (use_id && streamCompareID(use_id,&s->last_id) <= 0) return C_ERR; + + /* Generate the new entry ID. */ + streamID id; + if (use_id) + id = *use_id; + else + streamNextID(&s->last_id,&id); + + /* Check that the new ID is greater than the last entry ID + * or return an error. Automatically generated IDs might + * overflow (and wrap-around) when incrementing the sequence + part. */ + if (streamCompareID(&id,&s->last_id) <= 0) return C_ERR; /* Add the new entry. */ raxIterator ri; @@ -192,13 +202,6 @@ int streamAppendItem(stream *s, robj **argv, int64_t numfields, streamID *added_ } raxStop(&ri); - /* Generate the new entry ID. */ - streamID id; - if (use_id) - id = *use_id; - else - streamNextID(&s->last_id,&id); - /* We have to add the key into the radix tree in lexicographic order, * to do so we consider the ID as a single 128 bit number written in * big endian, so that the most significant bytes are the first ones. */ diff --git a/tests/unit/type/stream.tcl b/tests/unit/type/stream.tcl index a7415ae8d..aa9c5f3a9 100644 --- a/tests/unit/type/stream.tcl +++ b/tests/unit/type/stream.tcl @@ -79,6 +79,12 @@ start_server { assert {[streamCompareID $id2 $id3] == -1} } + test {XADD IDs correctly report an error when overflowing} { + r DEL mystream + r xadd mystream 18446744073709551615-18446744073709551615 a b + assert_error ERR* {r xadd mystream * c d} + } + test {XADD with MAXLEN option} { r DEL mystream for {set j 0} {$j < 1000} {incr j} { -- cgit v1.2.1 From 3adf10b8095475e2170831e036f67e6b2c9c6bdb Mon Sep 17 00:00:00 2001 From: Oran Agra Date: Mon, 4 Nov 2019 19:30:31 +0200 Subject: Test coverage for new module APIs: dbsize, flushall, randomkey, lru get/set --- tests/modules/misc.c | 69 +++++++++++++++++++++++++++++++++++++++++++ tests/unit/moduleapi/misc.tcl | 19 ++++++++++++ 2 files changed, 88 insertions(+) diff --git a/tests/modules/misc.c b/tests/modules/misc.c index fd892f52c..7701a9c7c 100644 --- a/tests/modules/misc.c +++ b/tests/modules/misc.c @@ -40,6 +40,65 @@ int test_call_info(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) return REDISMODULE_OK; } +int test_flushall(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) +{ + REDISMODULE_NOT_USED(argv); + REDISMODULE_NOT_USED(argc); + RedisModule_ResetDataset(1, 0); + RedisModule_ReplyWithCString(ctx, "Ok"); + return REDISMODULE_OK; +} + +int test_dbsize(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) +{ + REDISMODULE_NOT_USED(argv); + REDISMODULE_NOT_USED(argc); + long long ll = RedisModule_DbSize(ctx); + RedisModule_ReplyWithLongLong(ctx, ll); + return REDISMODULE_OK; +} + +int test_randomkey(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) +{ + REDISMODULE_NOT_USED(argv); + REDISMODULE_NOT_USED(argc); + RedisModuleString *str = RedisModule_RandomKey(ctx); + RedisModule_ReplyWithString(ctx, str); + RedisModule_FreeString(ctx, str); + return REDISMODULE_OK; +} + +int test_getlru(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) +{ + if (argc<2) { + RedisModule_WrongArity(ctx); + return REDISMODULE_OK; + } + RedisModuleString *keyname = argv[1]; + RedisModuleKey *key = RedisModule_OpenKey(ctx, keyname, REDISMODULE_READ|REDISMODULE_OPEN_KEY_NOTOUCH); + long long lru, lfu; + RedisModule_GetLRUOrLFU(key, &lfu, &lru); + RedisModule_ReplyWithLongLong(ctx, lru); + RedisModule_CloseKey(key); + return REDISMODULE_OK; +} + +int test_setlru(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) +{ + if (argc<3) { + RedisModule_WrongArity(ctx); + return REDISMODULE_OK; + } + RedisModuleString *keyname = argv[1]; + RedisModuleKey *key = RedisModule_OpenKey(ctx, keyname, REDISMODULE_WRITE|REDISMODULE_OPEN_KEY_NOTOUCH); + long long lru; + RedisModule_StringToLongLong(argv[2], &lru); + RedisModule_SetLRUOrLFU(key, -1, lru); + RedisModule_ReplyWithCString(ctx, "Ok"); + RedisModule_CloseKey(key); + return REDISMODULE_OK; +} + int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { REDISMODULE_NOT_USED(argv); REDISMODULE_NOT_USED(argc); @@ -50,6 +109,16 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) return REDISMODULE_ERR; if (RedisModule_CreateCommand(ctx,"test.call_info", test_call_info,"",0,0,0) == REDISMODULE_ERR) return REDISMODULE_ERR; + if (RedisModule_CreateCommand(ctx,"test.flushall", test_flushall,"",0,0,0) == REDISMODULE_ERR) + return REDISMODULE_ERR; + if (RedisModule_CreateCommand(ctx,"test.dbsize", test_dbsize,"",0,0,0) == REDISMODULE_ERR) + return REDISMODULE_ERR; + if (RedisModule_CreateCommand(ctx,"test.randomkey", test_randomkey,"",0,0,0) == REDISMODULE_ERR) + return REDISMODULE_ERR; + if (RedisModule_CreateCommand(ctx,"test.setlru", test_setlru,"",0,0,0) == REDISMODULE_ERR) + return REDISMODULE_ERR; + if (RedisModule_CreateCommand(ctx,"test.getlru", test_getlru,"",0,0,0) == REDISMODULE_ERR) + return REDISMODULE_ERR; return REDISMODULE_OK; } diff --git a/tests/unit/moduleapi/misc.tcl b/tests/unit/moduleapi/misc.tcl index d392aeab0..ebfa9631f 100644 --- a/tests/unit/moduleapi/misc.tcl +++ b/tests/unit/moduleapi/misc.tcl @@ -16,4 +16,23 @@ start_server {tags {"modules"}} { assert { [string match "*cmdstat_module*" $info] } } + test {test module db commands} { + r set x foo + set key [r test.randomkey] + assert_equal $key "x" + assert_equal [r test.dbsize] 1 + r test.flushall + assert_equal [r test.dbsize] 0 + } + + test {test modle lru api} { + r set x foo + set lru [r test.getlru x] + assert { $lru <= 1 } + r test.setlru x 100 + set idle [r object idletime x] + assert { $idle >= 100 } + set lru [r test.getlru x] + assert { $lru >= 100 } + } } -- cgit v1.2.1 From 60ec2b78b3bd5905dd1405f2f8dfa94b1ec97d75 Mon Sep 17 00:00:00 2001 From: artix Date: Thu, 31 Oct 2019 16:51:30 +0100 Subject: Start support for long double in modules New API: - RedisModule_StringToLongDouble - RedisModule_CreateStringFromLongDouble - RedisModule_ReplyWithLongDouble --- src/module.c | 40 ++++++++++++++++++++++++++++++++++++++++ src/redismodule.h | 6 ++++++ 2 files changed, 46 insertions(+) diff --git a/src/module.c b/src/module.c index ad34e7b64..e78ac0c57 100644 --- a/src/module.c +++ b/src/module.c @@ -1012,6 +1012,20 @@ RedisModuleString *RM_CreateStringFromLongLong(RedisModuleCtx *ctx, long long ll return RM_CreateString(ctx,buf,len); } +/* Like RedisModule_CreatString(), but creates a string starting from a long + * double. + * + * The returned string must be released with RedisModule_FreeString() or by + * enabling automatic memory management. + * + * The passed context 'ctx' may be NULL if necessary, see the + * RedisModule_CreateString() documentation for more info. */ +RedisModuleString *RM_CreateStringFromLongDouble(RedisModuleCtx *ctx, long double ld, int humanfriendly) { + char buf[MAX_LONG_DOUBLE_CHARS]; + size_t len = ld2string(buf,sizeof(buf),ld,humanfriendly); + return RM_CreateString(ctx,buf,len); +} + /* Like RedisModule_CreatString(), but creates a string starting from another * RedisModuleString. * @@ -1116,6 +1130,14 @@ int RM_StringToDouble(const RedisModuleString *str, double *d) { return (retval == C_OK) ? REDISMODULE_OK : REDISMODULE_ERR; } +/* Convert the string into a long double, storing it at `*ld`. + * Returns REDISMODULE_OK on success or REDISMODULE_ERR if the string is + * not a valid string representation of a double value. */ +int RM_StringToLongDouble(const RedisModuleString *str, long double *ld) { + int retval = string2ld(str->ptr,sdslen(str->ptr),ld); + return retval ? REDISMODULE_OK : REDISMODULE_ERR; +} + /* Compare two string objects, returning -1, 0 or 1 respectively if * a < b, a == b, a > b. Strings are compared byte by byte as two * binary blobs without any encoding care / collation attempt. */ @@ -1442,6 +1464,21 @@ int RM_ReplyWithDouble(RedisModuleCtx *ctx, double d) { return REDISMODULE_OK; } +/* Send a string reply obtained converting the long double 'ld' into a bulk + * string. This function is basically equivalent to converting a long double + * into a string into a C buffer, and then calling the function + * RedisModule_ReplyWithStringBuffer() with the buffer and length. + * The double string uses human readable formatting (see + * `addReplyHumanLongDouble` in networking.c). + * + * The function always returns REDISMODULE_OK. */ +int RM_ReplyWithLongDouble(RedisModuleCtx *ctx, long double ld) { + client *c = moduleGetReplyClient(ctx); + if (c == NULL) return REDISMODULE_OK; + addReplyHumanLongDouble(c, ld); + return REDISMODULE_OK; +} + /* -------------------------------------------------------------------------- * Commands replication API * -------------------------------------------------------------------------- */ @@ -6797,6 +6834,7 @@ void moduleRegisterCoreAPI(void) { REGISTER_API(ReplyWithNull); REGISTER_API(ReplyWithCallReply); REGISTER_API(ReplyWithDouble); + REGISTER_API(ReplyWithLongDouble); REGISTER_API(GetSelectedDb); REGISTER_API(SelectDb); REGISTER_API(OpenKey); @@ -6807,6 +6845,7 @@ void moduleRegisterCoreAPI(void) { REGISTER_API(ListPop); REGISTER_API(StringToLongLong); REGISTER_API(StringToDouble); + REGISTER_API(StringToLongDouble); REGISTER_API(Call); REGISTER_API(CallReplyProto); REGISTER_API(FreeCallReply); @@ -6818,6 +6857,7 @@ void moduleRegisterCoreAPI(void) { REGISTER_API(CreateStringFromCallReply); REGISTER_API(CreateString); REGISTER_API(CreateStringFromLongLong); + REGISTER_API(CreateStringFromLongDouble); REGISTER_API(CreateStringFromString); REGISTER_API(CreateStringPrintf); REGISTER_API(FreeString); diff --git a/src/redismodule.h b/src/redismodule.h index 728c7f584..d390263b2 100644 --- a/src/redismodule.h +++ b/src/redismodule.h @@ -458,6 +458,7 @@ size_t REDISMODULE_API_FUNC(RedisModule_CallReplyLength)(RedisModuleCallReply *r RedisModuleCallReply *REDISMODULE_API_FUNC(RedisModule_CallReplyArrayElement)(RedisModuleCallReply *reply, size_t idx); RedisModuleString *REDISMODULE_API_FUNC(RedisModule_CreateString)(RedisModuleCtx *ctx, const char *ptr, size_t len); RedisModuleString *REDISMODULE_API_FUNC(RedisModule_CreateStringFromLongLong)(RedisModuleCtx *ctx, long long ll); +RedisModuleString *REDISMODULE_API_FUNC(RedisModule_CreateStringFromLongDouble)(RedisModuleCtx *ctx, long double ld, int humanfriendly); RedisModuleString *REDISMODULE_API_FUNC(RedisModule_CreateStringFromString)(RedisModuleCtx *ctx, const RedisModuleString *str); RedisModuleString *REDISMODULE_API_FUNC(RedisModule_CreateStringPrintf)(RedisModuleCtx *ctx, const char *fmt, ...); void REDISMODULE_API_FUNC(RedisModule_FreeString)(RedisModuleCtx *ctx, RedisModuleString *str); @@ -475,9 +476,11 @@ int REDISMODULE_API_FUNC(RedisModule_ReplyWithEmptyString)(RedisModuleCtx *ctx); int REDISMODULE_API_FUNC(RedisModule_ReplyWithVerbatimString)(RedisModuleCtx *ctx, const char *buf, size_t len); int REDISMODULE_API_FUNC(RedisModule_ReplyWithNull)(RedisModuleCtx *ctx); int REDISMODULE_API_FUNC(RedisModule_ReplyWithDouble)(RedisModuleCtx *ctx, double d); +int REDISMODULE_API_FUNC(RedisModule_ReplyWithLongDouble)(RedisModuleCtx *ctx, double d); int REDISMODULE_API_FUNC(RedisModule_ReplyWithCallReply)(RedisModuleCtx *ctx, RedisModuleCallReply *reply); int REDISMODULE_API_FUNC(RedisModule_StringToLongLong)(const RedisModuleString *str, long long *ll); int REDISMODULE_API_FUNC(RedisModule_StringToDouble)(const RedisModuleString *str, double *d); +int REDISMODULE_API_FUNC(RedisModule_StringToLongDouble)(const RedisModuleString *str, long double *d); void REDISMODULE_API_FUNC(RedisModule_AutoMemory)(RedisModuleCtx *ctx); int REDISMODULE_API_FUNC(RedisModule_Replicate)(RedisModuleCtx *ctx, const char *cmdname, const char *fmt, ...); int REDISMODULE_API_FUNC(RedisModule_ReplicateVerbatim)(RedisModuleCtx *ctx); @@ -666,6 +669,7 @@ static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int REDISMODULE_GET_API(ReplyWithNull); REDISMODULE_GET_API(ReplyWithCallReply); REDISMODULE_GET_API(ReplyWithDouble); + REDISMODULE_GET_API(ReplyWithLongDouble); REDISMODULE_GET_API(GetSelectedDb); REDISMODULE_GET_API(SelectDb); REDISMODULE_GET_API(OpenKey); @@ -676,6 +680,7 @@ static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int REDISMODULE_GET_API(ListPop); REDISMODULE_GET_API(StringToLongLong); REDISMODULE_GET_API(StringToDouble); + REDISMODULE_GET_API(StringToLongDouble); REDISMODULE_GET_API(Call); REDISMODULE_GET_API(CallReplyProto); REDISMODULE_GET_API(FreeCallReply); @@ -687,6 +692,7 @@ static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int REDISMODULE_GET_API(CreateStringFromCallReply); REDISMODULE_GET_API(CreateString); REDISMODULE_GET_API(CreateStringFromLongLong); + REDISMODULE_GET_API(CreateStringFromLongDouble); REDISMODULE_GET_API(CreateStringFromString); REDISMODULE_GET_API(CreateStringPrintf); REDISMODULE_GET_API(FreeString); -- cgit v1.2.1 From 060af1858dabc537854c7a9fef302e9ecd9e1ad2 Mon Sep 17 00:00:00 2001 From: artix Date: Mon, 4 Nov 2019 18:04:35 +0100 Subject: Fix RedisModule_ReplyWithLongDouble ptr definition, add tests --- src/redismodule.h | 2 +- tests/modules/misc.c | 40 +++++++++++++++++++++++++++++++++++++++- tests/unit/moduleapi/misc.tcl | 5 +++++ 3 files changed, 45 insertions(+), 2 deletions(-) diff --git a/src/redismodule.h b/src/redismodule.h index d390263b2..98939e93c 100644 --- a/src/redismodule.h +++ b/src/redismodule.h @@ -476,7 +476,7 @@ int REDISMODULE_API_FUNC(RedisModule_ReplyWithEmptyString)(RedisModuleCtx *ctx); int REDISMODULE_API_FUNC(RedisModule_ReplyWithVerbatimString)(RedisModuleCtx *ctx, const char *buf, size_t len); int REDISMODULE_API_FUNC(RedisModule_ReplyWithNull)(RedisModuleCtx *ctx); int REDISMODULE_API_FUNC(RedisModule_ReplyWithDouble)(RedisModuleCtx *ctx, double d); -int REDISMODULE_API_FUNC(RedisModule_ReplyWithLongDouble)(RedisModuleCtx *ctx, double d); +int REDISMODULE_API_FUNC(RedisModule_ReplyWithLongDouble)(RedisModuleCtx *ctx, long double d); int REDISMODULE_API_FUNC(RedisModule_ReplyWithCallReply)(RedisModuleCtx *ctx, RedisModuleCallReply *reply); int REDISMODULE_API_FUNC(RedisModule_StringToLongLong)(const RedisModuleString *str, long long *ll); int REDISMODULE_API_FUNC(RedisModule_StringToDouble)(const RedisModuleString *str, double *d); diff --git a/tests/modules/misc.c b/tests/modules/misc.c index fd892f52c..52a4c4d41 100644 --- a/tests/modules/misc.c +++ b/tests/modules/misc.c @@ -40,6 +40,43 @@ int test_call_info(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) return REDISMODULE_OK; } +int test_ld_conv(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { + long double ld = 0.00000000000000001L; + const char *ldstr = "0.00000000000000001"; + RedisModuleString *s1 = RedisModule_CreateStringFromLongDouble(ctx, ld, 1); + RedisModuleString *s2 = + RedisModule_CreateString(ctx, ldstr, strlen(ldstr)); + if (RedisModule_StringCompare(s1, s2) != 0) { + char err[4096]; + snprintf(err, 4096, + "Failed to convert long double to string ('%s' != '%s')", + RedisModule_StringPtrLen(s1, NULL), + RedisModule_StringPtrLen(s2, NULL)); + RedisModule_ReplyWithError(ctx, err); + goto final; + } + long double ld2 = 0; + if (RedisModule_StringToLongDouble(s2, &ld2) == REDISMODULE_ERR) { + RedisModule_ReplyWithError(ctx, + "Failed to convert string to long double"); + goto final; + } + if (ld2 != ld) { + char err[4096]; + snprintf(err, 4096, + "Failed to convert string to long double (%.40Lf != %.40Lf)", + ld2, + ld); + RedisModule_ReplyWithError(ctx, err); + goto final; + } + RedisModule_ReplyWithLongDouble(ctx, ld2); +final: + RedisModule_FreeString(ctx, s1); + RedisModule_FreeString(ctx, s2); + return REDISMODULE_OK; +} + int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { REDISMODULE_NOT_USED(argv); REDISMODULE_NOT_USED(argc); @@ -50,6 +87,7 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) return REDISMODULE_ERR; if (RedisModule_CreateCommand(ctx,"test.call_info", test_call_info,"",0,0,0) == REDISMODULE_ERR) return REDISMODULE_ERR; - + if (RedisModule_CreateCommand(ctx,"test.ld_conversion", test_ld_conv, "",0,0,0) == REDISMODULE_ERR) + return REDISMODULE_ERR; return REDISMODULE_OK; } diff --git a/tests/unit/moduleapi/misc.tcl b/tests/unit/moduleapi/misc.tcl index d392aeab0..21529747b 100644 --- a/tests/unit/moduleapi/misc.tcl +++ b/tests/unit/moduleapi/misc.tcl @@ -16,4 +16,9 @@ start_server {tags {"modules"}} { assert { [string match "*cmdstat_module*" $info] } } + test {test long double conversions} { + set ld [r test.ld_conversion] + assert {[string match $ld "0.00000000000000001"]} + } + } -- cgit v1.2.1 From a68c19df6c884af6dfbc198bde75878ef741faf9 Mon Sep 17 00:00:00 2001 From: artix Date: Mon, 4 Nov 2019 18:12:03 +0100 Subject: RM_CreateStringFromLongDouble: use new ld2string 'mode' type --- src/module.c | 3 ++- tests/modules/misc.c | 4 ++++ 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/src/module.c b/src/module.c index e78ac0c57..0ee38a5eb 100644 --- a/src/module.c +++ b/src/module.c @@ -1022,7 +1022,8 @@ RedisModuleString *RM_CreateStringFromLongLong(RedisModuleCtx *ctx, long long ll * RedisModule_CreateString() documentation for more info. */ RedisModuleString *RM_CreateStringFromLongDouble(RedisModuleCtx *ctx, long double ld, int humanfriendly) { char buf[MAX_LONG_DOUBLE_CHARS]; - size_t len = ld2string(buf,sizeof(buf),ld,humanfriendly); + size_t len = ld2string(buf,sizeof(buf),ld, + (humanfriendly ? LD_STR_HUMAN : LD_STR_AUTO)); return RM_CreateString(ctx,buf,len); } diff --git a/tests/modules/misc.c b/tests/modules/misc.c index 52a4c4d41..ba0710538 100644 --- a/tests/modules/misc.c +++ b/tests/modules/misc.c @@ -6,6 +6,8 @@ #include #include +#define UNUSED(x) (void)(x) + int test_call_generic(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { if (argc<2) { @@ -41,6 +43,8 @@ int test_call_info(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) } int test_ld_conv(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { + UNUSED(argv); + UNUSED(argc); long double ld = 0.00000000000000001L; const char *ldstr = "0.00000000000000001"; RedisModuleString *s1 = RedisModule_CreateStringFromLongDouble(ctx, ld, 1); -- cgit v1.2.1 From 5350e7669e9dfe7b2b4bcf663171920c441c19e1 Mon Sep 17 00:00:00 2001 From: Yossi Gottlieb Date: Mon, 11 Jul 2016 16:47:37 +0300 Subject: Add ModuleDataType to/from string serialization. Add two new functions that leverage the RedisModuleDataType mechanism for RDB serialization/deserialization and make it possible to use it to/from arbitrary strings: * RM_SaveDataTypeToString() * RM_LoadDataTypeFromString() --- runtest-moduleapi | 1 + src/module.c | 55 +++++++++++++ src/redismodule.h | 4 + tests/modules/Makefile | 3 +- tests/modules/datatype.c | 161 ++++++++++++++++++++++++++++++++++++++ tests/unit/moduleapi/datatype.tcl | 27 +++++++ 6 files changed, 250 insertions(+), 1 deletion(-) create mode 100644 tests/modules/datatype.c create mode 100644 tests/unit/moduleapi/datatype.tcl diff --git a/runtest-moduleapi b/runtest-moduleapi index e48535126..e5a5a1200 100755 --- a/runtest-moduleapi +++ b/runtest-moduleapi @@ -22,4 +22,5 @@ $TCLSH tests/test_helper.tcl \ --single unit/moduleapi/hooks \ --single unit/moduleapi/misc \ --single unit/moduleapi/blockonkeys \ +--single unit/moduleapi/datatype "${@}" diff --git a/src/module.c b/src/module.c index ad34e7b64..ca9fb5804 100644 --- a/src/module.c +++ b/src/module.c @@ -3884,6 +3884,59 @@ void RM_DigestEndSequence(RedisModuleDigest *md) { memset(md->o,0,sizeof(md->o)); } +/* Decode a serialized representation of a module data type 'mt' from string + * 'str' and return a newly allocated value, or NULL if decoding failed. + * + * This call basically reuses the 'rdb_load' callback which module data types + * implement in order to allow a module to arbitrarily serialize/de-serialize + * keys, similar to how the Redis 'DUMP' and 'RESTORE' commands are implemented. + * + * Modules should generally use the REDISMODULE_OPTIONS_HANDLE_IO_ERRORS flag and + * make sure the de-serialization code properly checks and handles IO errors + * (freeing allocated buffers and returning a NULL). + * + * If this is NOT done, Redis will handle corrupted (or just truncated) serialized + * data by producing an error message and terminating the process. + */ + +void *RM_LoadDataTypeFromString(const RedisModuleString *str, const moduleType *mt) { + rio payload; + RedisModuleIO io; + + rioInitWithBuffer(&payload, str->ptr); + moduleInitIOContext(io,(moduleType *)mt,&payload,NULL); + + /* All RM_Save*() calls always write a version 2 compatible format, so we + * need to make sure we read the same. + */ + io.ver = 2; + return mt->rdb_load(&io,0); +} + +/* Encode a module data type 'mt' value 'data' into serialized form, and return it + * as a newly allocated RedisModuleString. + * + * This call basically reuses the 'rdb_save' callback which module data types + * implement in order to allow a module to arbitrarily serialize/de-serialize + * keys, similar to how the Redis 'DUMP' and 'RESTORE' commands are implemented. + */ + +RedisModuleString *RM_SaveDataTypeToString(RedisModuleCtx *ctx, void *data, const moduleType *mt) { + rio payload; + RedisModuleIO io; + + rioInitWithBuffer(&payload,sdsempty()); + moduleInitIOContext(io,(moduleType *)mt,&payload,NULL); + mt->rdb_save(&io,data); + if (io.error) { + return NULL; + } else { + robj *str = createObject(OBJ_STRING,payload.io.buffer.ptr); + autoMemoryAdd(ctx,REDISMODULE_AM_STRING,str); + return str; + } +} + /* -------------------------------------------------------------------------- * AOF API for modules data types * -------------------------------------------------------------------------- */ @@ -6876,6 +6929,8 @@ void moduleRegisterCoreAPI(void) { REGISTER_API(LoadFloat); REGISTER_API(SaveLongDouble); REGISTER_API(LoadLongDouble); + REGISTER_API(SaveDataTypeToString); + REGISTER_API(LoadDataTypeFromString); REGISTER_API(EmitAOF); REGISTER_API(Log); REGISTER_API(LogIOError); diff --git a/src/redismodule.h b/src/redismodule.h index 728c7f584..21f3988cf 100644 --- a/src/redismodule.h +++ b/src/redismodule.h @@ -537,6 +537,8 @@ void REDISMODULE_API_FUNC(RedisModule_SaveFloat)(RedisModuleIO *io, float value) float REDISMODULE_API_FUNC(RedisModule_LoadFloat)(RedisModuleIO *io); void REDISMODULE_API_FUNC(RedisModule_SaveLongDouble)(RedisModuleIO *io, long double value); long double REDISMODULE_API_FUNC(RedisModule_LoadLongDouble)(RedisModuleIO *io); +void *REDISMODULE_API_FUNC(RedisModule_LoadDataTypeFromString)(const RedisModuleString *str, const RedisModuleType *mt); +RedisModuleString *REDISMODULE_API_FUNC(RedisModule_SaveDataTypeToString)(RedisModuleCtx *ctx, void *data, const RedisModuleType *mt); void REDISMODULE_API_FUNC(RedisModule_Log)(RedisModuleCtx *ctx, const char *level, const char *fmt, ...); void REDISMODULE_API_FUNC(RedisModule_LogIOError)(RedisModuleIO *io, const char *levelstr, const char *fmt, ...); void REDISMODULE_API_FUNC(RedisModule__Assert)(const char *estr, const char *file, int line); @@ -745,6 +747,8 @@ static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int REDISMODULE_GET_API(LoadFloat); REDISMODULE_GET_API(SaveLongDouble); REDISMODULE_GET_API(LoadLongDouble); + REDISMODULE_GET_API(SaveDataTypeToString); + REDISMODULE_GET_API(LoadDataTypeFromString); REDISMODULE_GET_API(EmitAOF); REDISMODULE_GET_API(Log); REDISMODULE_GET_API(LogIOError); diff --git a/tests/modules/Makefile b/tests/modules/Makefile index 9e27758a2..26627c30c 100644 --- a/tests/modules/Makefile +++ b/tests/modules/Makefile @@ -19,7 +19,8 @@ TEST_MODULES = \ propagate.so \ misc.so \ hooks.so \ - blockonkeys.so + blockonkeys.so \ + datatype.so .PHONY: all diff --git a/tests/modules/datatype.c b/tests/modules/datatype.c new file mode 100644 index 000000000..7c39ab457 --- /dev/null +++ b/tests/modules/datatype.c @@ -0,0 +1,161 @@ +/* This module current tests a small subset but should be extended in the future + * for general ModuleDataType coverage. + */ + +#include "redismodule.h" + +static RedisModuleType *datatype = NULL; + +typedef struct { + long long intval; + RedisModuleString *strval; +} DataType; + +static void *datatype_load(RedisModuleIO *io, int encver) { + (void) encver; + + int intval = RedisModule_LoadSigned(io); + if (RedisModule_IsIOError(io)) return NULL; + + RedisModuleString *strval = RedisModule_LoadString(io); + if (RedisModule_IsIOError(io)) return NULL; + + DataType *dt = (DataType *) RedisModule_Alloc(sizeof(DataType)); + dt->intval = intval; + dt->strval = strval; + return dt; +} + +static void datatype_save(RedisModuleIO *io, void *value) { + DataType *dt = (DataType *) value; + RedisModule_SaveSigned(io, dt->intval); + RedisModule_SaveString(io, dt->strval); +} + +static void datatype_free(void *value) { + if (value) { + DataType *dt = (DataType *) value; + + if (dt->strval) RedisModule_FreeString(NULL, dt->strval); + RedisModule_Free(dt); + } +} + +static int datatype_set(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { + if (argc != 4) { + RedisModule_WrongArity(ctx); + return REDISMODULE_OK; + } + + long long intval; + + if (RedisModule_StringToLongLong(argv[2], &intval) != REDISMODULE_OK) { + RedisModule_ReplyWithError(ctx, "Invalid integr value"); + return REDISMODULE_OK; + } + + RedisModuleKey *key = RedisModule_OpenKey(ctx, argv[1], REDISMODULE_WRITE); + DataType *dt = RedisModule_Calloc(sizeof(DataType), 1); + dt->intval = intval; + dt->strval = argv[3]; + RedisModule_RetainString(ctx, dt->strval); + + RedisModule_ModuleTypeSetValue(key, datatype, dt); + RedisModule_CloseKey(key); + RedisModule_ReplyWithSimpleString(ctx, "OK"); + + return REDISMODULE_OK; +} + +static int datatype_restore(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { + if (argc != 3) { + RedisModule_WrongArity(ctx); + return REDISMODULE_OK; + } + + DataType *dt = RedisModule_LoadDataTypeFromString(argv[2], datatype); + if (!dt) { + RedisModule_ReplyWithError(ctx, "Invalid data"); + return REDISMODULE_OK; + } + + RedisModuleKey *key = RedisModule_OpenKey(ctx, argv[1], REDISMODULE_WRITE); + RedisModule_ModuleTypeSetValue(key, datatype, dt); + RedisModule_CloseKey(key); + RedisModule_ReplyWithSimpleString(ctx, "OK"); + + return REDISMODULE_OK; +} + +static int datatype_get(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { + if (argc != 2) { + RedisModule_WrongArity(ctx); + return REDISMODULE_OK; + } + + RedisModuleKey *key = RedisModule_OpenKey(ctx, argv[1], REDISMODULE_READ); + DataType *dt = RedisModule_ModuleTypeGetValue(key); + RedisModule_CloseKey(key); + + RedisModule_ReplyWithArray(ctx, 2); + RedisModule_ReplyWithLongLong(ctx, dt->intval); + RedisModule_ReplyWithString(ctx, dt->strval); + return REDISMODULE_OK; +} + +static int datatype_dump(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { + if (argc != 2) { + RedisModule_WrongArity(ctx); + return REDISMODULE_OK; + } + + RedisModuleKey *key = RedisModule_OpenKey(ctx, argv[1], REDISMODULE_READ); + DataType *dt = RedisModule_ModuleTypeGetValue(key); + RedisModule_CloseKey(key); + + RedisModuleString *reply = RedisModule_SaveDataTypeToString(ctx, dt, datatype); + if (!reply) { + RedisModule_ReplyWithError(ctx, "Failed to save"); + return REDISMODULE_OK; + } + + RedisModule_ReplyWithString(ctx, reply); + RedisModule_FreeString(ctx, reply); + return REDISMODULE_OK; +} + + +int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { + REDISMODULE_NOT_USED(argv); + REDISMODULE_NOT_USED(argc); + + if (RedisModule_Init(ctx,"datatype",1,REDISMODULE_APIVER_1) == REDISMODULE_ERR) + return REDISMODULE_ERR; + + RedisModule_SetModuleOptions(ctx, REDISMODULE_OPTIONS_HANDLE_IO_ERRORS); + + RedisModuleTypeMethods datatype_methods = { + .version = REDISMODULE_TYPE_METHOD_VERSION, + .rdb_load = datatype_load, + .rdb_save = datatype_save, + .free = datatype_free, + }; + + datatype = RedisModule_CreateDataType(ctx, "test___dt", 1, &datatype_methods); + if (datatype == NULL) + return REDISMODULE_ERR; + + if (RedisModule_CreateCommand(ctx,"datatype.set", datatype_set,"deny-oom",1,1,1) == REDISMODULE_ERR) + return REDISMODULE_ERR; + + if (RedisModule_CreateCommand(ctx,"datatype.get", datatype_get,"",1,1,1) == REDISMODULE_ERR) + return REDISMODULE_ERR; + + if (RedisModule_CreateCommand(ctx,"datatype.restore", datatype_restore,"deny-oom",1,1,1) == REDISMODULE_ERR) + return REDISMODULE_ERR; + + if (RedisModule_CreateCommand(ctx,"datatype.dump", datatype_dump,"",1,1,1) == REDISMODULE_ERR) + return REDISMODULE_ERR; + + return REDISMODULE_OK; +} diff --git a/tests/unit/moduleapi/datatype.tcl b/tests/unit/moduleapi/datatype.tcl new file mode 100644 index 000000000..c1da696d3 --- /dev/null +++ b/tests/unit/moduleapi/datatype.tcl @@ -0,0 +1,27 @@ +set testmodule [file normalize tests/modules/datatype.so] + +start_server {tags {"modules"}} { + r module load $testmodule + + test {DataType: Test module is sane, GET/SET work.} { + r datatype.set dtkey 100 stringval + assert {[r datatype.get dtkey] eq {100 stringval}} + } + + test {DataType: RM_SaveDataTypeToString(), RM_LoadDataTypeFromString() work} { + r datatype.set dtkey -1111 MyString + set encoded [r datatype.dump dtkey] + + r datatype.restore dtkeycopy $encoded + assert {[r datatype.get dtkeycopy] eq {-1111 MyString}} + } + + test {DataType: Handle truncated RM_LoadDataTypeFromString()} { + r datatype.set dtkey -1111 MyString + set encoded [r datatype.dump dtkey] + set truncated [string range $encoded 0 end-1] + + catch {r datatype.restore dtkeycopy $truncated} e + set e + } {*Invalid*} +} -- cgit v1.2.1 From e542132b07a76c73cd5e1dd067671afbb4c53fe6 Mon Sep 17 00:00:00 2001 From: "zhaozhao.zz" Date: Mon, 4 Nov 2019 20:32:19 +0800 Subject: expires: refactoring judgment about whether a key is expired Calling lookupKey*() many times to search a key in one command may get different result. That's because lookupKey*() calls expireIfNeeded(), and delete the key when reach the expire time. So we can get an robj before the expire time, but a NULL after the expire time. The worst is that may lead to Redis crash, for example `RPOPLPUSH foo foo` the first time we get a list form `foo` and hold the pointer, but when we get `foo` again it's expired and deleted. Now we hold a freed memory, when execute rpoplpushHandlePush() redis crash. To fix it, we can refactor the judgment about whether a key is expired, using the same basetime `server.cmd_start_mstime` instead of calling mstime() everytime. --- src/db.c | 2 +- src/server.c | 1 + src/server.h | 1 + 3 files changed, 3 insertions(+), 1 deletion(-) diff --git a/src/db.c b/src/db.c index 2c0a0cdd3..1a272faae 100644 --- a/src/db.c +++ b/src/db.c @@ -1199,7 +1199,7 @@ int keyIsExpired(redisDb *db, robj *key) { * only the first time it is accessed and not in the middle of the * script execution, making propagation to slaves / AOF consistent. * See issue #1525 on Github for more information. */ - mstime_t now = server.lua_caller ? server.lua_time_start : mstime(); + mstime_t now = server.lua_caller ? server.lua_time_start : server.cmd_start_mstime; return now > when; } diff --git a/src/server.c b/src/server.c index 8f165113d..99438ccac 100644 --- a/src/server.c +++ b/src/server.c @@ -3596,6 +3596,7 @@ int processCommand(client *c) { queueMultiCommand(c); addReply(c,shared.queued); } else { + server.cmd_start_mstime = mstime(); call(c,CMD_CALL_FULL); c->woff = server.master_repl_offset; if (listLength(server.ready_keys)) diff --git a/src/server.h b/src/server.h index f724f7d64..08eb2eef9 100644 --- a/src/server.h +++ b/src/server.h @@ -1401,6 +1401,7 @@ struct redisServer { time_t timezone; /* Cached timezone. As set by tzset(). */ int daylight_active; /* Currently in daylight saving time. */ long long mstime; /* 'unixtime' with milliseconds resolution. */ + mstime_t cmd_start_mstime; /* Pubsub */ dict *pubsub_channels; /* Map channels to list of subscribed clients */ list *pubsub_patterns; /* A list of pubsub_patterns */ -- cgit v1.2.1 From 824f5f0b7a1f32a6e6abf3d355d523d2c6e7bf09 Mon Sep 17 00:00:00 2001 From: antirez Date: Tue, 5 Nov 2019 10:14:34 +0100 Subject: Update PR #6537 patch to for generality. After the thread in #6537 and thanks to the suggestions received, this commit updates the original patch in order to: 1. Solve the problem of updating the time in multiple places by updating it in call(). 2. Avoid introducing a new field but use our cached time. This required some minor refactoring to the function updating the time, and the introduction of a new cached time in microseconds in order to use less gettimeofday() calls. --- src/db.c | 8 ++++++-- src/rdb.c | 2 +- src/server.c | 36 +++++++++++++++++++++++------------- src/server.h | 7 ++++--- 4 files changed, 34 insertions(+), 19 deletions(-) diff --git a/src/db.c b/src/db.c index 1a272faae..6c0b190a8 100644 --- a/src/db.c +++ b/src/db.c @@ -1198,8 +1198,12 @@ int keyIsExpired(redisDb *db, robj *key) { * blocked to when the Lua script started. This way a key can expire * only the first time it is accessed and not in the middle of the * script execution, making propagation to slaves / AOF consistent. - * See issue #1525 on Github for more information. */ - mstime_t now = server.lua_caller ? server.lua_time_start : server.cmd_start_mstime; + * See issue #1525 on Github for more information. + * + * Outside the Lua script execution, we use the cached time server.mstime + * that is updated before commands executions in call(). */ + mstime_t now = server.lua_caller ? server.lua_time_start : + server.mstime; return now > when; } diff --git a/src/rdb.c b/src/rdb.c index f530219a4..a499362d5 100644 --- a/src/rdb.c +++ b/src/rdb.c @@ -1961,7 +1961,7 @@ void rdbLoadProgressCallback(rio *r, const void *buf, size_t len) { /* The DB can take some non trivial amount of time to load. Update * our cached time since it is used to create and update the last * interaction time with clients and for other important things. */ - updateCachedTime(); + updateCachedTime(0); if (server.masterhost && server.repl_state == REPL_STATE_TRANSFER) replicationSendNewlineToMaster(); loadingProgress(r->processed_bytes); diff --git a/src/server.c b/src/server.c index 99438ccac..2e5d8193b 100644 --- a/src/server.c +++ b/src/server.c @@ -1736,20 +1736,29 @@ void databasesCron(void) { /* We take a cached value of the unix time in the global state because with * virtual memory and aging there is to store the current time in objects at * every object access, and accuracy is not needed. To access a global var is - * a lot faster than calling time(NULL) */ -void updateCachedTime(void) { - server.unixtime = time(NULL); - server.mstime = mstime(); + * a lot faster than calling time(NULL). + * + * This function should be fast because it is called at every command execution + * in call(), so it is possible to decide if to update the daylight saving + * info or not using the 'update_daylight_info' argument. Normally we update + * such info only when calling this function from serverCron() but not when + * calling it from call(). */ +void updateCachedTime(int update_daylight_info) { + server.ustime = ustime(); + server.mstime = server.ustime / 1000; + server.unixtime = server.mstime / 1000; /* To get information about daylight saving time, we need to call * localtime_r and cache the result. However calling localtime_r in this * context is safe since we will never fork() while here, in the main * thread. The logging function will call a thread safe version of * localtime that has no locks. */ - struct tm tm; - time_t ut = server.unixtime; - localtime_r(&ut,&tm); - server.daylight_active = tm.tm_isdst; + if (update_daylight_info) { + struct tm tm; + time_t ut = server.unixtime; + localtime_r(&ut,&tm); + server.daylight_active = tm.tm_isdst; + } } void checkChildrenDone(void) { @@ -1838,7 +1847,7 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { if (server.watchdog_period) watchdogScheduleSignal(server.watchdog_period); /* Update the time cache. */ - updateCachedTime(); + updateCachedTime(1); server.hz = server.config_hz; /* Adapt the server.hz value to the number of configured clients. If we have @@ -2252,7 +2261,7 @@ void createSharedObjects(void) { void initServerConfig(void) { int j; - updateCachedTime(); + updateCachedTime(1); getRandomHexChars(server.runid,CONFIG_RUN_ID_SIZE); server.runid[CONFIG_RUN_ID_SIZE] = '\0'; changeReplicationId(); @@ -3238,7 +3247,8 @@ void preventCommandReplication(client *c) { * */ void call(client *c, int flags) { - long long dirty, start, duration; + long long dirty; + ustime_t start, duration; int client_old_flags = c->flags; struct redisCommand *real_cmd = c->cmd; @@ -3259,7 +3269,8 @@ void call(client *c, int flags) { /* Call the command. */ dirty = server.dirty; - start = ustime(); + updateCachedTime(0); + start = server.ustime; c->cmd->proc(c); duration = ustime()-start; dirty = server.dirty-dirty; @@ -3596,7 +3607,6 @@ int processCommand(client *c) { queueMultiCommand(c); addReply(c,shared.queued); } else { - server.cmd_start_mstime = mstime(); call(c,CMD_CALL_FULL); c->woff = server.master_repl_offset; if (listLength(server.ready_keys)) diff --git a/src/server.h b/src/server.h index 08eb2eef9..24dbc079c 100644 --- a/src/server.h +++ b/src/server.h @@ -50,6 +50,7 @@ #include typedef long long mstime_t; /* millisecond time type. */ +typedef long long ustime_t; /* microsecond time type. */ #include "ae.h" /* Event driven programming library */ #include "sds.h" /* Dynamic safe strings */ @@ -1400,8 +1401,8 @@ struct redisServer { _Atomic time_t unixtime; /* Unix time sampled every cron cycle. */ time_t timezone; /* Cached timezone. As set by tzset(). */ int daylight_active; /* Currently in daylight saving time. */ - long long mstime; /* 'unixtime' with milliseconds resolution. */ - mstime_t cmd_start_mstime; + mstime_t mstime; /* 'unixtime' in milliseconds. */ + ustime_t ustime; /* 'unixtime' in microseconds. */ /* Pubsub */ dict *pubsub_channels; /* Map channels to list of subscribed clients */ list *pubsub_patterns; /* A list of pubsub_patterns */ @@ -1997,7 +1998,7 @@ void populateCommandTable(void); void resetCommandTableStats(void); void adjustOpenFilesLimit(void); void closeListeningSockets(int unlink_unix_socket); -void updateCachedTime(void); +void updateCachedTime(int update_daylight_info); void resetServerStats(void); void activeDefragCycle(void); unsigned int getLRUClock(void); -- cgit v1.2.1 From 8b2c0f90442c0646d7265ef150dd5afa3172b86e Mon Sep 17 00:00:00 2001 From: antirez Date: Wed, 6 Nov 2019 09:57:29 +0100 Subject: Update PR #6537: use a fresh time outside call(). One problem with the solution proposed so far in #6537 is that key lookups outside a command execution via call(), still used a cached time. The cached time needed to be refreshed in multiple places, especially because of modules callbacks from timers, cluster bus, and thread safe contexts, that may use RM_Open(). In order to avoid this problem, this commit introduces the ability to detect if we are inside call(): this way we can use the reference fixed time only when we are in the context of a command execution or Lua script, but for the asynchronous lookups, we can still use mstime() to get a fresh time reference. --- src/db.c | 27 +++++++++++++++++++++------ src/server.c | 4 ++++ src/server.h | 3 ++- 3 files changed, 27 insertions(+), 7 deletions(-) diff --git a/src/db.c b/src/db.c index 6c0b190a8..5ea3087fc 100644 --- a/src/db.c +++ b/src/db.c @@ -1188,6 +1188,7 @@ void propagateExpire(redisDb *db, robj *key, int lazy) { /* Check if the key is expired. */ int keyIsExpired(redisDb *db, robj *key) { mstime_t when = getExpire(db,key); + mstime_t now; if (when < 0) return 0; /* No expire for this key */ @@ -1198,13 +1199,27 @@ int keyIsExpired(redisDb *db, robj *key) { * blocked to when the Lua script started. This way a key can expire * only the first time it is accessed and not in the middle of the * script execution, making propagation to slaves / AOF consistent. - * See issue #1525 on Github for more information. - * - * Outside the Lua script execution, we use the cached time server.mstime - * that is updated before commands executions in call(). */ - mstime_t now = server.lua_caller ? server.lua_time_start : - server.mstime; + * See issue #1525 on Github for more information. */ + if (server.lua_caller) { + now = server.lua_time_start; + } + /* If we are in the middle of a command execution, we still want to use + * a reference time that does not change: in that case we just use the + * cached time, that we update before each call in the call() function. + * This way we avoid that commands such as RPOPLPUSH or similar, that + * may re-open the same key multiple times, can invalidate an already + * open object in a next call, if the next call will see the key expired, + * while the first did not. */ + else if (server.call_depth > 0) { + now = server.mstime; + } + /* For the other cases, we want to use the most fresh time we have. */ + else { + now = mstime(); + } + /* The key expired if the current (virtual or real) time is greater + * than the expire time of the key. */ return now > when; } diff --git a/src/server.c b/src/server.c index 2e5d8193b..acf8eebbc 100644 --- a/src/server.c +++ b/src/server.c @@ -2780,6 +2780,7 @@ void initServer(void) { server.hz = server.config_hz; server.pid = getpid(); server.current_client = NULL; + server.call_depth = 0; server.clients = listCreate(); server.clients_index = raxNew(); server.clients_to_close = listCreate(); @@ -3252,6 +3253,8 @@ void call(client *c, int flags) { int client_old_flags = c->flags; struct redisCommand *real_cmd = c->cmd; + server.call_depth++; + /* Sent the command to clients in MONITOR mode, only if the commands are * not generated from reading an AOF. */ if (listLength(server.monitors) && @@ -3377,6 +3380,7 @@ void call(client *c, int flags) { trackingRememberKeys(caller); } + server.call_depth--; server.stat_numcommands++; } diff --git a/src/server.h b/src/server.h index 24dbc079c..4a497c47c 100644 --- a/src/server.h +++ b/src/server.h @@ -1134,7 +1134,8 @@ struct redisServer { list *clients_pending_write; /* There is to write or install handler. */ list *clients_pending_read; /* Client has pending read socket buffers. */ list *slaves, *monitors; /* List of slaves and MONITORs */ - client *current_client; /* Current client, only used on crash report */ + client *current_client; /* Current client executing the command. */ + long call_depth; /* call() re-entering count. */ rax *clients_index; /* Active clients dictionary by client ID. */ int clients_paused; /* True if clients are currently paused */ mstime_t clients_pause_end_time; /* Time when we undo clients_paused */ -- cgit v1.2.1 From 089ee5979d1b3d36dad57cecaf444b0dfc00db31 Mon Sep 17 00:00:00 2001 From: "meir@redislabs.com" Date: Wed, 28 Aug 2019 18:08:07 +0300 Subject: expose used memory via redismodule api The exposed functions: 1. RedisModule_GetUsedMemoryPercentage - return the used memory 2. RedisModue_MallocSize - return for a given pointer, the amount of memory allocated for this pointer --- src/module.c | 22 ++++++++++++++++++++++ src/redismodule.h | 4 ++++ 2 files changed, 26 insertions(+) diff --git a/src/module.c b/src/module.c index ad34e7b64..75c98f12d 100644 --- a/src/module.c +++ b/src/module.c @@ -5891,6 +5891,26 @@ int RM_CommandFilterArgDelete(RedisModuleCommandFilterCtx *fctx, int pos) return REDISMODULE_OK; } +/* + * For a given pointer, return The amount of memory + * allocated for this pointer. + */ +size_t RM_MallocSize(void* ptr){ + return zmalloc_size(ptr); +} + +/* + * Return the a number between 0 to 1 indicating + * the amount of memory currently used. + * 0 - no memory limit + * 1 and above, memory limit reached. + */ +float RM_GetUsedMemoryPercentage(){ + float level; + getMaxmemoryState(NULL, NULL, NULL, &level); + return level; +} + /* -------------------------------------------------------------------------- * Module fork API * -------------------------------------------------------------------------- */ @@ -6971,4 +6991,6 @@ void moduleRegisterCoreAPI(void) { REGISTER_API(BlockClientOnKeys); REGISTER_API(SignalKeyAsReady); REGISTER_API(GetBlockedClientReadyKey); + REGISTER_API(GetUsedMemoryPercentage); + REGISTER_API(MallocSize); } diff --git a/src/redismodule.h b/src/redismodule.h index 728c7f584..259ea2532 100644 --- a/src/redismodule.h +++ b/src/redismodule.h @@ -633,6 +633,8 @@ int REDISMODULE_API_FUNC(RedisModule_CommandFilterArgDelete)(RedisModuleCommandF int REDISMODULE_API_FUNC(RedisModule_Fork)(RedisModuleForkDoneHandler cb, void *user_data); int REDISMODULE_API_FUNC(RedisModule_ExitFromChild)(int retcode); int REDISMODULE_API_FUNC(RedisModule_KillForkChild)(int child_pid); +float REDISMODULE_API_FUNC(RedisModule_GetUsedMemoryPercentage)(); +size_t REDISMODULE_API_FUNC(RedisModule_MallocSize)(void* ptr); #endif #define RedisModule_IsAOFClient(id) ((id) == UINT64_MAX) @@ -842,6 +844,8 @@ static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int REDISMODULE_GET_API(Fork); REDISMODULE_GET_API(ExitFromChild); REDISMODULE_GET_API(KillForkChild); + REDISMODULE_GET_API(GetUsedMemoryPercentage); + REDISMODULE_GET_API(MallocSize); #endif if (RedisModule_IsModuleNameBusy && RedisModule_IsModuleNameBusy(name)) return REDISMODULE_ERR; -- cgit v1.2.1 From aded138a591fea32ebf0444dd2d217fc3bde7f2d Mon Sep 17 00:00:00 2001 From: "meir@redislabs.com" Date: Wed, 6 Nov 2019 11:25:21 +0200 Subject: return value between 0 to 100 instead of 0 to 1. --- src/module.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/module.c b/src/module.c index 75c98f12d..1671934b8 100644 --- a/src/module.c +++ b/src/module.c @@ -5903,12 +5903,12 @@ size_t RM_MallocSize(void* ptr){ * Return the a number between 0 to 1 indicating * the amount of memory currently used. * 0 - no memory limit - * 1 and above, memory limit reached. + * 100 and above, memory limit reached. */ float RM_GetUsedMemoryPercentage(){ float level; getMaxmemoryState(NULL, NULL, NULL, &level); - return level; + return level * 100; } /* -------------------------------------------------------------------------- -- cgit v1.2.1 From c032dc45d1945db248af32f024ce457e9c94e083 Mon Sep 17 00:00:00 2001 From: "meir@redislabs.com" Date: Wed, 6 Nov 2019 12:17:09 +0200 Subject: changed GetUsedMemoryPresentage -> GetUsedMemoryRatio and return value between 0 and 1. --- src/module.c | 6 +++--- src/redismodule.h | 4 ++-- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/src/module.c b/src/module.c index 1671934b8..4dbc57ad6 100644 --- a/src/module.c +++ b/src/module.c @@ -5905,10 +5905,10 @@ size_t RM_MallocSize(void* ptr){ * 0 - no memory limit * 100 and above, memory limit reached. */ -float RM_GetUsedMemoryPercentage(){ +float RM_GetUsedMemoryRatio(){ float level; getMaxmemoryState(NULL, NULL, NULL, &level); - return level * 100; + return level; } /* -------------------------------------------------------------------------- @@ -6991,6 +6991,6 @@ void moduleRegisterCoreAPI(void) { REGISTER_API(BlockClientOnKeys); REGISTER_API(SignalKeyAsReady); REGISTER_API(GetBlockedClientReadyKey); - REGISTER_API(GetUsedMemoryPercentage); + REGISTER_API(GetUsedMemoryRatio); REGISTER_API(MallocSize); } diff --git a/src/redismodule.h b/src/redismodule.h index 259ea2532..c09e03541 100644 --- a/src/redismodule.h +++ b/src/redismodule.h @@ -633,7 +633,7 @@ int REDISMODULE_API_FUNC(RedisModule_CommandFilterArgDelete)(RedisModuleCommandF int REDISMODULE_API_FUNC(RedisModule_Fork)(RedisModuleForkDoneHandler cb, void *user_data); int REDISMODULE_API_FUNC(RedisModule_ExitFromChild)(int retcode); int REDISMODULE_API_FUNC(RedisModule_KillForkChild)(int child_pid); -float REDISMODULE_API_FUNC(RedisModule_GetUsedMemoryPercentage)(); +float REDISMODULE_API_FUNC(RedisModule_GetUsedMemoryRatio)(); size_t REDISMODULE_API_FUNC(RedisModule_MallocSize)(void* ptr); #endif @@ -844,7 +844,7 @@ static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int REDISMODULE_GET_API(Fork); REDISMODULE_GET_API(ExitFromChild); REDISMODULE_GET_API(KillForkChild); - REDISMODULE_GET_API(GetUsedMemoryPercentage); + REDISMODULE_GET_API(GetUsedMemoryRatio); REDISMODULE_GET_API(MallocSize); #endif -- cgit v1.2.1 From 1833d008b3af8628835b5f082c5b4b1359557893 Mon Sep 17 00:00:00 2001 From: Guy Benoish Date: Wed, 6 Nov 2019 15:48:46 +0530 Subject: Support streams in general module API functions Fixes GitHub issue #6492 Added stream support in RM_KeyType and RM_ValueLength. Also moduleDelKeyIfEmpty was updated, even though it has no effect now (It will be relevant when stream type direct API will be coded - i.e. RM_StreamAdd) --- src/module.c | 5 ++++- src/redismodule.h | 1 + src/stream.h | 1 + src/t_stream.c | 6 ++++++ 4 files changed, 12 insertions(+), 1 deletion(-) diff --git a/src/module.c b/src/module.c index ad34e7b64..6dd15c7db 100644 --- a/src/module.c +++ b/src/module.c @@ -518,7 +518,8 @@ int moduleDelKeyIfEmpty(RedisModuleKey *key) { case OBJ_LIST: isempty = listTypeLength(o) == 0; break; case OBJ_SET: isempty = setTypeSize(o) == 0; break; case OBJ_ZSET: isempty = zsetLength(o) == 0; break; - case OBJ_HASH : isempty = hashTypeLength(o) == 0; break; + case OBJ_HASH: isempty = hashTypeLength(o) == 0; break; + case OBJ_STREAM: isempty = streamLength(o) == 0; break; default: isempty = 0; } @@ -1916,6 +1917,7 @@ int RM_KeyType(RedisModuleKey *key) { case OBJ_ZSET: return REDISMODULE_KEYTYPE_ZSET; case OBJ_HASH: return REDISMODULE_KEYTYPE_HASH; case OBJ_MODULE: return REDISMODULE_KEYTYPE_MODULE; + case OBJ_STREAM: return REDISMODULE_KEYTYPE_STREAM; default: return 0; } } @@ -1933,6 +1935,7 @@ size_t RM_ValueLength(RedisModuleKey *key) { case OBJ_SET: return setTypeSize(key->value); case OBJ_ZSET: return zsetLength(key->value); case OBJ_HASH: return hashTypeLength(key->value); + case OBJ_STREAM: return streamLength(key->value); default: return 0; } } diff --git a/src/redismodule.h b/src/redismodule.h index 728c7f584..40a73454d 100644 --- a/src/redismodule.h +++ b/src/redismodule.h @@ -33,6 +33,7 @@ #define REDISMODULE_KEYTYPE_SET 4 #define REDISMODULE_KEYTYPE_ZSET 5 #define REDISMODULE_KEYTYPE_MODULE 6 +#define REDISMODULE_KEYTYPE_STREAM 7 /* Reply types. */ #define REDISMODULE_REPLY_UNKNOWN -1 diff --git a/src/stream.h b/src/stream.h index 1163b3527..7de769ba1 100644 --- a/src/stream.h +++ b/src/stream.h @@ -98,6 +98,7 @@ struct client; stream *streamNew(void); void freeStream(stream *s); +unsigned long streamLength(const robj *subject); size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end, size_t count, int rev, streamCG *group, streamConsumer *consumer, int flags, streamPropInfo *spi); void streamIteratorStart(streamIterator *si, stream *s, streamID *start, streamID *end, int rev); int streamIteratorGetID(streamIterator *si, streamID *id, int64_t *numfields); diff --git a/src/t_stream.c b/src/t_stream.c index 58b59f521..fc187e318 100644 --- a/src/t_stream.c +++ b/src/t_stream.c @@ -67,6 +67,12 @@ void freeStream(stream *s) { zfree(s); } +/* Return the length of a stream. */ +unsigned long streamLength(const robj *subject) { + stream *s = subject->ptr; + return s->length; +} + /* Generate the next stream item ID given the previous one. If the current * milliseconds Unix time is greater than the previous one, just use this * as time part and start with sequence part of zero. Otherwise we use the -- cgit v1.2.1 From e45239e3501edfa7012955a76536dff98c87e2a5 Mon Sep 17 00:00:00 2001 From: "meir@redislabs.com" Date: Wed, 6 Nov 2019 12:26:03 +0200 Subject: fix documentation --- src/module.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/module.c b/src/module.c index 4dbc57ad6..947cc3083 100644 --- a/src/module.c +++ b/src/module.c @@ -5903,7 +5903,7 @@ size_t RM_MallocSize(void* ptr){ * Return the a number between 0 to 1 indicating * the amount of memory currently used. * 0 - no memory limit - * 100 and above, memory limit reached. + * 1 and above, memory limit reached. */ float RM_GetUsedMemoryRatio(){ float level; -- cgit v1.2.1 From 9593ffde2ed92af512c5d33a2e8e6b9fed516d8c Mon Sep 17 00:00:00 2001 From: Patrick Valsecchi Date: Thu, 7 Nov 2019 08:49:19 +0100 Subject: Redis sentinel kill pubsub client connections as well When a redis instance becomes a slave, sentinel also kills pubsub clients. Closes #6545 --- src/sentinel.c | 20 +++++++++++++++----- 1 file changed, 15 insertions(+), 5 deletions(-) diff --git a/src/sentinel.c b/src/sentinel.c index 0490db4e9..d5f22b97f 100644 --- a/src/sentinel.c +++ b/src/sentinel.c @@ -465,6 +465,12 @@ struct redisCommand sentinelcmds[] = { {"hello",helloCommand,-2,"no-script fast",0,NULL,0,0,0,0,0} }; +/* List of client types that are killed when an instance becomes a slave */ +const char* killedClientTypes[] = { + "normal", + "pubsub" +}; + /* This function overwrites a few normal Redis config default with Sentinel * specific defaults. */ void initSentinelConfig(void) { @@ -3949,6 +3955,7 @@ char *sentinelGetLeader(sentinelRedisInstance *master, uint64_t epoch) { int sentinelSendSlaveOf(sentinelRedisInstance *ri, char *host, int port) { char portstr[32]; int retval; + unsigned int curType; ll2string(portstr,sizeof(portstr),port); @@ -3993,11 +4000,14 @@ int sentinelSendSlaveOf(sentinelRedisInstance *ri, char *host, int port) { * an issue because CLIENT is variadic command, so Redis will not * recognized as a syntax error, and the transaction will not fail (but * only the unsupported command will fail). */ - retval = redisAsyncCommand(ri->link->cc, - sentinelDiscardReplyCallback, ri, "%s KILL TYPE normal", - sentinelInstanceMapCommand(ri,"CLIENT")); - if (retval == C_ERR) return retval; - ri->link->pending_commands++; + for (curType = 0; curType < sizeof(killedClientTypes)/sizeof(killedClientTypes[0]); ++curType) { + retval = redisAsyncCommand(ri->link->cc, + sentinelDiscardReplyCallback, ri, "%s KILL TYPE %s", + sentinelInstanceMapCommand(ri,"CLIENT"), + killedClientTypes[curType]); + if (retval == C_ERR) return retval; + ri->link->pending_commands++; + } retval = redisAsyncCommand(ri->link->cc, sentinelDiscardReplyCallback, ri, "%s", -- cgit v1.2.1 From 7059eceeb027d9ffb7cf9ae47201c8554d6e5010 Mon Sep 17 00:00:00 2001 From: "zhaozhao.zz" Date: Fri, 8 Nov 2019 19:06:51 +0800 Subject: expires & blocking: handle ready keys as call() --- src/blocked.c | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/blocked.c b/src/blocked.c index 14c2ff830..dea4cc57a 100644 --- a/src/blocked.c +++ b/src/blocked.c @@ -514,6 +514,9 @@ void handleClientsBlockedOnKeys(void) { * we can safely call signalKeyAsReady() against this key. */ dictDelete(rl->db->ready_keys,rl->key); + server.call_depth++; + updateCachedTime(0); + /* Serve clients blocked on list key. */ robj *o = lookupKeyWrite(rl->db,rl->key); @@ -530,6 +533,8 @@ void handleClientsBlockedOnKeys(void) { serveClientsBlockedOnKeyByModule(rl); } + server.call_depth++; + /* Free this item. */ decrRefCount(rl->key); zfree(rl); -- cgit v1.2.1 From 02f21113ab6c61d2c01544607b464eb501dbe8fa Mon Sep 17 00:00:00 2001 From: Oran Agra Date: Sun, 10 Nov 2019 09:21:19 +0200 Subject: fix leak in module api rdb test recently added more reads into that function, if a later read fails, i must either free what's already allocated, or return the pointer so that the free callback will release it. --- tests/modules/testrdb.c | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/tests/modules/testrdb.c b/tests/modules/testrdb.c index 8a262e8a7..7c04bb4ef 100644 --- a/tests/modules/testrdb.c +++ b/tests/modules/testrdb.c @@ -18,8 +18,11 @@ void *testrdb_type_load(RedisModuleIO *rdb, int encver) { RedisModuleString *str = RedisModule_LoadString(rdb); float f = RedisModule_LoadFloat(rdb); long double ld = RedisModule_LoadLongDouble(rdb); - if (RedisModule_IsIOError(rdb)) + if (RedisModule_IsIOError(rdb)) { + RedisModuleCtx *ctx = RedisModule_GetContextFromIO(rdb); + RedisModule_FreeString(ctx, str); return NULL; + } /* Using the values only after checking for io errors. */ assert(count==1); assert(encver==1); -- cgit v1.2.1 From 28c20b4ef95aa5f74b938681a6f78f6b92dec2a8 Mon Sep 17 00:00:00 2001 From: Oran Agra Date: Sun, 10 Nov 2019 09:04:39 +0200 Subject: rename RN_SetLRUOrLFU -> RM_SetLRU and RN_SetLFU - the API name was odd, separated to two apis one for LRU and one for LFU - the LRU idle time was in 1 second resolution, which might be ok for RDB and RESTORE, but i think modules may need higher resolution - adding tests for LFU and for handling maxmemory policy mismatch --- src/cluster.c | 2 +- src/module.c | 64 ++++++++++++++++++++++++++--------------- src/object.c | 4 +-- src/rdb.c | 2 +- src/redismodule.h | 12 +++++--- src/server.h | 2 +- tests/modules/misc.c | 66 ++++++++++++++++++++++++++++++++++++------- tests/unit/moduleapi/misc.tcl | 33 ++++++++++++++++++++-- 8 files changed, 141 insertions(+), 44 deletions(-) diff --git a/src/cluster.c b/src/cluster.c index a7d8a02c3..9e6ddb2c4 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -4966,7 +4966,7 @@ void restoreCommand(client *c) { if (!absttl) ttl+=mstime(); setExpire(c,c->db,c->argv[1],ttl); } - objectSetLRUOrLFU(obj,lfu_freq,lru_idle,lru_clock); + objectSetLRUOrLFU(obj,lfu_freq,lru_idle,lru_clock,1000); signalModifiedKey(c->db,c->argv[1]); addReply(c,shared.ok); server.dirty++; diff --git a/src/module.c b/src/module.c index ad34e7b64..cd79a5908 100644 --- a/src/module.c +++ b/src/module.c @@ -6736,35 +6736,53 @@ size_t moduleCount(void) { return dictSize(modules); } -/* Set the key LRU/LFU depending on server.maxmemory_policy. - * The lru_idle arg is idle time in seconds, and is only relevant if the - * eviction policy is LRU based. - * The lfu_freq arg is a logarithmic counter that provides an indication of - * the access frequencyonly (must be <= 255) and is only relevant if the - * eviction policy is LFU based. - * Either or both of them may be <0, in that case, nothing is set. */ -/* return value is an indication if the lru field was updated or not. */ -int RM_SetLRUOrLFU(RedisModuleKey *key, long long lfu_freq, long long lru_idle) { +/* Set the key last access time for LRU based eviction. not relevent if the + * servers's maxmemory policy is LFU based. Value is idle time in milliseconds. + * returns REDISMODULE_OK if the LRU was updated, REDISMODULE_ERR otherwise. */ +int RM_SetLRU(RedisModuleKey *key, mstime_t lru_idle) { if (!key->value) return REDISMODULE_ERR; - if (objectSetLRUOrLFU(key->value, lfu_freq, lru_idle, lru_idle>=0 ? LRU_CLOCK() : 0)) + if (objectSetLRUOrLFU(key->value, -1, lru_idle, lru_idle>=0 ? LRU_CLOCK() : 0, 1)) return REDISMODULE_OK; return REDISMODULE_ERR; } -/* Gets the key LRU or LFU (depending on the current eviction policy). - * One will be set to the appropiate return value, and the other will be set to -1. - * see RedisModule_SetLRUOrLFU for units and ranges. - * return value is an indication of success. */ -int RM_GetLRUOrLFU(RedisModuleKey *key, long long *lfu_freq, long long *lru_idle) { - *lru_idle = *lfu_freq = -1; +/* Gets the key last access time. + * Value is idletime in milliseconds or -1 if the server's eviction policy is + * LFU based. + * returns REDISMODULE_OK if when key is valid. */ +int RM_GetLRU(RedisModuleKey *key, mstime_t *lru_idle) { + *lru_idle = -1; if (!key->value) return REDISMODULE_ERR; - if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) { + if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) + return REDISMODULE_OK; + *lru_idle = estimateObjectIdleTime(key->value); + return REDISMODULE_OK; +} + +/* Set the key access frequency. only relevant if the server's maxmemory policy + * is LFU based. + * The frequency is a logarithmic counter that provides an indication of + * the access frequencyonly (must be <= 255). + * returns REDISMODULE_OK if the LFU was updated, REDISMODULE_ERR otherwise. */ +int RM_SetLFU(RedisModuleKey *key, long long lfu_freq) { + if (!key->value) + return REDISMODULE_ERR; + if (objectSetLRUOrLFU(key->value, lfu_freq, -1, 0, 1)) + return REDISMODULE_OK; + return REDISMODULE_ERR; +} + +/* Gets the key access frequency or -1 if the server's eviction policy is not + * LFU based. + * returns REDISMODULE_OK if when key is valid. */ +int RM_GetLFU(RedisModuleKey *key, long long *lfu_freq) { + *lfu_freq = -1; + if (!key->value) + return REDISMODULE_ERR; + if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) *lfu_freq = LFUDecrAndReturn(key->value); - } else { - *lru_idle = estimateObjectIdleTime(key->value)/1000; - } return REDISMODULE_OK; } @@ -6966,8 +6984,10 @@ void moduleRegisterCoreAPI(void) { REGISTER_API(GetClientInfoById); REGISTER_API(PublishMessage); REGISTER_API(SubscribeToServerEvent); - REGISTER_API(SetLRUOrLFU); - REGISTER_API(GetLRUOrLFU); + REGISTER_API(SetLRU); + REGISTER_API(GetLRU); + REGISTER_API(SetLFU); + REGISTER_API(GetLFU); REGISTER_API(BlockClientOnKeys); REGISTER_API(SignalKeyAsReady); REGISTER_API(GetBlockedClientReadyKey); diff --git a/src/object.c b/src/object.c index 5e9a99dec..c6d89bfa7 100644 --- a/src/object.c +++ b/src/object.c @@ -1210,7 +1210,7 @@ sds getMemoryDoctorReport(void) { * is MAXMEMORY_FLAG_LRU. * Either or both of them may be <0, in that case, nothing is set. */ int objectSetLRUOrLFU(robj *val, long long lfu_freq, long long lru_idle, - long long lru_clock) { + long long lru_clock, int lru_multiplier) { if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) { if (lfu_freq >= 0) { serverAssert(lfu_freq <= 255); @@ -1222,7 +1222,7 @@ int objectSetLRUOrLFU(robj *val, long long lfu_freq, long long lru_idle, * according to the LRU clock resolution this Redis * instance was compiled with (normally 1000 ms, so the * below statement will expand to lru_idle*1000/1000. */ - lru_idle = lru_idle*1000/LRU_CLOCK_RESOLUTION; + lru_idle = lru_idle*lru_multiplier/LRU_CLOCK_RESOLUTION; long lru_abs = lru_clock - lru_idle; /* Absolute access time. */ /* If the LRU field underflows (since LRU it is a wrapping * clock), the best we can do is to provide a large enough LRU diff --git a/src/rdb.c b/src/rdb.c index b569edfea..301a33642 100644 --- a/src/rdb.c +++ b/src/rdb.c @@ -2239,7 +2239,7 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) { if (expiretime != -1) setExpire(NULL,db,key,expiretime); /* Set usage information (for eviction). */ - objectSetLRUOrLFU(val,lfu_freq,lru_idle,lru_clock); + objectSetLRUOrLFU(val,lfu_freq,lru_idle,lru_clock,1000); /* Decrement the key refcount since dbAdd() will take its * own reference. */ diff --git a/src/redismodule.h b/src/redismodule.h index 728c7f584..e388d7439 100644 --- a/src/redismodule.h +++ b/src/redismodule.h @@ -583,8 +583,10 @@ int REDISMODULE_API_FUNC(RedisModule_InfoAddFieldDouble)(RedisModuleInfoCtx *ctx int REDISMODULE_API_FUNC(RedisModule_InfoAddFieldLongLong)(RedisModuleInfoCtx *ctx, char *field, long long value); int REDISMODULE_API_FUNC(RedisModule_InfoAddFieldULongLong)(RedisModuleInfoCtx *ctx, char *field, unsigned long long value); int REDISMODULE_API_FUNC(RedisModule_SubscribeToServerEvent)(RedisModuleCtx *ctx, RedisModuleEvent event, RedisModuleEventCallback callback); -int REDISMODULE_API_FUNC(RedisModule_SetLRUOrLFU)(RedisModuleKey *key, long long lfu_freq, long long lru_idle); -int REDISMODULE_API_FUNC(RedisModule_GetLRUOrLFU)(RedisModuleKey *key, long long *lfu_freq, long long *lru_idle); +int REDISMODULE_API_FUNC(RedisModule_SetLRU)(RedisModuleKey *key, mstime_t lru_idle); +int REDISMODULE_API_FUNC(RedisModule_GetLRU)(RedisModuleKey *key, mstime_t *lru_idle); +int REDISMODULE_API_FUNC(RedisModule_SetLFU)(RedisModuleKey *key, long long lfu_freq); +int REDISMODULE_API_FUNC(RedisModule_GetLFU)(RedisModuleKey *key, long long *lfu_freq); RedisModuleBlockedClient *REDISMODULE_API_FUNC(RedisModule_BlockClientOnKeys)(RedisModuleCtx *ctx, RedisModuleCmdFunc reply_callback, RedisModuleCmdFunc timeout_callback, void (*free_privdata)(RedisModuleCtx*,void*), long long timeout_ms, RedisModuleString **keys, int numkeys, void *privdata); void REDISMODULE_API_FUNC(RedisModule_SignalKeyAsReady)(RedisModuleCtx *ctx, RedisModuleString *key); RedisModuleString *REDISMODULE_API_FUNC(RedisModule_GetBlockedClientReadyKey)(RedisModuleCtx *ctx); @@ -794,8 +796,10 @@ static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int REDISMODULE_GET_API(GetClientInfoById); REDISMODULE_GET_API(PublishMessage); REDISMODULE_GET_API(SubscribeToServerEvent); - REDISMODULE_GET_API(SetLRUOrLFU); - REDISMODULE_GET_API(GetLRUOrLFU); + REDISMODULE_GET_API(SetLRU); + REDISMODULE_GET_API(GetLRU); + REDISMODULE_GET_API(SetLFU); + REDISMODULE_GET_API(GetLFU); REDISMODULE_GET_API(BlockClientOnKeys); REDISMODULE_GET_API(SignalKeyAsReady); REDISMODULE_GET_API(GetBlockedClientReadyKey); diff --git a/src/server.h b/src/server.h index 8063dc101..c9ac3003e 100644 --- a/src/server.h +++ b/src/server.h @@ -2089,7 +2089,7 @@ robj *lookupKeyWriteWithFlags(redisDb *db, robj *key, int flags); robj *objectCommandLookup(client *c, robj *key); robj *objectCommandLookupOrReply(client *c, robj *key, robj *reply); int objectSetLRUOrLFU(robj *val, long long lfu_freq, long long lru_idle, - long long lru_clock); + long long lru_clock, int lru_multiplier); #define LOOKUP_NONE 0 #define LOOKUP_NOTOUCH (1<<0) void dbAdd(redisDb *db, robj *key, robj *val); diff --git a/tests/modules/misc.c b/tests/modules/misc.c index 7701a9c7c..06b5af620 100644 --- a/tests/modules/misc.c +++ b/tests/modules/misc.c @@ -68,16 +68,24 @@ int test_randomkey(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) return REDISMODULE_OK; } +RedisModuleKey *open_key_or_reply(RedisModuleCtx *ctx, RedisModuleString *keyname, int mode) { + RedisModuleKey *key = RedisModule_OpenKey(ctx, keyname, mode); + if (!key) { + RedisModule_ReplyWithError(ctx, "key not found"); + return NULL; + } + return key; +} + int test_getlru(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { if (argc<2) { RedisModule_WrongArity(ctx); return REDISMODULE_OK; } - RedisModuleString *keyname = argv[1]; - RedisModuleKey *key = RedisModule_OpenKey(ctx, keyname, REDISMODULE_READ|REDISMODULE_OPEN_KEY_NOTOUCH); - long long lru, lfu; - RedisModule_GetLRUOrLFU(key, &lfu, &lru); + RedisModuleKey *key = open_key_or_reply(ctx, argv[1], REDISMODULE_READ|REDISMODULE_OPEN_KEY_NOTOUCH); + mstime_t lru; + RedisModule_GetLRU(key, &lru); RedisModule_ReplyWithLongLong(ctx, lru); RedisModule_CloseKey(key); return REDISMODULE_OK; @@ -89,12 +97,46 @@ int test_setlru(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) RedisModule_WrongArity(ctx); return REDISMODULE_OK; } - RedisModuleString *keyname = argv[1]; - RedisModuleKey *key = RedisModule_OpenKey(ctx, keyname, REDISMODULE_WRITE|REDISMODULE_OPEN_KEY_NOTOUCH); - long long lru; - RedisModule_StringToLongLong(argv[2], &lru); - RedisModule_SetLRUOrLFU(key, -1, lru); - RedisModule_ReplyWithCString(ctx, "Ok"); + RedisModuleKey *key = open_key_or_reply(ctx, argv[1], REDISMODULE_READ|REDISMODULE_OPEN_KEY_NOTOUCH); + mstime_t lru; + if (RedisModule_StringToLongLong(argv[2], &lru) != REDISMODULE_OK) { + RedisModule_ReplyWithError(ctx, "invalid idle time"); + return REDISMODULE_OK; + } + int was_set = RedisModule_SetLRU(key, lru)==REDISMODULE_OK; + RedisModule_ReplyWithLongLong(ctx, was_set); + RedisModule_CloseKey(key); + return REDISMODULE_OK; +} + +int test_getlfu(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) +{ + if (argc<2) { + RedisModule_WrongArity(ctx); + return REDISMODULE_OK; + } + RedisModuleKey *key = open_key_or_reply(ctx, argv[1], REDISMODULE_READ|REDISMODULE_OPEN_KEY_NOTOUCH); + mstime_t lfu; + RedisModule_GetLFU(key, &lfu); + RedisModule_ReplyWithLongLong(ctx, lfu); + RedisModule_CloseKey(key); + return REDISMODULE_OK; +} + +int test_setlfu(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) +{ + if (argc<3) { + RedisModule_WrongArity(ctx); + return REDISMODULE_OK; + } + RedisModuleKey *key = open_key_or_reply(ctx, argv[1], REDISMODULE_READ|REDISMODULE_OPEN_KEY_NOTOUCH); + mstime_t lfu; + if (RedisModule_StringToLongLong(argv[2], &lfu) != REDISMODULE_OK) { + RedisModule_ReplyWithError(ctx, "invalid freq"); + return REDISMODULE_OK; + } + int was_set = RedisModule_SetLFU(key, lfu)==REDISMODULE_OK; + RedisModule_ReplyWithLongLong(ctx, was_set); RedisModule_CloseKey(key); return REDISMODULE_OK; } @@ -119,6 +161,10 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) return REDISMODULE_ERR; if (RedisModule_CreateCommand(ctx,"test.getlru", test_getlru,"",0,0,0) == REDISMODULE_ERR) return REDISMODULE_ERR; + if (RedisModule_CreateCommand(ctx,"test.setlfu", test_setlfu,"",0,0,0) == REDISMODULE_ERR) + return REDISMODULE_ERR; + if (RedisModule_CreateCommand(ctx,"test.getlfu", test_getlfu,"",0,0,0) == REDISMODULE_ERR) + return REDISMODULE_ERR; return REDISMODULE_OK; } diff --git a/tests/unit/moduleapi/misc.tcl b/tests/unit/moduleapi/misc.tcl index ebfa9631f..376bc2eed 100644 --- a/tests/unit/moduleapi/misc.tcl +++ b/tests/unit/moduleapi/misc.tcl @@ -26,13 +26,40 @@ start_server {tags {"modules"}} { } test {test modle lru api} { + r config set maxmemory-policy allkeys-lru r set x foo set lru [r test.getlru x] - assert { $lru <= 1 } - r test.setlru x 100 + assert { $lru <= 1000 } + set was_set [r test.setlru x 100000] + assert { $was_set == 1 } set idle [r object idletime x] assert { $idle >= 100 } set lru [r test.getlru x] - assert { $lru >= 100 } + assert { $lru >= 100000 } + r config set maxmemory-policy allkeys-lfu + set lru [r test.getlru x] + assert { $lru == -1 } + set was_set [r test.setlru x 100000] + assert { $was_set == 0 } } + r config set maxmemory-policy allkeys-lru + + test {test modle lfu api} { + r config set maxmemory-policy allkeys-lfu + r set x foo + set lfu [r test.getlfu x] + assert { $lfu >= 1 } + set was_set [r test.setlfu x 100] + assert { $was_set == 1 } + set freq [r object freq x] + assert { $freq <= 100 } + set lfu [r test.getlfu x] + assert { $lfu <= 100 } + r config set maxmemory-policy allkeys-lru + set lfu [r test.getlfu x] + assert { $lfu == -1 } + set was_set [r test.setlfu x 100] + assert { $was_set == 0 } + } + } -- cgit v1.2.1 From 11c6ce812aa32cf6a6011697cbfe8881ff9450fa Mon Sep 17 00:00:00 2001 From: "meir@redislabs.com" Date: Thu, 17 Oct 2019 15:37:01 +0300 Subject: Added scan implementation to module api. The implementation expose the following new functions: 1. RedisModule_CursorCreate - allow to create a new cursor object for keys scanning 2. RedisModule_CursorRestart - restart an existing cursor to restart the scan 3. RedisModule_CursorDestroy - destroy an existing cursor 4. RedisModule_Scan - scan keys The RedisModule_Scan function gets a cursor object, a callback and void* (used as user private data). The callback will be called for each key in the database proving the key name and the value as RedisModuleKey. --- runtest-moduleapi | 1 + src/module.c | 133 ++++++++++++++++++++++++++++++++++++++---- src/redismodule.h | 10 ++++ tests/modules/Makefile | 3 +- tests/modules/scan.c | 62 ++++++++++++++++++++ tests/unit/moduleapi/scan.tcl | 18 ++++++ 6 files changed, 215 insertions(+), 12 deletions(-) create mode 100644 tests/modules/scan.c create mode 100644 tests/unit/moduleapi/scan.tcl diff --git a/runtest-moduleapi b/runtest-moduleapi index e48535126..3eb6b21b2 100755 --- a/runtest-moduleapi +++ b/runtest-moduleapi @@ -22,4 +22,5 @@ $TCLSH tests/test_helper.tcl \ --single unit/moduleapi/hooks \ --single unit/moduleapi/misc \ --single unit/moduleapi/blockonkeys \ +--single unit/moduleapi/scan \ "${@}" diff --git a/src/module.c b/src/module.c index ad34e7b64..5758abbb6 100644 --- a/src/module.c +++ b/src/module.c @@ -1848,6 +1848,17 @@ int RM_SelectDb(RedisModuleCtx *ctx, int newid) { return (retval == C_OK) ? REDISMODULE_OK : REDISMODULE_ERR; } +static void initializeKey(RedisModuleKey *kp, RedisModuleCtx *ctx, robj *keyname, robj *value, int mode){ + kp->ctx = ctx; + kp->db = ctx->client->db; + kp->key = keyname; + incrRefCount(keyname); + kp->value = value; + kp->iter = NULL; + kp->mode = mode; + zsetKeyReset(kp); +} + /* Return an handle representing a Redis key, so that it is possible * to call other APIs with the key handle as argument to perform * operations on the key. @@ -1878,27 +1889,24 @@ void *RM_OpenKey(RedisModuleCtx *ctx, robj *keyname, int mode) { /* Setup the key handle. */ kp = zmalloc(sizeof(*kp)); - kp->ctx = ctx; - kp->db = ctx->client->db; - kp->key = keyname; - incrRefCount(keyname); - kp->value = value; - kp->iter = NULL; - kp->mode = mode; - zsetKeyReset(kp); + initializeKey(kp, ctx, keyname, value, mode); autoMemoryAdd(ctx,REDISMODULE_AM_KEY,kp); return (void*)kp; } -/* Close a key handle. */ -void RM_CloseKey(RedisModuleKey *key) { - if (key == NULL) return; +static void closeKeyInternal(RedisModuleKey *key) { int signal = SHOULD_SIGNAL_MODIFIED_KEYS(key->ctx); if ((key->mode & REDISMODULE_WRITE) && signal) signalModifiedKey(key->db,key->key); /* TODO: if (key->iter) RM_KeyIteratorStop(kp); */ RM_ZsetRangeStop(key); decrRefCount(key->key); +} + +/* Close a key handle. */ +void RM_CloseKey(RedisModuleKey *key) { + if (key == NULL) return; + closeKeyInternal(key); autoMemoryFreed(key->ctx,REDISMODULE_AM_KEY,key); zfree(key); } @@ -5891,6 +5899,105 @@ int RM_CommandFilterArgDelete(RedisModuleCommandFilterCtx *fctx, int pos) return REDISMODULE_OK; } +/** + * Callback for scan implementation. + * + * The keyname is owned by the caller and need to be retained if used after this function. + * + * The kp is the data and provide using the best efforts approach, in some cases it might + * not be available (in such case it will be set to NULL) and it is the user responsibility + * to handle it. + * + * The kp (if given) is owned by the caller and will be free when the callback returns + * + */ +typedef void (*RedisModuleScanCB)(void *privdata, RedisModuleString* keyname, RedisModuleKey* key); + +typedef struct { + RedisModuleCtx *ctx; + void* user_data; + RedisModuleScanCB fn; +} ScanCBData; + +typedef struct RedisModuleCursor{ + int cursor; +}RedisModuleCursor; + +void ScanCallback(void *privdata, const dictEntry *de) { + ScanCBData *data = privdata; + sds key = dictGetKey(de); + robj* val = dictGetVal(de); + RedisModuleString *keyname = createObject(OBJ_STRING,sdsdup(key)); + + /* Setup the key handle. */ + RedisModuleKey kp = {0}; + initializeKey(&kp, data->ctx, keyname, val, REDISMODULE_READ); + + data->fn(data->user_data, keyname, &kp); + + closeKeyInternal(&kp); + decrRefCount(keyname); +} + +/** + * Create a new cursor to scan keys. + */ +RedisModuleCursor* RM_CursorCreate() { + RedisModuleCursor* cursor = zmalloc(sizeof(*cursor)); + cursor->cursor = 0; + return cursor; +} + +/** + * Restart an existing cursor. The keys will be rescanned. + */ +void RM_CursorRestart(RedisModuleCursor* cursor) { + cursor->cursor = 0; +} + +/** + * Destroy the cursor struct. + */ +void RM_CursorDestroy(RedisModuleCursor* cursor) { + zfree(cursor); +} + +/** + * Scan api that allows module writer to scan all the keys and value in redis. + * The way it should be used: + * Cursor* c = RedisModule_CursorCreate(); + * while(RedisModule_Scan(ctx, c, callback, privateData)); + * RedisModule_CursorDestroy(c); + * + * It is also possible to use this api from another thread such that the GIL only have to + * be acquired durring the actuall call to RM_Scan: + * Cursor* c = RedisModule_CursorCreate(); + * RedisModule_ThreadSafeCtxLock(ctx); + * while(RedisModule_Scan(ctx, c, callback, privateData)){ + * RedisModule_ThreadSafeCtxUnlock(ctx); + * // do some background job + * RedisModule_ThreadSafeCtxLock(ctx); + * } + * RedisModule_CursorDestroy(c); + * + * The function will return 1 if there is more elements to scan and 0 otherwise. + * It is also possible to restart and existing cursor using RM_CursorRestart + */ +int RM_Scan(RedisModuleCtx *ctx, RedisModuleCursor* cursor, RedisModuleScanCB fn, void* privdata) { + if(cursor->cursor == -1){ + return 0; + } + int ret = 1; + ScanCBData data = { ctx, privdata, fn }; + cursor->cursor = dictScan(ctx->client->db->dict, cursor->cursor, ScanCallback, NULL, &data); + if (cursor->cursor == 0){ + cursor->cursor = -1; + ret = 0; + } + return ret; +} + + /* -------------------------------------------------------------------------- * Module fork API * -------------------------------------------------------------------------- */ @@ -6969,6 +7076,10 @@ void moduleRegisterCoreAPI(void) { REGISTER_API(SetLRUOrLFU); REGISTER_API(GetLRUOrLFU); REGISTER_API(BlockClientOnKeys); + REGISTER_API(Scan); + REGISTER_API(CursorCreate); + REGISTER_API(CursorDestroy); + REGISTER_API(CursorRestart); REGISTER_API(SignalKeyAsReady); REGISTER_API(GetBlockedClientReadyKey); } diff --git a/src/redismodule.h b/src/redismodule.h index 728c7f584..c74772d0f 100644 --- a/src/redismodule.h +++ b/src/redismodule.h @@ -392,6 +392,7 @@ typedef struct RedisModuleDictIter RedisModuleDictIter; typedef struct RedisModuleCommandFilterCtx RedisModuleCommandFilterCtx; typedef struct RedisModuleCommandFilter RedisModuleCommandFilter; typedef struct RedisModuleInfoCtx RedisModuleInfoCtx; +typedef struct RedisModuleCursor RedisModuleCursor; typedef int (*RedisModuleCmdFunc)(RedisModuleCtx *ctx, RedisModuleString **argv, int argc); typedef void (*RedisModuleDisconnectFunc)(RedisModuleCtx *ctx, RedisModuleBlockedClient *bc); @@ -409,6 +410,7 @@ typedef void (*RedisModuleTimerProc)(RedisModuleCtx *ctx, void *data); typedef void (*RedisModuleCommandFilterFunc) (RedisModuleCommandFilterCtx *filter); typedef void (*RedisModuleForkDoneHandler) (int exitcode, int bysignal, void *user_data); typedef void (*RedisModuleInfoFunc)(RedisModuleInfoCtx *ctx, int for_crash_report); +typedef void (*RedisModuleScanCB)(void *privdata, RedisModuleString* keyname, RedisModuleKey* key); #define REDISMODULE_TYPE_METHOD_VERSION 2 typedef struct RedisModuleTypeMethods { @@ -633,6 +635,10 @@ int REDISMODULE_API_FUNC(RedisModule_CommandFilterArgDelete)(RedisModuleCommandF int REDISMODULE_API_FUNC(RedisModule_Fork)(RedisModuleForkDoneHandler cb, void *user_data); int REDISMODULE_API_FUNC(RedisModule_ExitFromChild)(int retcode); int REDISMODULE_API_FUNC(RedisModule_KillForkChild)(int child_pid); +RedisModuleCursor* REDISMODULE_API_FUNC(RedisModule_CursorCreate)(); +void REDISMODULE_API_FUNC(RedisModule_CursorRestart)(RedisModuleCursor* cursor); +void REDISMODULE_API_FUNC(RedisModule_CursorDestroy)(RedisModuleCursor* cursor); +int REDISMODULE_API_FUNC(RedisModule_Scan)(RedisModuleCtx *ctx, RedisModuleCursor* cursor, RedisModuleScanCB fn, void* privdata); #endif #define RedisModule_IsAOFClient(id) ((id) == UINT64_MAX) @@ -842,6 +848,10 @@ static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int REDISMODULE_GET_API(Fork); REDISMODULE_GET_API(ExitFromChild); REDISMODULE_GET_API(KillForkChild); + REDISMODULE_GET_API(Scan); + REDISMODULE_GET_API(CursorCreate); + REDISMODULE_GET_API(CursorRestart); + REDISMODULE_GET_API(CursorDestroy); #endif if (RedisModule_IsModuleNameBusy && RedisModule_IsModuleNameBusy(name)) return REDISMODULE_ERR; diff --git a/tests/modules/Makefile b/tests/modules/Makefile index 9e27758a2..07c3cb829 100644 --- a/tests/modules/Makefile +++ b/tests/modules/Makefile @@ -19,7 +19,8 @@ TEST_MODULES = \ propagate.so \ misc.so \ hooks.so \ - blockonkeys.so + blockonkeys.so \ + scan.so .PHONY: all diff --git a/tests/modules/scan.c b/tests/modules/scan.c new file mode 100644 index 000000000..21071720a --- /dev/null +++ b/tests/modules/scan.c @@ -0,0 +1,62 @@ +#define REDISMODULE_EXPERIMENTAL_API +#include "redismodule.h" + +#include +#include +#include + +#define UNUSED(V) ((void) V) + +typedef struct scan_pd{ + size_t nkeys; + RedisModuleCtx *ctx; +} scan_pd; + +void scan_callback(void *privdata, RedisModuleString* keyname, RedisModuleKey* key){ + scan_pd* pd = privdata; + RedisModule_ReplyWithArray(pd->ctx, 2); + + RedisModule_ReplyWithString(pd->ctx, keyname); + if(key && RedisModule_KeyType(key) == REDISMODULE_KEYTYPE_STRING){ + size_t len; + char * data = RedisModule_StringDMA(key, &len, REDISMODULE_READ); + RedisModule_ReplyWithStringBuffer(pd->ctx, data, len); + }else{ + RedisModule_ReplyWithNull(pd->ctx); + } + pd->nkeys++; +} + +int scan_keys_values(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) +{ + scan_pd pd = { + .nkeys = 0, + .ctx = ctx, + }; + + RedisModule_ReplyWithArray(ctx, REDISMODULE_POSTPONED_ARRAY_LEN); + + RedisModuleCursor* cursor = RedisModule_CursorCreate(); + while(RedisModule_Scan(ctx, cursor, scan_callback, &pd)); + RedisModule_CursorDestroy(cursor); + + RedisModule_ReplySetArrayLength(ctx, pd.nkeys); + return 0; +} + +int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { + UNUSED(argv); + UNUSED(argc); + if (RedisModule_Init(ctx, "scan", 1, REDISMODULE_APIVER_1)== REDISMODULE_ERR) + return REDISMODULE_ERR; + + if (RedisModule_CreateCommand(ctx, "scan.scankeysvalues", scan_keys_values, "", 0, 0, 0) == REDISMODULE_ERR) + return REDISMODULE_ERR; + + return REDISMODULE_OK; +} + + + + + diff --git a/tests/unit/moduleapi/scan.tcl b/tests/unit/moduleapi/scan.tcl new file mode 100644 index 000000000..5a77e8195 --- /dev/null +++ b/tests/unit/moduleapi/scan.tcl @@ -0,0 +1,18 @@ +set testmodule [file normalize tests/modules/scan.so] + +proc count_log_message {pattern} { + set result [exec grep -c $pattern < [srv 0 stdout]] +} + +start_server {tags {"modules"}} { + r module load $testmodule + + test {Module scan} { + # the module create a scan command which also return values + r set x 1 + r set y 2 + r set z 3 + lsort [r scan.scankeysvalues] + } {{x 1} {y 2} {z 3}} + +} \ No newline at end of file -- cgit v1.2.1 From 0f8692b4646013b7d98d4b21f86da0686546d43a Mon Sep 17 00:00:00 2001 From: Oran Agra Date: Mon, 11 Nov 2019 13:30:37 +0200 Subject: Add RM_ScanKey to scan hash, set, zset, changes to RM_Scan API - Adding RM_ScanKey - Adding tests for RM_ScanKey - Refactoring RM_Scan API Changes in RM_Scan - cleanup in docs and coding convention - Moving out of experimantal Api - Adding ctx to scan callback - Dont use cursor of -1 as an indication of done (can be a valid cursor) - Set errno when returning 0 for various reasons - Rename Cursor to ScanCursor - Test filters key that are not strings, and opens a key if NULL --- src/module.c | 255 ++++++++++++++++++++++++++++++++---------- src/redismodule.h | 23 ++-- tests/modules/scan.c | 107 +++++++++++++----- tests/unit/moduleapi/scan.tcl | 45 ++++++-- 4 files changed, 323 insertions(+), 107 deletions(-) diff --git a/src/module.c b/src/module.c index 5758abbb6..b92a9e692 100644 --- a/src/module.c +++ b/src/module.c @@ -1848,7 +1848,8 @@ int RM_SelectDb(RedisModuleCtx *ctx, int newid) { return (retval == C_OK) ? REDISMODULE_OK : REDISMODULE_ERR; } -static void initializeKey(RedisModuleKey *kp, RedisModuleCtx *ctx, robj *keyname, robj *value, int mode){ +/* Initialize a RedisModuleKey struct */ +static void moduleInitKey(RedisModuleKey *kp, RedisModuleCtx *ctx, robj *keyname, robj *value, int mode){ kp->ctx = ctx; kp->db = ctx->client->db; kp->key = keyname; @@ -1889,12 +1890,13 @@ void *RM_OpenKey(RedisModuleCtx *ctx, robj *keyname, int mode) { /* Setup the key handle. */ kp = zmalloc(sizeof(*kp)); - initializeKey(kp, ctx, keyname, value, mode); + moduleInitKey(kp, ctx, keyname, value, mode); autoMemoryAdd(ctx,REDISMODULE_AM_KEY,kp); return (void*)kp; } -static void closeKeyInternal(RedisModuleKey *key) { +/* Destroy a RedisModuleKey struct (freeing is the responsibility of the caller). */ +static void moduleCloseKey(RedisModuleKey *key) { int signal = SHOULD_SIGNAL_MODIFIED_KEYS(key->ctx); if ((key->mode & REDISMODULE_WRITE) && signal) signalModifiedKey(key->db,key->key); @@ -1906,7 +1908,7 @@ static void closeKeyInternal(RedisModuleKey *key) { /* Close a key handle. */ void RM_CloseKey(RedisModuleKey *key) { if (key == NULL) return; - closeKeyInternal(key); + moduleCloseKey(key); autoMemoryFreed(key->ctx,REDISMODULE_AM_KEY,key); zfree(key); } @@ -5899,31 +5901,23 @@ int RM_CommandFilterArgDelete(RedisModuleCommandFilterCtx *fctx, int pos) return REDISMODULE_OK; } -/** - * Callback for scan implementation. - * - * The keyname is owned by the caller and need to be retained if used after this function. - * - * The kp is the data and provide using the best efforts approach, in some cases it might - * not be available (in such case it will be set to NULL) and it is the user responsibility - * to handle it. - * - * The kp (if given) is owned by the caller and will be free when the callback returns - * - */ -typedef void (*RedisModuleScanCB)(void *privdata, RedisModuleString* keyname, RedisModuleKey* key); +/* -------------------------------------------------------------------------- + * Scanning keyspace and hashes + * -------------------------------------------------------------------------- */ +typedef void (*RedisModuleScanCB)(RedisModuleCtx *ctx, RedisModuleString *keyname, RedisModuleKey *key, void *privdata); typedef struct { RedisModuleCtx *ctx; void* user_data; RedisModuleScanCB fn; } ScanCBData; -typedef struct RedisModuleCursor{ +typedef struct RedisModuleScanCursor{ int cursor; -}RedisModuleCursor; + int done; +}RedisModuleScanCursor; -void ScanCallback(void *privdata, const dictEntry *de) { +static void moduleScanCallback(void *privdata, const dictEntry *de) { ScanCBData *data = privdata; sds key = dictGetKey(de); robj* val = dictGetVal(de); @@ -5931,69 +5925,211 @@ void ScanCallback(void *privdata, const dictEntry *de) { /* Setup the key handle. */ RedisModuleKey kp = {0}; - initializeKey(&kp, data->ctx, keyname, val, REDISMODULE_READ); + moduleInitKey(&kp, data->ctx, keyname, val, REDISMODULE_READ); - data->fn(data->user_data, keyname, &kp); + data->fn(data->ctx, keyname, &kp, data->user_data); - closeKeyInternal(&kp); + moduleCloseKey(&kp); decrRefCount(keyname); } -/** - * Create a new cursor to scan keys. - */ -RedisModuleCursor* RM_CursorCreate() { - RedisModuleCursor* cursor = zmalloc(sizeof(*cursor)); +/* Create a new cursor to be used with RedisModule_Scan */ +RedisModuleScanCursor *RM_ScanCursorCreate() { + RedisModuleScanCursor* cursor = zmalloc(sizeof(*cursor)); cursor->cursor = 0; + cursor->done = 0; return cursor; } -/** - * Restart an existing cursor. The keys will be rescanned. - */ -void RM_CursorRestart(RedisModuleCursor* cursor) { +/* Restart an existing cursor. The keys will be rescanned. */ +void RM_ScanCursorRestart(RedisModuleScanCursor *cursor) { cursor->cursor = 0; + cursor->done = 0; } -/** - * Destroy the cursor struct. - */ -void RM_CursorDestroy(RedisModuleCursor* cursor) { +/* Destroy the cursor struct. */ +void RM_ScanCursorDestroy(RedisModuleScanCursor *cursor) { zfree(cursor); } -/** - * Scan api that allows module writer to scan all the keys and value in redis. +/* Scan api that allows a module to scan all the keys and value in the selected db. + * + * Callback for scan implementation. + * void scan_callback(RedisModuleCtx *ctx, RedisModuleString *keyname, RedisModuleKey *key, void *privdata); + * - ctx - the redis module context provided to for the scan. + * - keyname - owned by the caller and need to be retained if used after this function. + * - key - holds info on the key and value, it is provided as best effort, in some cases it might + * be NULL, in which case the user should (can) use RedisModule_OpenKey (and CloseKey too). + * when it is provided, it is owned by the caller and will be free when the callback returns. + * - privdata - the user data provided to RedisModule_Scan. + * * The way it should be used: - * Cursor* c = RedisModule_CursorCreate(); + * RedisModuleCursor *c = RedisModule_ScanCursorCreate(); * while(RedisModule_Scan(ctx, c, callback, privateData)); - * RedisModule_CursorDestroy(c); + * RedisModule_ScanCursorDestroy(c); * - * It is also possible to use this api from another thread such that the GIL only have to - * be acquired durring the actuall call to RM_Scan: - * Cursor* c = RedisModule_CursorCreate(); - * RedisModule_ThreadSafeCtxLock(ctx); + * It is also possible to use this API from another thread while the lock is acquired durring + * the actuall call to RM_Scan: + * RedisModuleCursor *c = RedisModule_ScanCursorCreate(); + * RedisModule_ThreadSafeContextLock(ctx); * while(RedisModule_Scan(ctx, c, callback, privateData)){ - * RedisModule_ThreadSafeCtxUnlock(ctx); + * RedisModule_ThreadSafeContextUnlock(ctx); * // do some background job - * RedisModule_ThreadSafeCtxLock(ctx); + * RedisModule_ThreadSafeContextLock(ctx); * } - * RedisModule_CursorDestroy(c); + * RedisModule_ScanCursorDestroy(c); * - * The function will return 1 if there is more elements to scan and 0 otherwise. - * It is also possible to restart and existing cursor using RM_CursorRestart - */ -int RM_Scan(RedisModuleCtx *ctx, RedisModuleCursor* cursor, RedisModuleScanCB fn, void* privdata) { - if(cursor->cursor == -1){ + * The function will return 1 if there are more elements to scan and 0 otherwise, + * possibly setting errno if the call failed. + * It is also possible to restart and existing cursor using RM_CursorRestart. */ +int RM_Scan(RedisModuleCtx *ctx, RedisModuleScanCursor *cursor, RedisModuleScanCB fn, void *privdata) { + if (cursor->done) { + errno = ENOENT; return 0; } int ret = 1; ScanCBData data = { ctx, privdata, fn }; - cursor->cursor = dictScan(ctx->client->db->dict, cursor->cursor, ScanCallback, NULL, &data); - if (cursor->cursor == 0){ - cursor->cursor = -1; + cursor->cursor = dictScan(ctx->client->db->dict, cursor->cursor, moduleScanCallback, NULL, &data); + if (cursor->cursor == 0) { + cursor->done = 1; ret = 0; } + errno = 0; + return ret; +} + +typedef void (*RedisModuleScanKeyCB)(RedisModuleKey *key, RedisModuleString *field, RedisModuleString *value, void *privdata); +typedef struct { + RedisModuleKey *key; + void* user_data; + RedisModuleScanKeyCB fn; +} ScanKeyCBData; + +static void moduleScanKeyCallback(void *privdata, const dictEntry *de) { + ScanKeyCBData *data = privdata; + sds key = dictGetKey(de); + robj *o = data->key->value; + robj *field = createStringObject(key, sdslen(key)); + robj *value = NULL; + if (o->type == OBJ_SET) { + value = NULL; + } else if (o->type == OBJ_HASH) { + sds val = dictGetVal(de); + value = createStringObject(val, sdslen(val)); + } else if (o->type == OBJ_ZSET) { + double *val = (double*)dictGetVal(de); + value = createStringObjectFromLongDouble(*val, 0); + } + + data->fn(data->key, field, value, data->user_data); + decrRefCount(field); + if (value) decrRefCount(value); +} + +/* Scan api that allows a module to scan the elements in a hash, set or sorted set key + * + * Callback for scan implementation. + * void scan_callback(RedisModuleKey *key, RedisModuleString* field, RedisModuleString* value, void *privdata); + * - key - the redis key context provided to for the scan. + * - field - field name, owned by the caller and need to be retained if used + * after this function. + * - value - value string or NULL for set type, owned by the caller and need to + * be retained if used after this function. + * - privdata - the user data provided to RedisModule_ScanKey. + * + * The way it should be used: + * RedisModuleCursor *c = RedisModule_ScanCursorCreate(); + * RedisModuleKey *key = RedisModule_OpenKey(...) + * while(RedisModule_ScanKey(key, c, callback, privateData)); + * RedisModule_CloseKey(key); + * RedisModule_ScanCursorDestroy(c); + * + * It is also possible to use this API from another thread while the lock is acquired durring + * the actuall call to RM_Scan, and re-opening the key each time: + * RedisModuleCursor *c = RedisModule_ScanCursorCreate(); + * RedisModule_ThreadSafeContextLock(ctx); + * RedisModuleKey *key = RedisModule_OpenKey(...) + * while(RedisModule_ScanKey(ctx, c, callback, privateData)){ + * RedisModule_CloseKey(key); + * RedisModule_ThreadSafeContextUnlock(ctx); + * // do some background job + * RedisModule_ThreadSafeContextLock(ctx); + * RedisModuleKey *key = RedisModule_OpenKey(...) + * } + * RedisModule_CloseKey(key); + * RedisModule_ScanCursorDestroy(c); + * + * The function will return 1 if there are more elements to scan and 0 otherwise, + * possibly setting errno if the call failed. + * It is also possible to restart and existing cursor using RM_CursorRestart. */ +int RM_ScanKey(RedisModuleKey *key, RedisModuleScanCursor *cursor, RedisModuleScanKeyCB fn, void *privdata) { + if (key == NULL || key->value == NULL) { + errno = EINVAL; + return 0; + } + dict *ht = NULL; + robj *o = key->value; + if (o->type == OBJ_SET) { + if (o->encoding == OBJ_ENCODING_HT) + ht = o->ptr; + } else if (o->type == OBJ_HASH) { + if (o->encoding == OBJ_ENCODING_HT) + ht = o->ptr; + } else if (o->type == OBJ_ZSET) { + if (o->encoding == OBJ_ENCODING_SKIPLIST) + ht = ((zset *)o->ptr)->dict; + } else { + errno = EINVAL; + return 0; + } + if (cursor->done) { + errno = ENOENT; + return 0; + } + int ret = 1; + if (ht) { + ScanKeyCBData data = { key, privdata, fn }; + cursor->cursor = dictScan(ht, cursor->cursor, moduleScanKeyCallback, NULL, &data); + if (cursor->cursor == 0) { + cursor->done = 1; + ret = 0; + } + } else if (o->type == OBJ_SET && o->encoding == OBJ_ENCODING_INTSET) { + int pos = 0; + int64_t ll; + while(intsetGet(o->ptr,pos++,&ll)) { + robj *field = createStringObjectFromLongLong(ll); + fn(key, field, NULL, privdata); + decrRefCount(field); + } + cursor->cursor = 1; + cursor->done = 1; + ret = 0; + } else if (o->type == OBJ_HASH || o->type == OBJ_ZSET) { + unsigned char *p = ziplistIndex(o->ptr,0); + unsigned char *vstr; + unsigned int vlen; + long long vll; + while(p) { + ziplistGet(p,&vstr,&vlen,&vll); + robj *field = (vstr != NULL) ? + createStringObject((char*)vstr,vlen) : + createStringObjectFromLongLong(vll); + p = ziplistNext(o->ptr,p); + ziplistGet(p,&vstr,&vlen,&vll); + robj *value = (vstr != NULL) ? + createStringObject((char*)vstr,vlen) : + createStringObjectFromLongLong(vll); + fn(key, field, value, privdata); + p = ziplistNext(o->ptr,p); + decrRefCount(field); + decrRefCount(value); + } + cursor->cursor = 1; + cursor->done = 1; + ret = 0; + } + errno = 0; return ret; } @@ -7076,10 +7212,11 @@ void moduleRegisterCoreAPI(void) { REGISTER_API(SetLRUOrLFU); REGISTER_API(GetLRUOrLFU); REGISTER_API(BlockClientOnKeys); - REGISTER_API(Scan); - REGISTER_API(CursorCreate); - REGISTER_API(CursorDestroy); - REGISTER_API(CursorRestart); REGISTER_API(SignalKeyAsReady); REGISTER_API(GetBlockedClientReadyKey); + REGISTER_API(ScanCursorCreate); + REGISTER_API(ScanCursorDestroy); + REGISTER_API(ScanCursorRestart); + REGISTER_API(Scan); + REGISTER_API(ScanKey); } diff --git a/src/redismodule.h b/src/redismodule.h index c74772d0f..07e1452e1 100644 --- a/src/redismodule.h +++ b/src/redismodule.h @@ -392,7 +392,7 @@ typedef struct RedisModuleDictIter RedisModuleDictIter; typedef struct RedisModuleCommandFilterCtx RedisModuleCommandFilterCtx; typedef struct RedisModuleCommandFilter RedisModuleCommandFilter; typedef struct RedisModuleInfoCtx RedisModuleInfoCtx; -typedef struct RedisModuleCursor RedisModuleCursor; +typedef struct RedisModuleScanCursor RedisModuleScanCursor; typedef int (*RedisModuleCmdFunc)(RedisModuleCtx *ctx, RedisModuleString **argv, int argc); typedef void (*RedisModuleDisconnectFunc)(RedisModuleCtx *ctx, RedisModuleBlockedClient *bc); @@ -410,7 +410,8 @@ typedef void (*RedisModuleTimerProc)(RedisModuleCtx *ctx, void *data); typedef void (*RedisModuleCommandFilterFunc) (RedisModuleCommandFilterCtx *filter); typedef void (*RedisModuleForkDoneHandler) (int exitcode, int bysignal, void *user_data); typedef void (*RedisModuleInfoFunc)(RedisModuleInfoCtx *ctx, int for_crash_report); -typedef void (*RedisModuleScanCB)(void *privdata, RedisModuleString* keyname, RedisModuleKey* key); +typedef void (*RedisModuleScanCB)(RedisModuleCtx *ctx, RedisModuleString *keyname, RedisModuleKey *key, void *privdata); +typedef void (*RedisModuleScanKeyCB)(RedisModuleKey *key, RedisModuleString *field, RedisModuleString *value, void *privdata); #define REDISMODULE_TYPE_METHOD_VERSION 2 typedef struct RedisModuleTypeMethods { @@ -590,6 +591,11 @@ int REDISMODULE_API_FUNC(RedisModule_GetLRUOrLFU)(RedisModuleKey *key, long long RedisModuleBlockedClient *REDISMODULE_API_FUNC(RedisModule_BlockClientOnKeys)(RedisModuleCtx *ctx, RedisModuleCmdFunc reply_callback, RedisModuleCmdFunc timeout_callback, void (*free_privdata)(RedisModuleCtx*,void*), long long timeout_ms, RedisModuleString **keys, int numkeys, void *privdata); void REDISMODULE_API_FUNC(RedisModule_SignalKeyAsReady)(RedisModuleCtx *ctx, RedisModuleString *key); RedisModuleString *REDISMODULE_API_FUNC(RedisModule_GetBlockedClientReadyKey)(RedisModuleCtx *ctx); +RedisModuleScanCursor *REDISMODULE_API_FUNC(RedisModule_ScanCursorCreate)(); +void REDISMODULE_API_FUNC(RedisModule_ScanCursorRestart)(RedisModuleScanCursor *cursor); +void REDISMODULE_API_FUNC(RedisModule_ScanCursorDestroy)(RedisModuleScanCursor *cursor); +int REDISMODULE_API_FUNC(RedisModule_Scan)(RedisModuleCtx *ctx, RedisModuleScanCursor *cursor, RedisModuleScanCB fn, void *privdata); +int REDISMODULE_API_FUNC(RedisModule_ScanKey)(RedisModuleKey *key, RedisModuleScanCursor *cursor, RedisModuleScanKeyCB fn, void *privdata); /* Experimental APIs */ #ifdef REDISMODULE_EXPERIMENTAL_API @@ -635,10 +641,6 @@ int REDISMODULE_API_FUNC(RedisModule_CommandFilterArgDelete)(RedisModuleCommandF int REDISMODULE_API_FUNC(RedisModule_Fork)(RedisModuleForkDoneHandler cb, void *user_data); int REDISMODULE_API_FUNC(RedisModule_ExitFromChild)(int retcode); int REDISMODULE_API_FUNC(RedisModule_KillForkChild)(int child_pid); -RedisModuleCursor* REDISMODULE_API_FUNC(RedisModule_CursorCreate)(); -void REDISMODULE_API_FUNC(RedisModule_CursorRestart)(RedisModuleCursor* cursor); -void REDISMODULE_API_FUNC(RedisModule_CursorDestroy)(RedisModuleCursor* cursor); -int REDISMODULE_API_FUNC(RedisModule_Scan)(RedisModuleCtx *ctx, RedisModuleCursor* cursor, RedisModuleScanCB fn, void* privdata); #endif #define RedisModule_IsAOFClient(id) ((id) == UINT64_MAX) @@ -805,6 +807,11 @@ static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int REDISMODULE_GET_API(BlockClientOnKeys); REDISMODULE_GET_API(SignalKeyAsReady); REDISMODULE_GET_API(GetBlockedClientReadyKey); + REDISMODULE_GET_API(ScanCursorCreate); + REDISMODULE_GET_API(ScanCursorRestart); + REDISMODULE_GET_API(ScanCursorDestroy); + REDISMODULE_GET_API(Scan); + REDISMODULE_GET_API(ScanKey); #ifdef REDISMODULE_EXPERIMENTAL_API REDISMODULE_GET_API(GetThreadSafeContext); @@ -848,10 +855,6 @@ static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int REDISMODULE_GET_API(Fork); REDISMODULE_GET_API(ExitFromChild); REDISMODULE_GET_API(KillForkChild); - REDISMODULE_GET_API(Scan); - REDISMODULE_GET_API(CursorCreate); - REDISMODULE_GET_API(CursorRestart); - REDISMODULE_GET_API(CursorDestroy); #endif if (RedisModule_IsModuleNameBusy && RedisModule_IsModuleNameBusy(name)) return REDISMODULE_ERR; diff --git a/tests/modules/scan.c b/tests/modules/scan.c index 21071720a..afede244b 100644 --- a/tests/modules/scan.c +++ b/tests/modules/scan.c @@ -1,62 +1,109 @@ -#define REDISMODULE_EXPERIMENTAL_API #include "redismodule.h" #include #include #include -#define UNUSED(V) ((void) V) +typedef struct { + size_t nkeys; +} scan_strings_pd; -typedef struct scan_pd{ - size_t nkeys; - RedisModuleCtx *ctx; -} scan_pd; - -void scan_callback(void *privdata, RedisModuleString* keyname, RedisModuleKey* key){ - scan_pd* pd = privdata; - RedisModule_ReplyWithArray(pd->ctx, 2); +void scan_strings_callback(RedisModuleCtx *ctx, RedisModuleString* keyname, RedisModuleKey* key, void *privdata) { + scan_strings_pd* pd = privdata; + int was_opened = 0; + if (!key) { + key = RedisModule_OpenKey(ctx, keyname, REDISMODULE_READ); + was_opened = 1; + } - RedisModule_ReplyWithString(pd->ctx, keyname); - if(key && RedisModule_KeyType(key) == REDISMODULE_KEYTYPE_STRING){ + if (RedisModule_KeyType(key) == REDISMODULE_KEYTYPE_STRING) { size_t len; char * data = RedisModule_StringDMA(key, &len, REDISMODULE_READ); - RedisModule_ReplyWithStringBuffer(pd->ctx, data, len); - }else{ - RedisModule_ReplyWithNull(pd->ctx); + RedisModule_ReplyWithArray(ctx, 2); + RedisModule_ReplyWithString(ctx, keyname); + RedisModule_ReplyWithStringBuffer(ctx, data, len); + pd->nkeys++; } - pd->nkeys++; + if (was_opened) + RedisModule_CloseKey(key); } -int scan_keys_values(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) +int scan_strings(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { - scan_pd pd = { - .nkeys = 0, - .ctx = ctx, + REDISMODULE_NOT_USED(argv); + REDISMODULE_NOT_USED(argc); + scan_strings_pd pd = { + .nkeys = 0, }; RedisModule_ReplyWithArray(ctx, REDISMODULE_POSTPONED_ARRAY_LEN); - RedisModuleCursor* cursor = RedisModule_CursorCreate(); - while(RedisModule_Scan(ctx, cursor, scan_callback, &pd)); - RedisModule_CursorDestroy(cursor); + RedisModuleScanCursor* cursor = RedisModule_ScanCursorCreate(); + while(RedisModule_Scan(ctx, cursor, scan_strings_callback, &pd)); + RedisModule_ScanCursorDestroy(cursor); RedisModule_ReplySetArrayLength(ctx, pd.nkeys); - return 0; + return REDISMODULE_OK; +} + +typedef struct { + RedisModuleCtx *ctx; + size_t nreplies; +} scan_key_pd; + +void scan_key_callback(RedisModuleKey *key, RedisModuleString* field, RedisModuleString* value, void *privdata) { + REDISMODULE_NOT_USED(key); + scan_key_pd* pd = privdata; + RedisModule_ReplyWithArray(pd->ctx, 2); + RedisModule_ReplyWithString(pd->ctx, field); + if (value) + RedisModule_ReplyWithString(pd->ctx, value); + else + RedisModule_ReplyWithNull(pd->ctx); + pd->nreplies++; +} + +int scan_key(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) +{ + if (argc != 2) { + RedisModule_WrongArity(ctx); + return REDISMODULE_OK; + } + scan_key_pd pd = { + .ctx = ctx, + .nreplies = 0, + }; + + RedisModuleKey *key = RedisModule_OpenKey(ctx, argv[1], REDISMODULE_READ); + if (!key) { + RedisModule_ReplyWithError(ctx, "not found"); + return REDISMODULE_OK; + } + + RedisModule_ReplyWithArray(ctx, REDISMODULE_POSTPONED_ARRAY_LEN); + + RedisModuleScanCursor* cursor = RedisModule_ScanCursorCreate(); + while(RedisModule_ScanKey(key, cursor, scan_key_callback, &pd)); + RedisModule_ScanCursorDestroy(cursor); + + RedisModule_ReplySetArrayLength(ctx, pd.nreplies); + RedisModule_CloseKey(key); + return REDISMODULE_OK; } int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { - UNUSED(argv); - UNUSED(argc); + REDISMODULE_NOT_USED(argv); + REDISMODULE_NOT_USED(argc); if (RedisModule_Init(ctx, "scan", 1, REDISMODULE_APIVER_1)== REDISMODULE_ERR) return REDISMODULE_ERR; - if (RedisModule_CreateCommand(ctx, "scan.scankeysvalues", scan_keys_values, "", 0, 0, 0) == REDISMODULE_ERR) + if (RedisModule_CreateCommand(ctx, "scan.scan_strings", scan_strings, "", 0, 0, 0) == REDISMODULE_ERR) + return REDISMODULE_ERR; + + if (RedisModule_CreateCommand(ctx, "scan.scan_key", scan_key, "", 0, 0, 0) == REDISMODULE_ERR) return REDISMODULE_ERR; return REDISMODULE_OK; } - - - diff --git a/tests/unit/moduleapi/scan.tcl b/tests/unit/moduleapi/scan.tcl index 5a77e8195..de1672e0a 100644 --- a/tests/unit/moduleapi/scan.tcl +++ b/tests/unit/moduleapi/scan.tcl @@ -1,18 +1,47 @@ set testmodule [file normalize tests/modules/scan.so] -proc count_log_message {pattern} { - set result [exec grep -c $pattern < [srv 0 stdout]] -} - start_server {tags {"modules"}} { r module load $testmodule - test {Module scan} { - # the module create a scan command which also return values + test {Module scan keyspace} { + # the module create a scan command with filtering which also return values r set x 1 r set y 2 r set z 3 - lsort [r scan.scankeysvalues] + r hset h f v + lsort [r scan.scan_strings] } {{x 1} {y 2} {z 3}} -} \ No newline at end of file + test {Module scan hash ziplist} { + r hmset hh f1 v1 f2 v2 + lsort [r scan.scan_key hh] + } {{f1 v1} {f2 v2}} + + test {Module scan hash dict} { + r config set hash-max-ziplist-entries 2 + r hmset hh f3 v3 + lsort [r scan.scan_key hh] + } {{f1 v1} {f2 v2} {f3 v3}} + + test {Module scan zset ziplist} { + r zadd zz 1 f1 2 f2 + lsort [r scan.scan_key zz] + } {{f1 1} {f2 2}} + + test {Module scan zset dict} { + r config set zset-max-ziplist-entries 2 + r zadd zz 3 f3 + lsort [r scan.scan_key zz] + } {{f1 1} {f2 2} {f3 3}} + + test {Module scan set intset} { + r sadd ss 1 2 + lsort [r scan.scan_key ss] + } {{1 {}} {2 {}}} + + test {Module scan set dict} { + r config set set-max-intset-entries 2 + r sadd ss 3 + lsort [r scan.scan_key ss] + } {{1 {}} {2 {}} {3 {}}} +} -- cgit v1.2.1 From 0bc3dab0954e481b882a722f768ec0a5a7f725ae Mon Sep 17 00:00:00 2001 From: Oran Agra Date: Sun, 10 Nov 2019 09:38:50 +0200 Subject: Adjustments for active defrag defaults and tuning Reduce default minimum effort, so that when fragmentation is just detected, the impact on the latency will be minor. Reduce the default maximum effort, mainly to prevent a case were a sudden massive deletions, won't trigger an aggressive defrag that will cause latency. When activedefrag is disabled mid-run, reset the 'running' info field, and clear the scan cursor, so that when it'll be re-enabled, a new fresh scan will start. Clearing the 'running' variable is important since lowering the defragger tunables mid-scan won't help, the defragger only considers new threshold when a new scan starts, and during a scan it can only become more aggressive, (when more severe fragmentation is detected), it'll never go less aggressive. So by temporarily disabling activedefrag, one can lower th the tunables. Removing the experimantal warning. --- redis.conf | 16 +++++++--------- src/defrag.c | 44 ++++++++++++++++++++++++++++++-------------- src/server.c | 4 ++-- src/server.h | 4 ++-- 4 files changed, 41 insertions(+), 27 deletions(-) diff --git a/redis.conf b/redis.conf index 0ec3321a5..fb6f34b35 100644 --- a/redis.conf +++ b/redis.conf @@ -1606,10 +1606,6 @@ rdb-save-incremental-fsync yes ########################### ACTIVE DEFRAGMENTATION ####################### # -# WARNING THIS FEATURE IS EXPERIMENTAL. However it was stress tested -# even in production and manually tested by multiple engineers for some -# time. -# # What is active defragmentation? # ------------------------------- # @@ -1649,7 +1645,7 @@ rdb-save-incremental-fsync yes # a good idea to leave the defaults untouched. # Enabled active defragmentation -# activedefrag yes +# activedefrag no # Minimum amount of fragmentation waste to start active defrag # active-defrag-ignore-bytes 100mb @@ -1660,11 +1656,13 @@ rdb-save-incremental-fsync yes # Maximum percentage of fragmentation at which we use maximum effort # active-defrag-threshold-upper 100 -# Minimal effort for defrag in CPU percentage -# active-defrag-cycle-min 5 +# Minimal effort for defrag in CPU percentage, to be used when the lower +# threshold is reached +# active-defrag-cycle-min 1 -# Maximal effort for defrag in CPU percentage -# active-defrag-cycle-max 75 +# Maximal effort for defrag in CPU percentage, to be used when the upper +# threshold is reached +# active-defrag-cycle-max 25 # Maximum number of set/hash/zset/list fields that will be processed from # the main dictionary scan diff --git a/src/defrag.c b/src/defrag.c index e794c8e41..04e57955b 100644 --- a/src/defrag.c +++ b/src/defrag.c @@ -919,10 +919,12 @@ int defragLaterItem(dictEntry *de, unsigned long *cursor, long long endtime) { return 0; } +/* static variables serving defragLaterStep to continue scanning a key from were we stopped last time. */ +static sds defrag_later_current_key = NULL; +static unsigned long defrag_later_cursor = 0; + /* returns 0 if no more work needs to be been done, and 1 if time is up and more work is needed. */ int defragLaterStep(redisDb *db, long long endtime) { - static sds current_key = NULL; - static unsigned long cursor = 0; unsigned int iterations = 0; unsigned long long prev_defragged = server.stat_active_defrag_hits; unsigned long long prev_scanned = server.stat_active_defrag_scanned; @@ -930,16 +932,15 @@ int defragLaterStep(redisDb *db, long long endtime) { do { /* if we're not continuing a scan from the last call or loop, start a new one */ - if (!cursor) { + if (!defrag_later_cursor) { listNode *head = listFirst(db->defrag_later); /* Move on to next key */ - if (current_key) { - serverAssert(current_key == head->value); - sdsfree(head->value); + if (defrag_later_current_key) { + serverAssert(defrag_later_current_key == head->value); listDelNode(db->defrag_later, head); - cursor = 0; - current_key = NULL; + defrag_later_cursor = 0; + defrag_later_current_key = NULL; } /* stop if we reached the last one. */ @@ -948,21 +949,21 @@ int defragLaterStep(redisDb *db, long long endtime) { return 0; /* start a new key */ - current_key = head->value; - cursor = 0; + defrag_later_current_key = head->value; + defrag_later_cursor = 0; } /* each time we enter this function we need to fetch the key from the dict again (if it still exists) */ - dictEntry *de = dictFind(db->dict, current_key); + dictEntry *de = dictFind(db->dict, defrag_later_current_key); key_defragged = server.stat_active_defrag_hits; do { int quit = 0; - if (defragLaterItem(de, &cursor, endtime)) + if (defragLaterItem(de, &defrag_later_cursor, endtime)) quit = 1; /* time is up, we didn't finish all the work */ /* Don't start a new BIG key in this loop, this is because the * next key can be a list, and scanLaterList must be done in once cycle */ - if (!cursor) + if (!defrag_later_cursor) quit = 1; /* Once in 16 scan iterations, 512 pointer reallocations, or 64 fields @@ -982,7 +983,7 @@ int defragLaterStep(redisDb *db, long long endtime) { prev_defragged = server.stat_active_defrag_hits; prev_scanned = server.stat_active_defrag_scanned; } - } while(cursor); + } while(defrag_later_cursor); if(key_defragged != server.stat_active_defrag_hits) server.stat_active_defrag_key_hits++; else @@ -1039,6 +1040,21 @@ void activeDefragCycle(void) { mstime_t latency; int quit = 0; + if (!server.active_defrag_enabled) { + if (server.active_defrag_running) { + /* if active defrag was disabled mid-run, start from fresh next time. */ + server.active_defrag_running = 0; + if (db) + listEmpty(db->defrag_later); + defrag_later_current_key = NULL; + defrag_later_cursor = 0; + current_db = -1; + cursor = 0; + db = NULL; + } + return; + } + if (hasActiveChildProcess()) return; /* Defragging memory while there's a fork will just do damage. */ diff --git a/src/server.c b/src/server.c index 113d92cbb..f67fab632 100644 --- a/src/server.c +++ b/src/server.c @@ -1691,8 +1691,7 @@ void databasesCron(void) { } /* Defrag keys gradually. */ - if (server.active_defrag_enabled) - activeDefragCycle(); + activeDefragCycle(); /* Perform hash tables rehashing if needed, but only if there are no * other processes saving the DB on disk. Otherwise rehashing is bad @@ -2854,6 +2853,7 @@ void initServer(void) { server.db[j].id = j; server.db[j].avg_ttl = 0; server.db[j].defrag_later = listCreate(); + listSetFreeMethod(server.db[j].defrag_later,(void (*)(void*))sdsfree); } evictionPoolAlloc(); /* Initialize the LRU keys pool. */ server.pubsub_channels = dictCreate(&keylistDictType,NULL); diff --git a/src/server.h b/src/server.h index a2c94a22a..cff45b5f5 100644 --- a/src/server.h +++ b/src/server.h @@ -174,8 +174,8 @@ typedef long long ustime_t; /* microsecond time type. */ #define CONFIG_DEFAULT_DEFRAG_THRESHOLD_LOWER 10 /* don't defrag when fragmentation is below 10% */ #define CONFIG_DEFAULT_DEFRAG_THRESHOLD_UPPER 100 /* maximum defrag force at 100% fragmentation */ #define CONFIG_DEFAULT_DEFRAG_IGNORE_BYTES (100<<20) /* don't defrag if frag overhead is below 100mb */ -#define CONFIG_DEFAULT_DEFRAG_CYCLE_MIN 5 /* 5% CPU min (at lower threshold) */ -#define CONFIG_DEFAULT_DEFRAG_CYCLE_MAX 75 /* 75% CPU max (at upper threshold) */ +#define CONFIG_DEFAULT_DEFRAG_CYCLE_MIN 1 /* 1% CPU min (at lower threshold) */ +#define CONFIG_DEFAULT_DEFRAG_CYCLE_MAX 25 /* 25% CPU max (at upper threshold) */ #define CONFIG_DEFAULT_DEFRAG_MAX_SCAN_FIELDS 1000 /* keys with more than 1000 fields will be processed separately */ #define CONFIG_DEFAULT_PROTO_MAX_BULK_LEN (512ll*1024*1024) /* Bulk request max size */ #define CONFIG_DEFAULT_TRACKING_TABLE_MAX_FILL 10 /* 10% tracking table max fill. */ -- cgit v1.2.1 From 253d9d6d12c5a19ae8faa7632068772845b4a552 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=96=9C=E6=AC=A2=E5=85=B0=E8=8A=B1=E5=B1=B1=E4=B8=98?= Date: Wed, 13 Nov 2019 10:14:45 +0800 Subject: Update adlist.h Update listGetFree keep format consistent --- src/adlist.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/adlist.h b/src/adlist.h index c954fac87..28b9016ce 100644 --- a/src/adlist.h +++ b/src/adlist.h @@ -66,7 +66,7 @@ typedef struct list { #define listSetMatchMethod(l,m) ((l)->match = (m)) #define listGetDupMethod(l) ((l)->dup) -#define listGetFree(l) ((l)->free) +#define listGetFreeMethod(l) ((l)->free) #define listGetMatchMethod(l) ((l)->match) /* Prototypes */ -- cgit v1.2.1 From 4a12047c61570ec3a4dbf9513b0881b41b224399 Mon Sep 17 00:00:00 2001 From: Guy Benoish Date: Wed, 13 Nov 2019 16:43:07 +0530 Subject: XADD with ID 0-0 stores an empty key Calling XADD with 0-0 or 0 would result in creating an empty key and storing it in the database. Even worse, because XADD will reply with error the action will not be replicated, creating a master-replica inconsistency --- src/t_stream.c | 8 ++++++++ tests/unit/type/stream.tcl | 6 ++++++ 2 files changed, 14 insertions(+) diff --git a/src/t_stream.c b/src/t_stream.c index 58b59f521..9bf87831f 100644 --- a/src/t_stream.c +++ b/src/t_stream.c @@ -1220,6 +1220,14 @@ void xaddCommand(client *c) { return; } + /* Return ASAP if minimal ID (0-0) was given so we avoid possibly creating + * a new stream and have streamAppendItem fail, leaving an empty key in the + * database. */ + if (id_given && id.ms == 0 && id.seq == 0) { + addReplyError(c,"The ID specified in XADD must be greater than 0-0"); + return; + } + /* Lookup the stream at key. */ robj *o; stream *s; diff --git a/tests/unit/type/stream.tcl b/tests/unit/type/stream.tcl index aa9c5f3a9..a4431c654 100644 --- a/tests/unit/type/stream.tcl +++ b/tests/unit/type/stream.tcl @@ -123,6 +123,12 @@ start_server { assert {[r xlen mystream] == $j} } + test {XADD with ID 0-0} { + r DEL mystream + catch {r XADD mystream 0-0 k v} err + assert {[r EXISTS mystream] == 0} + } + test {XRANGE COUNT works as expected} { assert {[llength [r xrange mystream - + COUNT 10]] == 10} } -- cgit v1.2.1 From 9c76875f413190a547244e72a3f9e9bbeb6811e9 Mon Sep 17 00:00:00 2001 From: Yossi Gottlieb Date: Fri, 23 Jun 2017 10:29:49 +0300 Subject: Add RM_ModuleTypeReplaceValue. This is a light-weight replace function, useful for use cases such as realloc()ing an existing value, etc. Using RM_ModuleTypeSetValue() in such cases is wasteful and complex as it attempts to delete the old value, call its destructor, etc. --- src/module.c | 29 ++++++++++++++++++++++++++++- src/redismodule.h | 2 ++ 2 files changed, 30 insertions(+), 1 deletion(-) diff --git a/src/module.c b/src/module.c index ad34e7b64..0f22178c9 100644 --- a/src/module.c +++ b/src/module.c @@ -1950,7 +1950,7 @@ int RM_DeleteKey(RedisModuleKey *key) { return REDISMODULE_OK; } -/* If the key is open for writing, unlink it (that is delete it in a +/* If the key is open for writing, unlink it (that is delete it in a * non-blocking way, not reclaiming memory immediately) and setup the key to * accept new writes as an empty key (that will be created on demand). * On success REDISMODULE_OK is returned. If the key is not open for @@ -6768,6 +6768,32 @@ int RM_GetLRUOrLFU(RedisModuleKey *key, long long *lfu_freq, long long *lru_idle return REDISMODULE_OK; } +/* Replace the value assigned to a module type. + * + * The key must be open for writing, have an existing value, and have a moduleType + * that matches the one specified by the caller. + * + * Unlike RM_ModuleTypeSetValue() which will free the old value, this function + * simply swaps the old value with the new value. + * + * The function returns the old value, or NULL if any of the above conditions is + * not met. + */ +void *RM_ModuleTypeReplaceValue(RedisModuleKey *key, moduleType *mt, void *new_value) { + if (!(key->mode & REDISMODULE_WRITE) || key->iter) + return NULL; + if (!key->value || key->value->type != OBJ_MODULE) + return NULL; + + moduleValue *mv = key->value->ptr; + if (mv->type != mt) + return NULL; + + void *old_val = mv->value; + mv->value = new_value; + return old_val; +} + /* Register all the APIs we export. Keep this function at the end of the * file so that's easy to seek it to add new entries. */ void moduleRegisterCoreAPI(void) { @@ -6857,6 +6883,7 @@ void moduleRegisterCoreAPI(void) { REGISTER_API(PoolAlloc); REGISTER_API(CreateDataType); REGISTER_API(ModuleTypeSetValue); + REGISTER_API(ModuleTypeReplaceValue); REGISTER_API(ModuleTypeGetType); REGISTER_API(ModuleTypeGetValue); REGISTER_API(IsIOError); diff --git a/src/redismodule.h b/src/redismodule.h index 728c7f584..8cf789fb0 100644 --- a/src/redismodule.h +++ b/src/redismodule.h @@ -517,6 +517,7 @@ int REDISMODULE_API_FUNC(RedisModule_GetContextFlags)(RedisModuleCtx *ctx); void *REDISMODULE_API_FUNC(RedisModule_PoolAlloc)(RedisModuleCtx *ctx, size_t bytes); RedisModuleType *REDISMODULE_API_FUNC(RedisModule_CreateDataType)(RedisModuleCtx *ctx, const char *name, int encver, RedisModuleTypeMethods *typemethods); int REDISMODULE_API_FUNC(RedisModule_ModuleTypeSetValue)(RedisModuleKey *key, RedisModuleType *mt, void *value); +void *REDISMODULE_API_FUNC(RedisModule_ModuleTypeReplaceValue)(RedisModuleKey *key, RedisModuleType *mt, void *new_value); RedisModuleType *REDISMODULE_API_FUNC(RedisModule_ModuleTypeGetType)(RedisModuleKey *key); void *REDISMODULE_API_FUNC(RedisModule_ModuleTypeGetValue)(RedisModuleKey *key); int REDISMODULE_API_FUNC(RedisModule_IsIOError)(RedisModuleIO *io); @@ -726,6 +727,7 @@ static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int REDISMODULE_GET_API(PoolAlloc); REDISMODULE_GET_API(CreateDataType); REDISMODULE_GET_API(ModuleTypeSetValue); + REDISMODULE_GET_API(ModuleTypeReplaceValue); REDISMODULE_GET_API(ModuleTypeGetType); REDISMODULE_GET_API(ModuleTypeGetValue); REDISMODULE_GET_API(IsIOError); -- cgit v1.2.1 From 2d30afc45fac6d0dcb0e5e12ebdf197d4c77649a Mon Sep 17 00:00:00 2001 From: Oran Agra Date: Thu, 14 Nov 2019 09:09:10 +0200 Subject: module docs, missing LOADING flag --- src/module.c | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/module.c b/src/module.c index ad34e7b64..ceb18a557 100644 --- a/src/module.c +++ b/src/module.c @@ -1749,6 +1749,8 @@ int RM_GetSelectedDb(RedisModuleCtx *ctx) { * * REDISMODULE_CTX_FLAGS_OOM_WARNING: Less than 25% of memory remains before * reaching the maxmemory level. * + * * REDISMODULE_CTX_FLAGS_LOADING: Server is loading RDB/AOF + * * * REDISMODULE_CTX_FLAGS_REPLICA_IS_STALE: No active link with the master. * * * REDISMODULE_CTX_FLAGS_REPLICA_IS_CONNECTING: The replica is trying to -- cgit v1.2.1 From c426bbf3a54939775fceac1a318f2fa22778ee08 Mon Sep 17 00:00:00 2001 From: Oran Agra Date: Thu, 14 Nov 2019 09:46:46 +0200 Subject: Slightly more efficient RM_ReplyWithEmptyString trimming talk about RESP protocol from API docs (should be independent to that anyway) --- src/module.c | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/module.c b/src/module.c index ad34e7b64..6a6db3fe8 100644 --- a/src/module.c +++ b/src/module.c @@ -1389,7 +1389,7 @@ int RM_ReplyWithString(RedisModuleCtx *ctx, RedisModuleString *str) { int RM_ReplyWithEmptyString(RedisModuleCtx *ctx) { client *c = moduleGetReplyClient(ctx); if (c == NULL) return REDISMODULE_OK; - addReplyBulkCBuffer(c, "", 0); + addReply(c,shared.emptybulk); return REDISMODULE_OK; } @@ -1404,8 +1404,7 @@ int RM_ReplyWithVerbatimString(RedisModuleCtx *ctx, const char *buf, size_t len) return REDISMODULE_OK; } -/* Reply to the client with a NULL. In the RESP protocol a NULL is encoded - * as the string "$-1\r\n". +/* Reply to the client with a NULL. * * The function always returns REDISMODULE_OK. */ int RM_ReplyWithNull(RedisModuleCtx *ctx) { -- cgit v1.2.1 From 8d50a8327e9f77f70ac7c11edd41a74a193ed830 Mon Sep 17 00:00:00 2001 From: antirez Date: Thu, 14 Nov 2019 12:48:54 +0100 Subject: Rax library updated. --- src/rax.c | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/rax.c b/src/rax.c index be474b058..29b74ae90 100644 --- a/src/rax.c +++ b/src/rax.c @@ -1673,6 +1673,7 @@ int raxSeek(raxIterator *it, const char *op, unsigned char *ele, size_t len) { * node, but will be our match, representing the key "f". * * So in that case, we don't seek backward. */ + it->data = raxGetData(it->node); } else { if (gt && !raxIteratorNextStep(it,0)) return 0; if (lt && !raxIteratorPrevStep(it,0)) return 0; @@ -1791,7 +1792,7 @@ int raxCompare(raxIterator *iter, const char *op, unsigned char *key, size_t key if (eq && key_len == iter->key_len) return 1; else if (lt) return iter->key_len < key_len; else if (gt) return iter->key_len > key_len; - return 0; + else return 0; /* Avoid warning, just 'eq' is handled before. */ } else if (cmp > 0) { return gt ? 1 : 0; } else /* (cmp < 0) */ { -- cgit v1.2.1 From 2f6fe5ce3adb84cfc7506577c49504a8a65aaf2b Mon Sep 17 00:00:00 2001 From: antirez Date: Thu, 14 Nov 2019 18:27:37 +0100 Subject: Expire cycle: introduce the new state needed for the new algo. --- src/db.c | 2 ++ src/expire.c | 5 +++++ src/server.c | 2 ++ src/server.h | 5 ++--- 4 files changed, 11 insertions(+), 3 deletions(-) diff --git a/src/db.c b/src/db.c index 8a7ea98ae..e47a4a681 100644 --- a/src/db.c +++ b/src/db.c @@ -1077,10 +1077,12 @@ int dbSwapDatabases(long id1, long id2) { db1->dict = db2->dict; db1->expires = db2->expires; db1->avg_ttl = db2->avg_ttl; + db1->expires_cursor = db2->expires_cursor; db2->dict = aux.dict; db2->expires = aux.expires; db2->avg_ttl = aux.avg_ttl; + db2->expires_cursor = aux.expires_cursor; /* Now we need to handle clients blocked on lists: as an effect * of swapping the two DBs, a client that was waiting for list diff --git a/src/expire.c b/src/expire.c index 598b27f96..3d0bae249 100644 --- a/src/expire.c +++ b/src/expire.c @@ -95,6 +95,10 @@ int activeExpireCycleTryExpire(redisDb *db, dictEntry *de, long long now) { * executed, where the time limit is a percentage of the REDIS_HZ period * as specified by the ACTIVE_EXPIRE_CYCLE_SLOW_TIME_PERC define. */ +#define ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP 20 /* Loopkups per loop. */ +#define ACTIVE_EXPIRE_CYCLE_FAST_DURATION 1000 /* Microseconds. */ +#define ACTIVE_EXPIRE_CYCLE_SLOW_TIME_PERC 25 /* Percentage of CPU to use. */ + void activeExpireCycle(int type) { /* This function has some global state in order to continue the work * incrementally across calls. */ @@ -231,6 +235,7 @@ void activeExpireCycle(int type) { } elapsed = ustime()-start; + server.stat_expire_cycle_time_used += elapsed; latencyAddSampleIfNeeded("expire-cycle",elapsed/1000); /* Update our estimate of keys existing but yet to be expired. diff --git a/src/server.c b/src/server.c index 113d92cbb..8c8b668de 100644 --- a/src/server.c +++ b/src/server.c @@ -2745,6 +2745,7 @@ void resetServerStats(void) { server.stat_expiredkeys = 0; server.stat_expired_stale_perc = 0; server.stat_expired_time_cap_reached_count = 0; + server.stat_expire_cycle_time_used = 0; server.stat_evictedkeys = 0; server.stat_keyspace_misses = 0; server.stat_keyspace_hits = 0; @@ -2848,6 +2849,7 @@ void initServer(void) { for (j = 0; j < server.dbnum; j++) { server.db[j].dict = dictCreate(&dbDictType,NULL); server.db[j].expires = dictCreate(&keyptrDictType,NULL); + server.db[j].expires_cursor = 0; server.db[j].blocking_keys = dictCreate(&keylistDictType,NULL); server.db[j].ready_keys = dictCreate(&objectKeyPointerValueDictType,NULL); server.db[j].watched_keys = dictCreate(&keylistDictType,NULL); diff --git a/src/server.h b/src/server.h index a2c94a22a..2b7974594 100644 --- a/src/server.h +++ b/src/server.h @@ -180,9 +180,6 @@ typedef long long ustime_t; /* microsecond time type. */ #define CONFIG_DEFAULT_PROTO_MAX_BULK_LEN (512ll*1024*1024) /* Bulk request max size */ #define CONFIG_DEFAULT_TRACKING_TABLE_MAX_FILL 10 /* 10% tracking table max fill. */ -#define ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP 20 /* Loopkups per loop. */ -#define ACTIVE_EXPIRE_CYCLE_FAST_DURATION 1000 /* Microseconds */ -#define ACTIVE_EXPIRE_CYCLE_SLOW_TIME_PERC 25 /* CPU max % for keys collection */ #define ACTIVE_EXPIRE_CYCLE_SLOW 0 #define ACTIVE_EXPIRE_CYCLE_FAST 1 @@ -721,6 +718,7 @@ typedef struct redisDb { dict *watched_keys; /* WATCHED keys for MULTI/EXEC CAS */ int id; /* Database ID */ long long avg_ttl; /* Average TTL, just for stats */ + unsigned long expires_cursor; /* Cursor of the active expire cycle. */ list *defrag_later; /* List of key names to attempt to defrag one by one, gradually. */ } redisDb; @@ -1167,6 +1165,7 @@ struct redisServer { long long stat_expiredkeys; /* Number of expired keys */ double stat_expired_stale_perc; /* Percentage of keys probably expired */ long long stat_expired_time_cap_reached_count; /* Early expire cylce stops.*/ + long long stat_expire_cycle_time_used; /* Cumulative microseconds used. */ long long stat_evictedkeys; /* Number of evicted keys (maxmemory) */ long long stat_keyspace_hits; /* Number of successful lookups of keys */ long long stat_keyspace_misses; /* Number of failed lookups of keys */ -- cgit v1.2.1 From ffc7e509aa93c2441f8d40ceab62cd299a18f275 Mon Sep 17 00:00:00 2001 From: antirez Date: Fri, 15 Nov 2019 10:38:55 +0100 Subject: Expire cycle: scan hash table buckets directly. --- src/expire.c | 97 ++++++++++++++++++++++++++++++++++++++++++------------------ 1 file changed, 68 insertions(+), 29 deletions(-) diff --git a/src/expire.c b/src/expire.c index 3d0bae249..af9eaebf1 100644 --- a/src/expire.c +++ b/src/expire.c @@ -78,24 +78,40 @@ int activeExpireCycleTryExpire(redisDb *db, dictEntry *de, long long now) { * it will get more aggressive to avoid that too much memory is used by * keys that can be removed from the keyspace. * - * No more than CRON_DBS_PER_CALL databases are tested at every - * iteration. + * Every expire cycle tests multiple databases: the next call will start + * again from the next db, with the exception of exists for time limit: in that + * case we restart again from the last database we were processing. Anyway + * no more than CRON_DBS_PER_CALL databases are tested at every iteration. * - * This kind of call is used when Redis detects that timelimit_exit is - * true, so there is more work to do, and we do it more incrementally from - * the beforeSleep() function of the event loop. + * The function can perform more or less work, depending on the "type" + * argument. It can execute a "fast cycle" or a "slow cycle". The slow + * cycle is the main way we collect expired cycles: this happens with + * the "server.hz" frequency (usually 10 hertz). * - * Expire cycle type: + * However the slow cycle can exit for timeout, since it used too much time. + * For this reason the function is also invoked to perform a fast cycle + * at every event loop cycle, in the beforeSleep() function. The fast cycle + * will try to perform less work, but will do it much more often. + * + * The following are the details of the two expire cycles and their stop + * conditions: * * If type is ACTIVE_EXPIRE_CYCLE_FAST the function will try to run a * "fast" expire cycle that takes no longer than EXPIRE_FAST_CYCLE_DURATION * microseconds, and is not repeated again before the same amount of time. + * The cycle will also refuse to run at all if the latest slow cycle did not + * terminate because of a time limit condition. * * If type is ACTIVE_EXPIRE_CYCLE_SLOW, that normal expire cycle is * executed, where the time limit is a percentage of the REDIS_HZ period - * as specified by the ACTIVE_EXPIRE_CYCLE_SLOW_TIME_PERC define. */ + * as specified by the ACTIVE_EXPIRE_CYCLE_SLOW_TIME_PERC define. In the + * fast cycle, the check of every database is interrupted once the number + * of already expired keys in the database is estimated to be lower than + * a given percentage, in order to avoid doing too much work to gain too + * little memory. + */ -#define ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP 20 /* Loopkups per loop. */ +#define ACTIVE_EXPIRE_CYCLE_BUCKETS_PER_LOOP 20 /* HT buckets checked. */ #define ACTIVE_EXPIRE_CYCLE_FAST_DURATION 1000 /* Microseconds. */ #define ACTIVE_EXPIRE_CYCLE_SLOW_TIME_PERC 25 /* Percentage of CPU to use. */ @@ -152,7 +168,9 @@ void activeExpireCycle(int type) { long total_expired = 0; for (j = 0; j < dbs_per_call && timelimit_exit == 0; j++) { - int expired; + /* Expired and checked in a single loop. */ + unsigned long expired, sampled; + redisDb *db = server.db+(current_db % server.dbnum); /* Increment the DB now so we are sure if we run out of time @@ -176,8 +194,8 @@ void activeExpireCycle(int type) { slots = dictSlots(db->expires); now = mstime(); - /* When there are less than 1% filled slots getting random - * keys is expensive, so stop here waiting for better times... + /* When there are less than 1% filled slots, sampling the key + * space is expensive, so stop here waiting for better times... * The dictionary will be resized asap. */ if (num && slots > DICT_HT_INITIAL_SIZE && (num*100/slots < 1)) break; @@ -185,27 +203,47 @@ void activeExpireCycle(int type) { /* The main collection cycle. Sample random keys among keys * with an expire set, checking for expired ones. */ expired = 0; + sampled = 0; ttl_sum = 0; ttl_samples = 0; - if (num > ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP) - num = ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP; - - while (num--) { - dictEntry *de; - long long ttl; - - if ((de = dictGetRandomKey(db->expires)) == NULL) break; - ttl = dictGetSignedIntegerVal(de)-now; - if (activeExpireCycleTryExpire(db,de,now)) expired++; - if (ttl > 0) { - /* We want the average TTL of keys yet not expired. */ - ttl_sum += ttl; - ttl_samples++; + if (num > ACTIVE_EXPIRE_CYCLE_BUCKETS_PER_LOOP) + num = ACTIVE_EXPIRE_CYCLE_BUCKETS_PER_LOOP; + + /* Here we access the low level representation of the hash table + * for speed concerns: this makes this code coupled with dict.c, + * but it hardly changed in ten years. */ + while (sampled < num) { + for (int table = 0; table < 2; table++) { + if (table == 1 && !dictIsRehashing(db->expires)) break; + + unsigned long idx = db->expires_cursor; + idx &= db->expires->ht[table].sizemask; + dictEntry *de = db->expires->ht[table].table[idx]; + long long ttl; + + /* Scan the current bucket of the current table. */ + while(de) { + /* Get the next entry now since this entry may get + * deleted. */ + dictEntry *e = de; + de = de->next; + + ttl = dictGetSignedIntegerVal(e)-now; + if (activeExpireCycleTryExpire(db,e,now)) expired++; + if (ttl > 0) { + /* We want the average TTL of keys yet + * not expired. */ + ttl_sum += ttl; + ttl_samples++; + } + sampled++; + } } - total_sampled++; + db->expires_cursor++; } total_expired += expired; + total_sampled += sampled; /* Update the average TTL stats for this database. */ if (ttl_samples) { @@ -229,9 +267,10 @@ void activeExpireCycle(int type) { break; } } - /* We don't repeat the cycle if there are less than 25% of keys - * found expired in the current DB. */ - } while (expired > ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP/4); + /* We don't repeat the cycle for the current database if there are + * less than 25% of keys found expired in the current DB. */ + // printf("[%d] Expired %d, sampled %d\n", type, (int) expired, (int) sampled); + } while (expired > sampled/4); } elapsed = ustime()-start; -- cgit v1.2.1 From 27668056808192a3192bc6a79a2bec862a66c94e Mon Sep 17 00:00:00 2001 From: antirez Date: Fri, 15 Nov 2019 11:08:03 +0100 Subject: Expire cycle: tollerate less stale keys, expire cycle CPU in INFO. --- src/expire.c | 31 +++++++++++++++++++------------ src/server.c | 2 ++ 2 files changed, 21 insertions(+), 12 deletions(-) diff --git a/src/expire.c b/src/expire.c index af9eaebf1..9cb4c4a0a 100644 --- a/src/expire.c +++ b/src/expire.c @@ -111,9 +111,11 @@ int activeExpireCycleTryExpire(redisDb *db, dictEntry *de, long long now) { * little memory. */ -#define ACTIVE_EXPIRE_CYCLE_BUCKETS_PER_LOOP 20 /* HT buckets checked. */ +#define ACTIVE_EXPIRE_CYCLE_KEYS_PER_LOOP 20 /* Keys for each DB loop. */ #define ACTIVE_EXPIRE_CYCLE_FAST_DURATION 1000 /* Microseconds. */ -#define ACTIVE_EXPIRE_CYCLE_SLOW_TIME_PERC 25 /* Percentage of CPU to use. */ +#define ACTIVE_EXPIRE_CYCLE_SLOW_TIME_PERC 25 /* Max % of CPU to use. */ +#define ACTIVE_EXPIRE_CYCLE_ACCEPTABLE_STALE 10 /* % of stale keys after which + we do extra efforts. */ void activeExpireCycle(int type) { /* This function has some global state in order to continue the work @@ -133,10 +135,15 @@ void activeExpireCycle(int type) { if (type == ACTIVE_EXPIRE_CYCLE_FAST) { /* Don't start a fast cycle if the previous cycle did not exit - * for time limit. Also don't repeat a fast cycle for the same period + * for time limit, unless the percentage of estimated stale keys is + * too high. Also never repeat a fast cycle for the same period * as the fast cycle total duration itself. */ - if (!timelimit_exit) return; - if (start < last_fast_cycle + ACTIVE_EXPIRE_CYCLE_FAST_DURATION*2) return; + if (!timelimit_exit && server.stat_expired_stale_perc < + ACTIVE_EXPIRE_CYCLE_ACCEPTABLE_STALE) return; + + if (start < last_fast_cycle + ACTIVE_EXPIRE_CYCLE_FAST_DURATION*2) + return; + last_fast_cycle = start; } @@ -150,8 +157,8 @@ void activeExpireCycle(int type) { if (dbs_per_call > server.dbnum || timelimit_exit) dbs_per_call = server.dbnum; - /* We can use at max ACTIVE_EXPIRE_CYCLE_SLOW_TIME_PERC percentage of CPU time - * per iteration. Since this function gets called with a frequency of + /* We can use at max ACTIVE_EXPIRE_CYCLE_SLOW_TIME_PERC percentage of CPU + * time per iteration. Since this function gets called with a frequency of * server.hz times per second, the following is the max amount of * microseconds we can spend in this function. */ timelimit = 1000000*ACTIVE_EXPIRE_CYCLE_SLOW_TIME_PERC/server.hz/100; @@ -207,8 +214,8 @@ void activeExpireCycle(int type) { ttl_sum = 0; ttl_samples = 0; - if (num > ACTIVE_EXPIRE_CYCLE_BUCKETS_PER_LOOP) - num = ACTIVE_EXPIRE_CYCLE_BUCKETS_PER_LOOP; + if (num > ACTIVE_EXPIRE_CYCLE_KEYS_PER_LOOP) + num = ACTIVE_EXPIRE_CYCLE_KEYS_PER_LOOP; /* Here we access the low level representation of the hash table * for speed concerns: this makes this code coupled with dict.c, @@ -268,9 +275,9 @@ void activeExpireCycle(int type) { } } /* We don't repeat the cycle for the current database if there are - * less than 25% of keys found expired in the current DB. */ - // printf("[%d] Expired %d, sampled %d\n", type, (int) expired, (int) sampled); - } while (expired > sampled/4); + * an acceptable amount of stale keys (logically expired but yet + * not reclained). */ + } while ((expired*100/sampled) > ACTIVE_EXPIRE_CYCLE_ACCEPTABLE_STALE); } elapsed = ustime()-start; diff --git a/src/server.c b/src/server.c index 8c8b668de..f834119ee 100644 --- a/src/server.c +++ b/src/server.c @@ -4270,6 +4270,7 @@ sds genRedisInfoString(char *section) { "expired_keys:%lld\r\n" "expired_stale_perc:%.2f\r\n" "expired_time_cap_reached_count:%lld\r\n" + "expire_cycle_cpu_milliseconds:%lld\r\n" "evicted_keys:%lld\r\n" "keyspace_hits:%lld\r\n" "keyspace_misses:%lld\r\n" @@ -4297,6 +4298,7 @@ sds genRedisInfoString(char *section) { server.stat_expiredkeys, server.stat_expired_stale_perc*100, server.stat_expired_time_cap_reached_count, + server.stat_expire_cycle_time_used/1000, server.stat_evictedkeys, server.stat_keyspace_hits, server.stat_keyspace_misses, -- cgit v1.2.1 From 84b01f63dbe28d5541e09313d35deacf4344ab16 Mon Sep 17 00:00:00 2001 From: antirez Date: Mon, 18 Nov 2019 11:30:05 +0100 Subject: Expire cycle: introduce configurable effort. --- src/expire.c | 37 ++++++++++++++++++++++++++++--------- src/server.c | 1 + src/server.h | 2 ++ 3 files changed, 31 insertions(+), 9 deletions(-) diff --git a/src/expire.c b/src/expire.c index 9cb4c4a0a..dd8a42726 100644 --- a/src/expire.c +++ b/src/expire.c @@ -109,6 +109,9 @@ int activeExpireCycleTryExpire(redisDb *db, dictEntry *de, long long now) { * of already expired keys in the database is estimated to be lower than * a given percentage, in order to avoid doing too much work to gain too * little memory. + * + * The configured expire "effort" will modify the baseline parameters in + * order to do more work in both the fast and slow expire cycles. */ #define ACTIVE_EXPIRE_CYCLE_KEYS_PER_LOOP 20 /* Keys for each DB loop. */ @@ -118,6 +121,21 @@ int activeExpireCycleTryExpire(redisDb *db, dictEntry *de, long long now) { we do extra efforts. */ void activeExpireCycle(int type) { + /* Adjust the running parameters according to the configured expire + * effort. The default effort is 1, and the maximum configurable effort + * is 10. */ + unsigned long + effort = server.active_expire_effort, + config_keys_per_loop = ACTIVE_EXPIRE_CYCLE_KEYS_PER_LOOP + + ACTIVE_EXPIRE_CYCLE_KEYS_PER_LOOP/4*effort, + config_cycle_fast_duration = ACTIVE_EXPIRE_CYCLE_FAST_DURATION * + ACTIVE_EXPIRE_CYCLE_FAST_DURATION/4*effort, + config_cycle_slow_time_perc = ACTIVE_EXPIRE_CYCLE_SLOW_TIME_PERC + + 2*effort, + config_cycle_acceptable_stale = ACTIVE_EXPIRE_CYCLE_ACCEPTABLE_STALE- + effort; + if (config_cycle_acceptable_stale < 1) config_cycle_acceptable_stale = 1; + /* This function has some global state in order to continue the work * incrementally across calls. */ static unsigned int current_db = 0; /* Last DB tested. */ @@ -138,10 +156,11 @@ void activeExpireCycle(int type) { * for time limit, unless the percentage of estimated stale keys is * too high. Also never repeat a fast cycle for the same period * as the fast cycle total duration itself. */ - if (!timelimit_exit && server.stat_expired_stale_perc < - ACTIVE_EXPIRE_CYCLE_ACCEPTABLE_STALE) return; + if (!timelimit_exit && + server.stat_expired_stale_perc < config_cycle_acceptable_stale) + return; - if (start < last_fast_cycle + ACTIVE_EXPIRE_CYCLE_FAST_DURATION*2) + if (start < last_fast_cycle + (long long)config_cycle_fast_duration*2) return; last_fast_cycle = start; @@ -157,16 +176,16 @@ void activeExpireCycle(int type) { if (dbs_per_call > server.dbnum || timelimit_exit) dbs_per_call = server.dbnum; - /* We can use at max ACTIVE_EXPIRE_CYCLE_SLOW_TIME_PERC percentage of CPU + /* We can use at max 'config_cycle_slow_time_perc' percentage of CPU * time per iteration. Since this function gets called with a frequency of * server.hz times per second, the following is the max amount of * microseconds we can spend in this function. */ - timelimit = 1000000*ACTIVE_EXPIRE_CYCLE_SLOW_TIME_PERC/server.hz/100; + timelimit = config_cycle_slow_time_perc*1000000/server.hz/100; timelimit_exit = 0; if (timelimit <= 0) timelimit = 1; if (type == ACTIVE_EXPIRE_CYCLE_FAST) - timelimit = ACTIVE_EXPIRE_CYCLE_FAST_DURATION; /* in microseconds. */ + timelimit = config_cycle_fast_duration; /* in microseconds. */ /* Accumulate some global stats as we expire keys, to have some idea * about the number of keys that are already logically expired, but still @@ -214,8 +233,8 @@ void activeExpireCycle(int type) { ttl_sum = 0; ttl_samples = 0; - if (num > ACTIVE_EXPIRE_CYCLE_KEYS_PER_LOOP) - num = ACTIVE_EXPIRE_CYCLE_KEYS_PER_LOOP; + if (num > config_keys_per_loop) + num = config_keys_per_loop; /* Here we access the low level representation of the hash table * for speed concerns: this makes this code coupled with dict.c, @@ -277,7 +296,7 @@ void activeExpireCycle(int type) { /* We don't repeat the cycle for the current database if there are * an acceptable amount of stale keys (logically expired but yet * not reclained). */ - } while ((expired*100/sampled) > ACTIVE_EXPIRE_CYCLE_ACCEPTABLE_STALE); + } while ((expired*100/sampled) > config_cycle_acceptable_stale); } elapsed = ustime()-start; diff --git a/src/server.c b/src/server.c index f834119ee..c811b869d 100644 --- a/src/server.c +++ b/src/server.c @@ -2294,6 +2294,7 @@ void initServerConfig(void) { server.maxidletime = CONFIG_DEFAULT_CLIENT_TIMEOUT; server.tcpkeepalive = CONFIG_DEFAULT_TCP_KEEPALIVE; server.active_expire_enabled = 1; + server.active_expire_effort = CONFIG_DEFAULT_ACTIVE_EXPIRE_EFFORT; server.jemalloc_bg_thread = 1; server.active_defrag_enabled = CONFIG_DEFAULT_ACTIVE_DEFRAG; server.active_defrag_ignore_bytes = CONFIG_DEFAULT_DEFRAG_IGNORE_BYTES; diff --git a/src/server.h b/src/server.h index 2b7974594..51dce955d 100644 --- a/src/server.h +++ b/src/server.h @@ -179,6 +179,7 @@ typedef long long ustime_t; /* microsecond time type. */ #define CONFIG_DEFAULT_DEFRAG_MAX_SCAN_FIELDS 1000 /* keys with more than 1000 fields will be processed separately */ #define CONFIG_DEFAULT_PROTO_MAX_BULK_LEN (512ll*1024*1024) /* Bulk request max size */ #define CONFIG_DEFAULT_TRACKING_TABLE_MAX_FILL 10 /* 10% tracking table max fill. */ +#define CONFIG_DEFAULT_ACTIVE_EXPIRE_EFFORT 1 /* From 1 to 10. */ #define ACTIVE_EXPIRE_CYCLE_SLOW 0 #define ACTIVE_EXPIRE_CYCLE_FAST 1 @@ -1204,6 +1205,7 @@ struct redisServer { int maxidletime; /* Client timeout in seconds */ int tcpkeepalive; /* Set SO_KEEPALIVE if non-zero. */ int active_expire_enabled; /* Can be disabled for testing purposes. */ + int active_expire_effort; /* From 1 (default) to 10, active effort. */ int active_defrag_enabled; int jemalloc_bg_thread; /* Enable jemalloc background thread */ size_t active_defrag_ignore_bytes; /* minimum amount of fragmentation waste to start active defrag */ -- cgit v1.2.1 From 1ccc270a2cb3f489e85560f1c953b105997c8b1a Mon Sep 17 00:00:00 2001 From: antirez Date: Mon, 18 Nov 2019 11:33:40 +0100 Subject: Expire cycle: make expire effort configurable. --- src/config.c | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/src/config.c b/src/config.c index 505dabc9c..de8e8d8cf 100644 --- a/src/config.c +++ b/src/config.c @@ -256,7 +256,7 @@ void loadServerConfigFromString(char *config) { for (configYesNo *config = configs_yesno; config->name != NULL; config++) { if ((!strcasecmp(argv[0],config->name) || (config->alias && !strcasecmp(argv[0],config->alias))) && - (argc == 2)) + (argc == 2)) { if ((*(config->config) = yesnotoi(argv[1])) == -1) { err = "argument must be 'yes' or 'no'"; goto loaderr; @@ -580,6 +580,14 @@ void loadServerConfigFromString(char *config) { err = "active-defrag-max-scan-fields must be positive"; goto loaderr; } + } else if (!strcasecmp(argv[0],"active-expire-effort") && argc == 2) { + server.active_expire_effort = atoi(argv[1]); + if (server.active_expire_effort < 1 || + server.active_expire_effort > 10) + { + err = "active-expire-effort must be between 1 and 10"; + goto loaderr; + } } else if (!strcasecmp(argv[0],"hash-max-ziplist-entries") && argc == 2) { server.hash_max_ziplist_entries = memtoll(argv[1], NULL); } else if (!strcasecmp(argv[0],"hash-max-ziplist-value") && argc == 2) { @@ -1165,6 +1173,8 @@ void configSetCommand(client *c) { "active-defrag-cycle-max",server.active_defrag_cycle_max,1,99) { } config_set_numerical_field( "active-defrag-max-scan-fields",server.active_defrag_max_scan_fields,1,LONG_MAX) { + } config_set_numerical_field( + "active-expire-effort",server.active_expire_effort,1,10) { } config_set_numerical_field( "auto-aof-rewrite-percentage",server.aof_rewrite_perc,0,INT_MAX){ } config_set_numerical_field( @@ -1478,6 +1488,7 @@ void configGetCommand(client *c) { config_get_numerical_field("active-defrag-cycle-min",server.active_defrag_cycle_min); config_get_numerical_field("active-defrag-cycle-max",server.active_defrag_cycle_max); config_get_numerical_field("active-defrag-max-scan-fields",server.active_defrag_max_scan_fields); + config_get_numerical_field("active-expire-effort",server.active_expire_effort); config_get_numerical_field("auto-aof-rewrite-percentage", server.aof_rewrite_perc); config_get_numerical_field("auto-aof-rewrite-min-size", @@ -2327,6 +2338,7 @@ int rewriteConfig(char *path) { rewriteConfigNumericalOption(state,"active-defrag-cycle-min",server.active_defrag_cycle_min,CONFIG_DEFAULT_DEFRAG_CYCLE_MIN); rewriteConfigNumericalOption(state,"active-defrag-cycle-max",server.active_defrag_cycle_max,CONFIG_DEFAULT_DEFRAG_CYCLE_MAX); rewriteConfigNumericalOption(state,"active-defrag-max-scan-fields",server.active_defrag_max_scan_fields,CONFIG_DEFAULT_DEFRAG_MAX_SCAN_FIELDS); + rewriteConfigNumericalOption(state,"active-expire-effort",server.active_expire_effort,CONFIG_DEFAULT_ACTIVE_EXPIRE_EFFORT); rewriteConfigYesNoOption(state,"appendonly",server.aof_enabled,0); rewriteConfigStringOption(state,"appendfilename",server.aof_filename,CONFIG_DEFAULT_AOF_FILENAME); rewriteConfigEnumOption(state,"appendfsync",server.aof_fsync,aof_fsync_enum,CONFIG_DEFAULT_AOF_FSYNC); -- cgit v1.2.1 From 2ab51a644d3d390df50dc1bc59958a15affeb341 Mon Sep 17 00:00:00 2001 From: antirez Date: Mon, 18 Nov 2019 11:43:42 +0100 Subject: Expire cycle: fix parameters computation. --- src/expire.c | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/expire.c b/src/expire.c index dd8a42726..ea7e2b456 100644 --- a/src/expire.c +++ b/src/expire.c @@ -125,16 +125,15 @@ void activeExpireCycle(int type) { * effort. The default effort is 1, and the maximum configurable effort * is 10. */ unsigned long - effort = server.active_expire_effort, + effort = server.active_expire_effort-1, /* Rescale from 0 to 9. */ config_keys_per_loop = ACTIVE_EXPIRE_CYCLE_KEYS_PER_LOOP + ACTIVE_EXPIRE_CYCLE_KEYS_PER_LOOP/4*effort, - config_cycle_fast_duration = ACTIVE_EXPIRE_CYCLE_FAST_DURATION * + config_cycle_fast_duration = ACTIVE_EXPIRE_CYCLE_FAST_DURATION + ACTIVE_EXPIRE_CYCLE_FAST_DURATION/4*effort, config_cycle_slow_time_perc = ACTIVE_EXPIRE_CYCLE_SLOW_TIME_PERC + 2*effort, config_cycle_acceptable_stale = ACTIVE_EXPIRE_CYCLE_ACCEPTABLE_STALE- effort; - if (config_cycle_acceptable_stale < 1) config_cycle_acceptable_stale = 1; /* This function has some global state in order to continue the work * incrementally across calls. */ -- cgit v1.2.1 From e8ceba4e64d6ae7ce8baef90785b4f758e84f5e7 Mon Sep 17 00:00:00 2001 From: antirez Date: Mon, 18 Nov 2019 17:47:19 +0100 Subject: Expire cycle: set a buckets limit as well. --- src/expire.c | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/src/expire.c b/src/expire.c index ea7e2b456..b4ab9ab18 100644 --- a/src/expire.c +++ b/src/expire.c @@ -237,8 +237,18 @@ void activeExpireCycle(int type) { /* Here we access the low level representation of the hash table * for speed concerns: this makes this code coupled with dict.c, - * but it hardly changed in ten years. */ - while (sampled < num) { + * but it hardly changed in ten years. + * + * Note that certain places of the hash table may be empty, + * so we want also a stop condition about the number of + * buckets that we scanned. However scanning for free buckets + * is very fast: we are in the cache line scanning a sequential + * array of NULL pointers, so we can scan a lot more buckets + * than keys in the same time. */ + long max_buckets = num*20; + long checked_buckets = 0; + + while (sampled < num && checked_buckets < max_buckets) { for (int table = 0; table < 2; table++) { if (table == 1 && !dictIsRehashing(db->expires)) break; @@ -248,6 +258,7 @@ void activeExpireCycle(int type) { long long ttl; /* Scan the current bucket of the current table. */ + checked_buckets++; while(de) { /* Get the next entry now since this entry may get * deleted. */ -- cgit v1.2.1 From 3243252cb05869ed4abd49f06e45d7eac4912298 Mon Sep 17 00:00:00 2001 From: antirez Date: Mon, 18 Nov 2019 18:11:38 +0100 Subject: Expire cycle: document expire effort in redis.conf. --- redis.conf | 23 ++++++++++++++++++++--- 1 file changed, 20 insertions(+), 3 deletions(-) diff --git a/redis.conf b/redis.conf index 0ec3321a5..39e21b5e7 100644 --- a/redis.conf +++ b/redis.conf @@ -813,11 +813,11 @@ replica-priority 100 # MAXMEMORY POLICY: how Redis will select what to remove when maxmemory # is reached. You can select among five behaviors: # -# volatile-lru -> Evict using approximated LRU among the keys with an expire set. +# volatile-lru -> Evict using approximated LRU, only keys with an expire set. # allkeys-lru -> Evict any key using approximated LRU. -# volatile-lfu -> Evict using approximated LFU among the keys with an expire set. +# volatile-lfu -> Evict using approximated LFU, only keys with an expire set. # allkeys-lfu -> Evict any key using approximated LFU. -# volatile-random -> Remove a random key among the ones with an expire set. +# volatile-random -> Remove a random key having an expire set. # allkeys-random -> Remove a random key, any key. # volatile-ttl -> Remove the key with the nearest expire time (minor TTL) # noeviction -> Don't evict anything, just return an error on write operations. @@ -872,6 +872,23 @@ replica-priority 100 # # replica-ignore-maxmemory yes +# Redis reclaims expired keys in two ways: upon access when those keys are +# found to be expired, and also in background, in what is called the +# "active expire key". The key space is slowly and interactively scanned +# looking for expired keys to reclaim, so that it is possible to free memory +# of keys that are expired and will never be accessed again in a short time. +# +# The default effort of the expire cycle will try to avoid having more than +# ten percent of expired keys still in memory, and will try to avoid consuming +# more than 25% of total memory and to add latency to the system. However +# it is possible to increase the expire "effort" that is normally set to +# "1", to a greater value, up to the value "10". At its maximum value the +# system will use more CPU, longer cycles (and technically may introduce +# more latency), and will tollerate less already expired keys still present +# in the system. It's a tradeoff betweeen memory, CPU and latecy. +# +# active-expire-effort 1 + ############################# LAZY FREEING #################################### # Redis has two primitives to delete keys. One is called DEL and is a blocking -- cgit v1.2.1 From 7c95e89ec320c2f8ecc767e7dc5e93abfbbd07bf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=96=9C=E6=AC=A2=E5=85=B0=E8=8A=B1=E5=B1=B1=E4=B8=98?= Date: Tue, 19 Nov 2019 17:23:47 +0800 Subject: Update mkreleasehdr.sh fix date +%s errata --- src/mkreleasehdr.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/mkreleasehdr.sh b/src/mkreleasehdr.sh index e6d558b17..236c26c2b 100755 --- a/src/mkreleasehdr.sh +++ b/src/mkreleasehdr.sh @@ -3,7 +3,7 @@ GIT_SHA1=`(git show-ref --head --hash=8 2> /dev/null || echo 00000000) | head -n GIT_DIRTY=`git diff --no-ext-diff 2> /dev/null | wc -l` BUILD_ID=`uname -n`"-"`date +%s` if [ -n "$SOURCE_DATE_EPOCH" ]; then - BUILD_ID=$(date -u -d "@$SOURCE_DATE_EPOCH" +%s 2>/dev/null || date -u -r "$SOURCE_DATE_EPOCH" +%s 2>/dev/null || date -u %s) + BUILD_ID=$(date -u -d "@$SOURCE_DATE_EPOCH" +%s 2>/dev/null || date -u -r "$SOURCE_DATE_EPOCH" +%s 2>/dev/null || date -u +%s) fi test -f release.h || touch release.h (cat release.h | grep SHA1 | grep $GIT_SHA1) && \ -- cgit v1.2.1 From 5b80a41caddb9c5fa820de08a3c8646cd1b9640d Mon Sep 17 00:00:00 2001 From: antirez Date: Tue, 19 Nov 2019 11:05:55 +0100 Subject: Remove additional space from comment. --- src/t_stream.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/t_stream.c b/src/t_stream.c index 9bf87831f..0b07d7110 100644 --- a/src/t_stream.c +++ b/src/t_stream.c @@ -1220,7 +1220,7 @@ void xaddCommand(client *c) { return; } - /* Return ASAP if minimal ID (0-0) was given so we avoid possibly creating + /* Return ASAP if minimal ID (0-0) was given so we avoid possibly creating * a new stream and have streamAppendItem fail, leaving an empty key in the * database. */ if (id_given && id.ms == 0 && id.seq == 0) { -- cgit v1.2.1 From 2d1e893b3e133dcceecef2110d2c41aa8f904b87 Mon Sep 17 00:00:00 2001 From: Yossi Gottlieb Date: Tue, 19 Nov 2019 12:10:48 +0200 Subject: Improve RM_Call() errno classification. RM_Call() will now use EBADF and ENONET in addition to EINVAL in order to provide more information about errors (i.e. when return value is NULL). --- src/module.c | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/src/module.c b/src/module.c index ad34e7b64..8796a7ab1 100644 --- a/src/module.c +++ b/src/module.c @@ -3110,7 +3110,9 @@ fmterr: * On success a RedisModuleCallReply object is returned, otherwise * NULL is returned and errno is set to the following values: * - * EINVAL: command non existing, wrong arity, wrong format specifier. + * EBADF: wrong format specifier. + * EINVAL: wrong command arity. + * ENOENT: command does not exist. * EPERM: operation in Cluster instance with key in non local slot. * * This API is documented here: https://redis.io/topics/modules-intro @@ -3142,7 +3144,7 @@ RedisModuleCallReply *RM_Call(RedisModuleCtx *ctx, const char *cmdname, const ch /* We handle the above format error only when the client is setup so that * we can free it normally. */ if (argv == NULL) { - errno = EINVAL; + errno = EBADF; goto cleanup; } @@ -3154,7 +3156,7 @@ RedisModuleCallReply *RM_Call(RedisModuleCtx *ctx, const char *cmdname, const ch */ cmd = lookupCommand(c->argv[0]->ptr); if (!cmd) { - errno = EINVAL; + errno = ENOENT; goto cleanup; } c->cmd = c->lastcmd = cmd; -- cgit v1.2.1 From b42466b92586d1adfeec54bda14d4ad54b0764d6 Mon Sep 17 00:00:00 2001 From: antirez Date: Tue, 19 Nov 2019 11:23:43 +0100 Subject: Fix patch provided in #6554. --- src/blocked.c | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/src/blocked.c b/src/blocked.c index dea4cc57a..47bb290a4 100644 --- a/src/blocked.c +++ b/src/blocked.c @@ -514,6 +514,13 @@ void handleClientsBlockedOnKeys(void) { * we can safely call signalKeyAsReady() against this key. */ dictDelete(rl->db->ready_keys,rl->key); + /* Even if we are not inside call(), increment the call depth + * in order to make sure that keys are expired against a fixed + * reference time, and not against the wallclock time. This + * way we can lookup an object multiple times (BRPOPLPUSH does + * that) without the risk of it being freed in the second + * lookup, invalidating the first one. + * See https://github.com/antirez/redis/pull/6554. */ server.call_depth++; updateCachedTime(0); @@ -533,7 +540,7 @@ void handleClientsBlockedOnKeys(void) { serveClientsBlockedOnKeyByModule(rl); } - server.call_depth++; + server.call_depth--; /* Free this item. */ decrRefCount(rl->key); -- cgit v1.2.1 From ce03d6833213901c95e0b5961b555744d3815bd2 Mon Sep 17 00:00:00 2001 From: antirez Date: Tue, 19 Nov 2019 11:28:04 +0100 Subject: Rename var to fixed_time_expire now that is more general. --- src/blocked.c | 5 ++--- src/db.c | 2 +- src/server.c | 6 +++--- src/server.h | 2 +- 4 files changed, 7 insertions(+), 8 deletions(-) diff --git a/src/blocked.c b/src/blocked.c index 47bb290a4..20c0e760a 100644 --- a/src/blocked.c +++ b/src/blocked.c @@ -521,7 +521,7 @@ void handleClientsBlockedOnKeys(void) { * that) without the risk of it being freed in the second * lookup, invalidating the first one. * See https://github.com/antirez/redis/pull/6554. */ - server.call_depth++; + server.fixed_time_expire++; updateCachedTime(0); /* Serve clients blocked on list key. */ @@ -539,8 +539,7 @@ void handleClientsBlockedOnKeys(void) { * module is trying to accomplish right now. */ serveClientsBlockedOnKeyByModule(rl); } - - server.call_depth--; + server.fixed_time_expire--; /* Free this item. */ decrRefCount(rl->key); diff --git a/src/db.c b/src/db.c index e47a4a681..aac049eb7 100644 --- a/src/db.c +++ b/src/db.c @@ -1220,7 +1220,7 @@ int keyIsExpired(redisDb *db, robj *key) { * may re-open the same key multiple times, can invalidate an already * open object in a next call, if the next call will see the key expired, * while the first did not. */ - else if (server.call_depth > 0) { + else if (server.fixed_time_expire > 0) { now = server.mstime; } /* For the other cases, we want to use the most fresh time we have. */ diff --git a/src/server.c b/src/server.c index c811b869d..2e9a329dd 100644 --- a/src/server.c +++ b/src/server.c @@ -2788,7 +2788,7 @@ void initServer(void) { server.hz = server.config_hz; server.pid = getpid(); server.current_client = NULL; - server.call_depth = 0; + server.fixed_time_expire = 0; server.clients = listCreate(); server.clients_index = raxNew(); server.clients_to_close = listCreate(); @@ -3262,7 +3262,7 @@ void call(client *c, int flags) { int client_old_flags = c->flags; struct redisCommand *real_cmd = c->cmd; - server.call_depth++; + server.fixed_time_expire++; /* Sent the command to clients in MONITOR mode, only if the commands are * not generated from reading an AOF. */ @@ -3389,7 +3389,7 @@ void call(client *c, int flags) { trackingRememberKeys(caller); } - server.call_depth--; + server.fixed_time_expire--; server.stat_numcommands++; } diff --git a/src/server.h b/src/server.h index 51dce955d..5da7b4960 100644 --- a/src/server.h +++ b/src/server.h @@ -1134,7 +1134,7 @@ struct redisServer { list *clients_pending_read; /* Client has pending read socket buffers. */ list *slaves, *monitors; /* List of slaves and MONITORs */ client *current_client; /* Current client executing the command. */ - long call_depth; /* call() re-entering count. */ + long fixed_time_expire; /* If > 0, expire keys against server.mstime. */ rax *clients_index; /* Active clients dictionary by client ID. */ int clients_paused; /* True if clients are currently paused */ mstime_t clients_pause_end_time; /* Time when we undo clients_paused */ -- cgit v1.2.1 From 936e01e5bb286d9a8701b2d71d8e90f5b7168475 Mon Sep 17 00:00:00 2001 From: antirez Date: Tue, 19 Nov 2019 11:49:05 +0100 Subject: Fix stream test after addition of 0-0 ID test. --- tests/unit/type/stream.tcl | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/unit/type/stream.tcl b/tests/unit/type/stream.tcl index a4431c654..656bac5de 100644 --- a/tests/unit/type/stream.tcl +++ b/tests/unit/type/stream.tcl @@ -124,9 +124,9 @@ start_server { } test {XADD with ID 0-0} { - r DEL mystream - catch {r XADD mystream 0-0 k v} err - assert {[r EXISTS mystream] == 0} + r DEL otherstream + catch {r XADD otherstream 0-0 k v} err + assert {[r EXISTS otherstream] == 0} } test {XRANGE COUNT works as expected} { -- cgit v1.2.1 From fe5aea38c35e3fc35a744ad2de73543df553ae48 Mon Sep 17 00:00:00 2001 From: antirez Date: Tue, 19 Nov 2019 11:56:02 +0100 Subject: Simplify PR #6551 implementation. --- src/sentinel.c | 11 ++--------- 1 file changed, 2 insertions(+), 9 deletions(-) diff --git a/src/sentinel.c b/src/sentinel.c index d5f22b97f..42c4d2467 100644 --- a/src/sentinel.c +++ b/src/sentinel.c @@ -465,12 +465,6 @@ struct redisCommand sentinelcmds[] = { {"hello",helloCommand,-2,"no-script fast",0,NULL,0,0,0,0,0} }; -/* List of client types that are killed when an instance becomes a slave */ -const char* killedClientTypes[] = { - "normal", - "pubsub" -}; - /* This function overwrites a few normal Redis config default with Sentinel * specific defaults. */ void initSentinelConfig(void) { @@ -3955,7 +3949,6 @@ char *sentinelGetLeader(sentinelRedisInstance *master, uint64_t epoch) { int sentinelSendSlaveOf(sentinelRedisInstance *ri, char *host, int port) { char portstr[32]; int retval; - unsigned int curType; ll2string(portstr,sizeof(portstr),port); @@ -4000,11 +3993,11 @@ int sentinelSendSlaveOf(sentinelRedisInstance *ri, char *host, int port) { * an issue because CLIENT is variadic command, so Redis will not * recognized as a syntax error, and the transaction will not fail (but * only the unsupported command will fail). */ - for (curType = 0; curType < sizeof(killedClientTypes)/sizeof(killedClientTypes[0]); ++curType) { + for (int type = 0; type < 2; type++) { retval = redisAsyncCommand(ri->link->cc, sentinelDiscardReplyCallback, ri, "%s KILL TYPE %s", sentinelInstanceMapCommand(ri,"CLIENT"), - killedClientTypes[curType]); + type == 0 ? "normal" : "pubsub"); if (retval == C_ERR) return retval; ri->link->pending_commands++; } -- cgit v1.2.1 From 99b5696390735fb53756dc0f2e54edaa0cf0c9e2 Mon Sep 17 00:00:00 2001 From: Daniel Dai <764122422@qq.com> Date: Tue, 19 Nov 2019 20:14:59 -0500 Subject: fix typo --- tests/unit/scripting.tcl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/unit/scripting.tcl b/tests/unit/scripting.tcl index b3e1c48e6..2543a0377 100644 --- a/tests/unit/scripting.tcl +++ b/tests/unit/scripting.tcl @@ -536,7 +536,7 @@ foreach cmdrepl {0 1} { start_server {tags {"scripting repl"}} { start_server {} { if {$cmdrepl == 1} { - set rt "(commmands replication)" + set rt "(commands replication)" } else { set rt "(scripts replication)" r debug lua-always-replicate-commands 1 -- cgit v1.2.1 From da47d52c79ec970a2e47812b1e8fae714c47010d Mon Sep 17 00:00:00 2001 From: antirez Date: Thu, 21 Nov 2019 10:01:49 +0100 Subject: Recomment PR #6346. --- src/module.c | 20 ++++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/src/module.c b/src/module.c index 77b885b00..4c609e4ff 100644 --- a/src/module.c +++ b/src/module.c @@ -5995,19 +5995,23 @@ int RM_CommandFilterArgDelete(RedisModuleCommandFilterCtx *fctx, int pos) return REDISMODULE_OK; } -/* - * For a given pointer, return The amount of memory - * allocated for this pointer. +/* For a given pointer allocated via RedisModule_Alloc() or + * RedisModule_Realloc(), return the amount of memory allocated for it. + * Note that this may be different (larger) than the memory we allocated + * with the allocation calls, since sometimes the underlying allocator + * will allocate more memory. */ size_t RM_MallocSize(void* ptr){ return zmalloc_size(ptr); } -/* - * Return the a number between 0 to 1 indicating - * the amount of memory currently used. - * 0 - no memory limit - * 1 and above, memory limit reached. +/* Return the a number between 0 to 1 indicating the amount of memory + * currently used, relative to the Redis "maxmemory" configuration. + * + * 0 - No memory limit configured. + * Between 0 and 1 - The percentage of the memory used normalized in 0-1 range. + * Exactly 1 - Memory limit reached. + * Greater 1 - More memory used than the configured limit. */ float RM_GetUsedMemoryRatio(){ float level; -- cgit v1.2.1