diff options
Diffstat (limited to 'src/module.c')
-rw-r--r-- | src/module.c | 1340 |
1 files changed, 973 insertions, 367 deletions
diff --git a/src/module.c b/src/module.c index bf186f8b7..b04595801 100644 --- a/src/module.c +++ b/src/module.c @@ -29,7 +29,9 @@ #include "server.h" #include "cluster.h" +#include "slowlog.h" #include "rdb.h" +#include "monotonic.h" #include <dlfcn.h> #include <sys/stat.h> #include <sys/wait.h> @@ -177,15 +179,25 @@ struct RedisModuleKey { void *iter; /* Iterator. */ int mode; /* Opening mode. */ - /* Zset iterator. */ - uint32_t ztype; /* REDISMODULE_ZSET_RANGE_* */ - zrangespec zrs; /* Score range. */ - zlexrangespec zlrs; /* Lex range. */ - uint32_t zstart; /* Start pos for positional ranges. */ - uint32_t zend; /* End pos for positional ranges. */ - void *zcurrent; /* Zset iterator current node. */ - int zer; /* Zset iterator end reached flag - (true if end was reached). */ + union { + struct { + /* Zset iterator, use only if value->type == OBJ_ZSET */ + uint32_t type; /* REDISMODULE_ZSET_RANGE_* */ + zrangespec rs; /* Score range. */ + zlexrangespec lrs; /* Lex range. */ + uint32_t start; /* Start pos for positional ranges. */ + uint32_t end; /* End pos for positional ranges. */ + void *current; /* Zset iterator current node. */ + int er; /* Zset iterator end reached flag + (true if end was reached). */ + } zset; + struct { + /* Stream, use only if value->type == OBJ_STREAM */ + streamID currentid; /* Current entry while iterating. */ + int64_t numfieldsleft; /* Fields left to fetch for current entry. */ + int signalready; /* Flag that signalKeyAsReady() is needed. */ + } stream; + } u; }; typedef struct RedisModuleKey RedisModuleKey; @@ -252,6 +264,9 @@ typedef struct RedisModuleBlockedClient { int dbid; /* Database number selected by the original client. */ int blocked_on_keys; /* If blocked via RM_BlockClientOnKeys(). */ int unblocked; /* Already on the moduleUnblocked list. */ + monotime background_timer; /* Timer tracking the start of background work */ + uint64_t background_duration; /* Current command background time duration. + Used for measuring latency of blocking cmds */ } RedisModuleBlockedClient; static pthread_mutex_t moduleUnblockedClientsMutex = PTHREAD_MUTEX_INITIALIZER; @@ -376,6 +391,7 @@ robj **moduleCreateArgvFromUserFormat(const char *cmdname, const char *fmt, int void moduleReplicateMultiIfNeeded(RedisModuleCtx *ctx); void RM_ZsetRangeStop(RedisModuleKey *kp); static void zsetKeyReset(RedisModuleKey *key); +static void moduleInitKeyTypeSpecific(RedisModuleKey *key); void RM_FreeDict(RedisModuleCtx *ctx, RedisModuleDict *d); void RM_FreeServerInfo(RedisModuleCtx *ctx, RedisModuleServerInfoData *data); @@ -478,17 +494,17 @@ void *RM_PoolAlloc(RedisModuleCtx *ctx, size_t bytes) { * Helpers for modules API implementation * -------------------------------------------------------------------------- */ -/* Create an empty key of the specified type. 'kp' must point to a key object - * opened for writing where the .value member is set to NULL because the +/* Create an empty key of the specified type. `key` must point to a key object + * opened for writing where the `.value` member is set to NULL because the * key was found to be non existing. * * On success REDISMODULE_OK is returned and the key is populated with * the value of the specified type. The function fails and returns * REDISMODULE_ERR if: * - * 1) The key is not open for writing. - * 2) The key is not empty. - * 3) The specified type is unknown. + * 1. The key is not open for writing. + * 2. The key is not empty. + * 3. The specified type is unknown. */ int moduleCreateEmptyKey(RedisModuleKey *key, int type) { robj *obj; @@ -509,10 +525,14 @@ int moduleCreateEmptyKey(RedisModuleKey *key, int type) { case REDISMODULE_KEYTYPE_HASH: obj = createHashObject(); break; + case REDISMODULE_KEYTYPE_STREAM: + obj = createStreamObject(); + break; default: return REDISMODULE_ERR; } dbAdd(key->db,key->key,obj); key->value = obj; + moduleInitKeyTypeSpecific(key); return REDISMODULE_OK; } @@ -900,6 +920,30 @@ long long RM_Milliseconds(void) { return mstime(); } +/* Mark a point in time that will be used as the start time to calculate + * the elapsed execution time when RM_BlockedClientMeasureTimeEnd() is called. + * Within the same command, you can call multiple times + * RM_BlockedClientMeasureTimeStart() and RM_BlockedClientMeasureTimeEnd() + * to accummulate indepedent time intervals to the background duration. + * This method always return REDISMODULE_OK. */ +int RM_BlockedClientMeasureTimeStart(RedisModuleBlockedClient *bc) { + elapsedStart(&(bc->background_timer)); + return REDISMODULE_OK; +} + +/* Mark a point in time that will be used as the end time + * to calculate the elapsed execution time. + * On success REDISMODULE_OK is returned. + * This method only returns REDISMODULE_ERR if no start time was + * previously defined ( meaning RM_BlockedClientMeasureTimeStart was not called ). */ +int RM_BlockedClientMeasureTimeEnd(RedisModuleBlockedClient *bc) { + // If the counter is 0 then we haven't called RM_BlockedClientMeasureTimeStart + if (!bc->background_timer) + return REDISMODULE_ERR; + bc->background_duration += elapsedUs(bc->background_timer); + return REDISMODULE_OK; +} + /* Set flags defining capabilities or behavior bit flags. * * REDISMODULE_OPTIONS_HANDLE_IO_ERRORS: @@ -933,9 +977,9 @@ int RM_SignalModifiedKey(RedisModuleCtx *ctx, RedisModuleString *keyname) { * keys, call replies and Redis string objects once the command returns. In most * cases this eliminates the need of calling the following functions: * - * 1) RedisModule_CloseKey() - * 2) RedisModule_FreeCallReply() - * 3) RedisModule_FreeString() + * 1. RedisModule_CloseKey() + * 2. RedisModule_FreeCallReply() + * 3. RedisModule_FreeString() * * These functions can still be used with automatic memory management enabled, * to optimize loops that make numerous allocations for example. */ @@ -1113,6 +1157,18 @@ RedisModuleString *RM_CreateStringFromString(RedisModuleCtx *ctx, const RedisMod return o; } +/* Creates a string from a stream ID. The returned string must be released with + * RedisModule_FreeString(), unless automatic memory is enabled. + * + * The passed context `ctx` may be NULL if necessary. See the + * RedisModule_CreateString() documentation for more info. */ +RedisModuleString *RM_CreateStringFromStreamID(RedisModuleCtx *ctx, const RedisModuleStreamID *id) { + streamID streamid = {id->ms, id->seq}; + RedisModuleString *o = createObjectFromStreamID(&streamid); + if (ctx != NULL) autoMemoryAdd(ctx, REDISMODULE_AM_STRING, o); + return o; +} + /* Free a module string object obtained with one of the Redis modules API calls * that return new string objects. * @@ -1139,9 +1195,9 @@ void RM_FreeString(RedisModuleCtx *ctx, RedisModuleString *str) { * Normally you want to call this function when, at the same time * the following conditions are true: * - * 1) You have automatic memory management enabled. - * 2) You want to create string objects. - * 3) Those string objects you create need to live *after* the callback + * 1. You have automatic memory management enabled. + * 2. You want to create string objects. + * 3. Those string objects you create need to live *after* the callback * function(for example a command implementation) creating them returns. * * Usually you want this in order to store the created string object @@ -1188,7 +1244,7 @@ void RM_RetainString(RedisModuleCtx *ctx, RedisModuleString *str) { * returned RedisModuleString. * * It is possible to call this function with a NULL context. - */ +*/ RedisModuleString* RM_HoldString(RedisModuleCtx *ctx, RedisModuleString *str) { if (str->refcount == OBJ_STATIC_REFCOUNT) { return RM_CreateStringFromString(ctx, str); @@ -1270,6 +1326,30 @@ int RM_StringToLongDouble(const RedisModuleString *str, long double *ld) { return retval ? REDISMODULE_OK : REDISMODULE_ERR; } +/* Convert the string into a stream ID, storing it at `*id`. + * Returns REDISMODULE_OK on success and returns REDISMODULE_ERR if the string + * is not a valid string representation of a stream ID. The special IDs "+" and + * "-" are allowed. + * + * RedisModuleStreamID is a struct with two 64-bit fields, which is used in + * stream functions and defined as + * + * typedef struct RedisModuleStreamID { + * uint64_t ms; + * uint64_t seq; + * } RedisModuleStreamID; + */ +int RM_StringToStreamID(const RedisModuleString *str, RedisModuleStreamID *id) { + streamID streamid; + if (streamParseID(str, &streamid) == C_OK) { + id->ms = streamid.ms; + id->seq = streamid.seq; + return REDISMODULE_OK; + } else { + return REDISMODULE_ERR; + } +} + /* Compare two string objects, returning -1, 0 or 1 respectively if * a < b, a == b, a > b. Strings are compared byte by byte as two * binary blobs without any encoding care / collation attempt. */ @@ -1322,7 +1402,7 @@ int RM_StringAppendBuffer(RedisModuleCtx *ctx, RedisModuleString *str, const cha * -------------------------------------------------------------------------- */ /* Send an error about the number of arguments given to the command, - * citing the command name in the error message. + * citing the command name in the error message. Returns REDISMODULE_OK. * * Example: * @@ -1394,7 +1474,7 @@ int RM_ReplyWithError(RedisModuleCtx *ctx, const char *err) { return REDISMODULE_OK; } -/* Reply with a simple string (+... \r\n in RESP protocol). This replies +/* Reply with a simple string (`+... \r\n` in RESP protocol). This replies * are suitable only when sending a small non-binary string with small * overhead, like "OK" or similar replies. * @@ -1742,7 +1822,7 @@ int RM_ReplicateVerbatim(RedisModuleCtx *ctx) { * 2. The ID increases monotonically. Clients connecting to the server later * are guaranteed to get IDs greater than any past ID previously seen. * - * Valid IDs are from 1 to 2^64-1. If 0 is returned it means there is no way + * Valid IDs are from 1 to 2^64 - 1. If 0 is returned it means there is no way * to fetch the ID in the context the function was currently called. * * After obtaining the ID, it is possible to check if the command execution @@ -2072,7 +2152,15 @@ static void moduleInitKey(RedisModuleKey *kp, RedisModuleCtx *ctx, robj *keyname kp->value = value; kp->iter = NULL; kp->mode = mode; - zsetKeyReset(kp); + if (kp->value) moduleInitKeyTypeSpecific(kp); +} + +/* Initialize the type-specific part of the key. Only when key has a value. */ +static void moduleInitKeyTypeSpecific(RedisModuleKey *key) { + switch (key->value->type) { + case OBJ_ZSET: zsetKeyReset(key); break; + case OBJ_STREAM: key->u.stream.signalready = 0; break; + } } /* Return an handle representing a Redis key, so that it is possible @@ -2115,8 +2203,13 @@ static void moduleCloseKey(RedisModuleKey *key) { int signal = SHOULD_SIGNAL_MODIFIED_KEYS(key->ctx); if ((key->mode & REDISMODULE_WRITE) && signal) signalModifiedKey(key->ctx->client,key->db,key->key); - /* TODO: if (key->iter) RM_KeyIteratorStop(kp); */ + if (key->iter) zfree(key->iter); RM_ZsetRangeStop(key); + if (key && key->value && key->value->type == OBJ_STREAM && + key->u.stream.signalready) { + /* One of more RM_StreamAdd() have been done. */ + signalKeyAsReady(key->db, key->key, OBJ_STREAM); + } decrRefCount(key->key); } @@ -2376,9 +2469,10 @@ int RM_ListPush(RedisModuleKey *key, int where, RedisModuleString *ele) { * that the user should be free with RM_FreeString() or by enabling * automatic memory. 'where' specifies if the element should be popped from * head or tail. The command returns NULL if: - * 1) The list is empty. - * 2) The key was not open for writing. - * 3) The key is not a list. */ + * + * 1. The list is empty. + * 2. The key was not open for writing. + * 3. The key is not a list. */ RedisModuleString *RM_ListPop(RedisModuleKey *key, int where) { if (!(key->mode & REDISMODULE_WRITE) || key->value == NULL || @@ -2398,7 +2492,7 @@ RedisModuleString *RM_ListPop(RedisModuleKey *key, int where) { /* Conversion from/to public flags of the Modules API and our private flags, * so that we have everything decoupled. */ -int RM_ZsetAddFlagsToCoreFlags(int flags) { +int moduleZsetAddFlagsToCoreFlags(int flags) { int retflags = 0; if (flags & REDISMODULE_ZADD_XX) retflags |= ZADD_XX; if (flags & REDISMODULE_ZADD_NX) retflags |= ZADD_NX; @@ -2408,7 +2502,7 @@ int RM_ZsetAddFlagsToCoreFlags(int flags) { } /* See previous function comment. */ -int RM_ZsetAddFlagsFromCoreFlags(int flags) { +int moduleZsetAddFlagsFromCoreFlags(int flags) { int retflags = 0; if (flags & ZADD_ADDED) retflags |= REDISMODULE_ZADD_ADDED; if (flags & ZADD_UPDATED) retflags |= REDISMODULE_ZADD_UPDATED; @@ -2453,12 +2547,12 @@ int RM_ZsetAdd(RedisModuleKey *key, double score, RedisModuleString *ele, int *f if (!(key->mode & REDISMODULE_WRITE)) return REDISMODULE_ERR; if (key->value && key->value->type != OBJ_ZSET) return REDISMODULE_ERR; if (key->value == NULL) moduleCreateEmptyKey(key,REDISMODULE_KEYTYPE_ZSET); - if (flagsptr) flags = RM_ZsetAddFlagsToCoreFlags(*flagsptr); + if (flagsptr) flags = moduleZsetAddFlagsToCoreFlags(*flagsptr); if (zsetAdd(key->value,score,ele->ptr,&flags,NULL) == 0) { if (flagsptr) *flagsptr = 0; return REDISMODULE_ERR; } - if (flagsptr) *flagsptr = RM_ZsetAddFlagsFromCoreFlags(flags); + if (flagsptr) *flagsptr = moduleZsetAddFlagsFromCoreFlags(flags); return REDISMODULE_OK; } @@ -2480,7 +2574,7 @@ int RM_ZsetIncrby(RedisModuleKey *key, double score, RedisModuleString *ele, int if (!(key->mode & REDISMODULE_WRITE)) return REDISMODULE_ERR; if (key->value && key->value->type != OBJ_ZSET) return REDISMODULE_ERR; if (key->value == NULL) moduleCreateEmptyKey(key,REDISMODULE_KEYTYPE_ZSET); - if (flagsptr) flags = RM_ZsetAddFlagsToCoreFlags(*flagsptr); + if (flagsptr) flags = moduleZsetAddFlagsToCoreFlags(*flagsptr); flags |= ZADD_INCR; if (zsetAdd(key->value,score,ele->ptr,&flags,newscore) == 0) { if (flagsptr) *flagsptr = 0; @@ -2491,7 +2585,7 @@ int RM_ZsetIncrby(RedisModuleKey *key, double score, RedisModuleString *ele, int *flagsptr = 0; return REDISMODULE_ERR; } - if (flagsptr) *flagsptr = RM_ZsetAddFlagsFromCoreFlags(flags); + if (flagsptr) *flagsptr = moduleZsetAddFlagsFromCoreFlags(flags); return REDISMODULE_OK; } @@ -2544,16 +2638,17 @@ int RM_ZsetScore(RedisModuleKey *key, RedisModuleString *ele, double *score) { * -------------------------------------------------------------------------- */ void zsetKeyReset(RedisModuleKey *key) { - key->ztype = REDISMODULE_ZSET_RANGE_NONE; - key->zcurrent = NULL; - key->zer = 1; + key->u.zset.type = REDISMODULE_ZSET_RANGE_NONE; + key->u.zset.current = NULL; + key->u.zset.er = 1; } /* Stop a sorted set iteration. */ void RM_ZsetRangeStop(RedisModuleKey *key) { + if (!key->value || key->value->type != OBJ_ZSET) return; /* Free resources if needed. */ - if (key->ztype == REDISMODULE_ZSET_RANGE_LEX) - zslFreeLexRange(&key->zlrs); + if (key->u.zset.type == REDISMODULE_ZSET_RANGE_LEX) + zslFreeLexRange(&key->u.zset.lrs); /* Setup sensible values so that misused iteration API calls when an * iterator is not active will result into something more sensible * than crashing. */ @@ -2562,7 +2657,7 @@ void RM_ZsetRangeStop(RedisModuleKey *key) { /* Return the "End of range" flag value to signal the end of the iteration. */ int RM_ZsetRangeEndReached(RedisModuleKey *key) { - return key->zer; + return key->u.zset.er; } /* Helper function for RM_ZsetFirstInScoreRange() and RM_ZsetLastInScoreRange(). @@ -2575,29 +2670,29 @@ int zsetInitScoreRange(RedisModuleKey *key, double min, double max, int minex, i if (!key->value || key->value->type != OBJ_ZSET) return REDISMODULE_ERR; RM_ZsetRangeStop(key); - key->ztype = REDISMODULE_ZSET_RANGE_SCORE; - key->zer = 0; + key->u.zset.type = REDISMODULE_ZSET_RANGE_SCORE; + key->u.zset.er = 0; /* Setup the range structure used by the sorted set core implementation * in order to seek at the specified element. */ - zrangespec *zrs = &key->zrs; + zrangespec *zrs = &key->u.zset.rs; zrs->min = min; zrs->max = max; zrs->minex = minex; zrs->maxex = maxex; if (key->value->encoding == OBJ_ENCODING_ZIPLIST) { - key->zcurrent = first ? zzlFirstInRange(key->value->ptr,zrs) : - zzlLastInRange(key->value->ptr,zrs); + key->u.zset.current = first ? zzlFirstInRange(key->value->ptr,zrs) : + zzlLastInRange(key->value->ptr,zrs); } else if (key->value->encoding == OBJ_ENCODING_SKIPLIST) { zset *zs = key->value->ptr; zskiplist *zsl = zs->zsl; - key->zcurrent = first ? zslFirstInRange(zsl,zrs) : - zslLastInRange(zsl,zrs); + key->u.zset.current = first ? zslFirstInRange(zsl,zrs) : + zslLastInRange(zsl,zrs); } else { serverPanic("Unsupported zset encoding"); } - if (key->zcurrent == NULL) key->zer = 1; + if (key->u.zset.current == NULL) key->u.zset.er = 1; return REDISMODULE_OK; } @@ -2610,8 +2705,8 @@ int zsetInitScoreRange(RedisModuleKey *key, double min, double max, int minex, i * The range is specified according to the two double values 'min' and 'max'. * Both can be infinite using the following two macros: * - * REDISMODULE_POSITIVE_INFINITE for positive infinite value - * REDISMODULE_NEGATIVE_INFINITE for negative infinite value + * * REDISMODULE_POSITIVE_INFINITE for positive infinite value + * * REDISMODULE_NEGATIVE_INFINITE for negative infinite value * * 'minex' and 'maxex' parameters, if true, respectively setup a range * where the min and max value are exclusive (not included) instead of @@ -2639,29 +2734,29 @@ int zsetInitLexRange(RedisModuleKey *key, RedisModuleString *min, RedisModuleStr if (!key->value || key->value->type != OBJ_ZSET) return REDISMODULE_ERR; RM_ZsetRangeStop(key); - key->zer = 0; + key->u.zset.er = 0; /* Setup the range structure used by the sorted set core implementation * in order to seek at the specified element. */ - zlexrangespec *zlrs = &key->zlrs; + zlexrangespec *zlrs = &key->u.zset.lrs; if (zslParseLexRange(min, max, zlrs) == C_ERR) return REDISMODULE_ERR; /* Set the range type to lex only after successfully parsing the range, * otherwise we don't want the zlexrangespec to be freed. */ - key->ztype = REDISMODULE_ZSET_RANGE_LEX; + key->u.zset.type = REDISMODULE_ZSET_RANGE_LEX; if (key->value->encoding == OBJ_ENCODING_ZIPLIST) { - key->zcurrent = first ? zzlFirstInLexRange(key->value->ptr,zlrs) : - zzlLastInLexRange(key->value->ptr,zlrs); + key->u.zset.current = first ? zzlFirstInLexRange(key->value->ptr,zlrs) : + zzlLastInLexRange(key->value->ptr,zlrs); } else if (key->value->encoding == OBJ_ENCODING_SKIPLIST) { zset *zs = key->value->ptr; zskiplist *zsl = zs->zsl; - key->zcurrent = first ? zslFirstInLexRange(zsl,zlrs) : - zslLastInLexRange(zsl,zlrs); + key->u.zset.current = first ? zslFirstInLexRange(zsl,zlrs) : + zslLastInLexRange(zsl,zlrs); } else { serverPanic("Unsupported zset encoding"); } - if (key->zcurrent == NULL) key->zer = 1; + if (key->u.zset.current == NULL) key->u.zset.er = 1; return REDISMODULE_OK; } @@ -2694,10 +2789,11 @@ int RM_ZsetLastInLexRange(RedisModuleKey *key, RedisModuleString *min, RedisModu RedisModuleString *RM_ZsetRangeCurrentElement(RedisModuleKey *key, double *score) { RedisModuleString *str; - if (key->zcurrent == NULL) return NULL; + if (!key->value || key->value->type != OBJ_ZSET) return NULL; + if (key->u.zset.current == NULL) return NULL; if (key->value->encoding == OBJ_ENCODING_ZIPLIST) { unsigned char *eptr, *sptr; - eptr = key->zcurrent; + eptr = key->u.zset.current; sds ele = ziplistGetObject(eptr); if (score) { sptr = ziplistNext(key->value->ptr,eptr); @@ -2705,7 +2801,7 @@ RedisModuleString *RM_ZsetRangeCurrentElement(RedisModuleKey *key, double *score } str = createObject(OBJ_STRING,ele); } else if (key->value->encoding == OBJ_ENCODING_SKIPLIST) { - zskiplistNode *ln = key->zcurrent; + zskiplistNode *ln = key->u.zset.current; if (score) *score = ln->score; str = createStringObject(ln->ele,sdslen(ln->ele)); } else { @@ -2719,58 +2815,59 @@ RedisModuleString *RM_ZsetRangeCurrentElement(RedisModuleKey *key, double *score * a next element, 0 if we are already at the latest element or the range * does not include any item at all. */ int RM_ZsetRangeNext(RedisModuleKey *key) { - if (!key->ztype || !key->zcurrent) return 0; /* No active iterator. */ + if (!key->value || key->value->type != OBJ_ZSET) return 0; + if (!key->u.zset.type || !key->u.zset.current) return 0; /* No active iterator. */ if (key->value->encoding == OBJ_ENCODING_ZIPLIST) { unsigned char *zl = key->value->ptr; - unsigned char *eptr = key->zcurrent; + unsigned char *eptr = key->u.zset.current; unsigned char *next; next = ziplistNext(zl,eptr); /* Skip element. */ if (next) next = ziplistNext(zl,next); /* Skip score. */ if (next == NULL) { - key->zer = 1; + key->u.zset.er = 1; return 0; } else { /* Are we still within the range? */ - if (key->ztype == REDISMODULE_ZSET_RANGE_SCORE) { + if (key->u.zset.type == REDISMODULE_ZSET_RANGE_SCORE) { /* Fetch the next element score for the * range check. */ unsigned char *saved_next = next; next = ziplistNext(zl,next); /* Skip next element. */ double score = zzlGetScore(next); /* Obtain the next score. */ - if (!zslValueLteMax(score,&key->zrs)) { - key->zer = 1; + if (!zslValueLteMax(score,&key->u.zset.rs)) { + key->u.zset.er = 1; return 0; } next = saved_next; - } else if (key->ztype == REDISMODULE_ZSET_RANGE_LEX) { - if (!zzlLexValueLteMax(next,&key->zlrs)) { - key->zer = 1; + } else if (key->u.zset.type == REDISMODULE_ZSET_RANGE_LEX) { + if (!zzlLexValueLteMax(next,&key->u.zset.lrs)) { + key->u.zset.er = 1; return 0; } } - key->zcurrent = next; + key->u.zset.current = next; return 1; } } else if (key->value->encoding == OBJ_ENCODING_SKIPLIST) { - zskiplistNode *ln = key->zcurrent, *next = ln->level[0].forward; + zskiplistNode *ln = key->u.zset.current, *next = ln->level[0].forward; if (next == NULL) { - key->zer = 1; + key->u.zset.er = 1; return 0; } else { /* Are we still within the range? */ - if (key->ztype == REDISMODULE_ZSET_RANGE_SCORE && - !zslValueLteMax(next->score,&key->zrs)) + if (key->u.zset.type == REDISMODULE_ZSET_RANGE_SCORE && + !zslValueLteMax(next->score,&key->u.zset.rs)) { - key->zer = 1; + key->u.zset.er = 1; return 0; - } else if (key->ztype == REDISMODULE_ZSET_RANGE_LEX) { - if (!zslLexValueLteMax(next->ele,&key->zlrs)) { - key->zer = 1; + } else if (key->u.zset.type == REDISMODULE_ZSET_RANGE_LEX) { + if (!zslLexValueLteMax(next->ele,&key->u.zset.lrs)) { + key->u.zset.er = 1; return 0; } } - key->zcurrent = next; + key->u.zset.current = next; return 1; } } else { @@ -2782,58 +2879,59 @@ int RM_ZsetRangeNext(RedisModuleKey *key) { * a previous element, 0 if we are already at the first element or the range * does not include any item at all. */ int RM_ZsetRangePrev(RedisModuleKey *key) { - if (!key->ztype || !key->zcurrent) return 0; /* No active iterator. */ + if (!key->value || key->value->type != OBJ_ZSET) return 0; + if (!key->u.zset.type || !key->u.zset.current) return 0; /* No active iterator. */ if (key->value->encoding == OBJ_ENCODING_ZIPLIST) { unsigned char *zl = key->value->ptr; - unsigned char *eptr = key->zcurrent; + unsigned char *eptr = key->u.zset.current; unsigned char *prev; prev = ziplistPrev(zl,eptr); /* Go back to previous score. */ if (prev) prev = ziplistPrev(zl,prev); /* Back to previous ele. */ if (prev == NULL) { - key->zer = 1; + key->u.zset.er = 1; return 0; } else { /* Are we still within the range? */ - if (key->ztype == REDISMODULE_ZSET_RANGE_SCORE) { + if (key->u.zset.type == REDISMODULE_ZSET_RANGE_SCORE) { /* Fetch the previous element score for the * range check. */ unsigned char *saved_prev = prev; prev = ziplistNext(zl,prev); /* Skip element to get the score.*/ double score = zzlGetScore(prev); /* Obtain the prev score. */ - if (!zslValueGteMin(score,&key->zrs)) { - key->zer = 1; + if (!zslValueGteMin(score,&key->u.zset.rs)) { + key->u.zset.er = 1; return 0; } prev = saved_prev; - } else if (key->ztype == REDISMODULE_ZSET_RANGE_LEX) { - if (!zzlLexValueGteMin(prev,&key->zlrs)) { - key->zer = 1; + } else if (key->u.zset.type == REDISMODULE_ZSET_RANGE_LEX) { + if (!zzlLexValueGteMin(prev,&key->u.zset.lrs)) { + key->u.zset.er = 1; return 0; } } - key->zcurrent = prev; + key->u.zset.current = prev; return 1; } } else if (key->value->encoding == OBJ_ENCODING_SKIPLIST) { - zskiplistNode *ln = key->zcurrent, *prev = ln->backward; + zskiplistNode *ln = key->u.zset.current, *prev = ln->backward; if (prev == NULL) { - key->zer = 1; + key->u.zset.er = 1; return 0; } else { /* Are we still within the range? */ - if (key->ztype == REDISMODULE_ZSET_RANGE_SCORE && - !zslValueGteMin(prev->score,&key->zrs)) + if (key->u.zset.type == REDISMODULE_ZSET_RANGE_SCORE && + !zslValueGteMin(prev->score,&key->u.zset.rs)) { - key->zer = 1; + key->u.zset.er = 1; return 0; - } else if (key->ztype == REDISMODULE_ZSET_RANGE_LEX) { - if (!zslLexValueGteMin(prev->ele,&key->zlrs)) { - key->zer = 1; + } else if (key->u.zset.type == REDISMODULE_ZSET_RANGE_LEX) { + if (!zslLexValueGteMin(prev->ele,&key->u.zset.lrs)) { + key->u.zset.er = 1; return 0; } } - key->zcurrent = prev; + key->u.zset.current = prev; return 1; } } else { @@ -2970,7 +3068,7 @@ int RM_HashSet(RedisModuleKey *key, int flags, ...) { * * RedisModuleString *first, *second; * RedisModule_HashGet(mykey,REDISMODULE_HASH_NONE,argv[1],&first, - * argv[2],&second,NULL); + * argv[2],&second,NULL); * * As with RedisModule_HashSet() the behavior of the command can be specified * passing flags different than REDISMODULE_HASH_NONE: @@ -3049,6 +3147,455 @@ int RM_HashGet(RedisModuleKey *key, int flags, ...) { } /* -------------------------------------------------------------------------- + * Key API for the stream type. + * -------------------------------------------------------------------------- */ + +/* Adds an entry to a stream. Like XADD without trimming. + * + * - `key`: The key where the stream is (or will be) stored + * - `flags`: A bit field of + * - `REDISMODULE_STREAM_ADD_AUTOID`: Assign a stream ID automatically, like + * `*` in the XADD command. + * - `id`: If the `AUTOID` flag is set, this is where the assigned ID is + * returned. Can be NULL if `AUTOID` is set, if you don't care to receive the + * ID. If `AUTOID` is not set, this is the requested ID. + * - `argv`: A pointer to an array of size `numfields * 2` containing the + * fields and values. + * - `numfields`: The number of field-value pairs in `argv`. + * + * Returns REDISMODULE_OK if an entry has been added. On failure, + * REDISMODULE_ERR is returned and `errno` is set as follows: + * + * - EINVAL if called with invalid arguments + * - ENOTSUP if the key refers to a value of a type other than stream + * - EBADF if the key was not opened for writing + * - EDOM if the given ID was 0-0 or not greater than all other IDs in the + * stream (only if the AUTOID flag is unset) + * - EFBIG if the stream has reached the last possible ID + */ +int RM_StreamAdd(RedisModuleKey *key, int flags, RedisModuleStreamID *id, RedisModuleString **argv, long numfields) { + /* Validate args */ + if (!key || (numfields != 0 && !argv) || /* invalid key or argv */ + (flags & ~(REDISMODULE_STREAM_ADD_AUTOID)) || /* invalid flags */ + (!(flags & REDISMODULE_STREAM_ADD_AUTOID) && !id)) { /* id required */ + errno = EINVAL; + return REDISMODULE_ERR; + } else if (key->value && key->value->type != OBJ_STREAM) { + errno = ENOTSUP; /* wrong type */ + return REDISMODULE_ERR; + } else if (!(key->mode & REDISMODULE_WRITE)) { + errno = EBADF; /* key not open for writing */ + return REDISMODULE_ERR; + } else if (!(flags & REDISMODULE_STREAM_ADD_AUTOID) && + id->ms == 0 && id->seq == 0) { + errno = EDOM; /* ID out of range */ + return REDISMODULE_ERR; + } + + /* Create key if necessery */ + int created = 0; + if (key->value == NULL) { + moduleCreateEmptyKey(key, REDISMODULE_KEYTYPE_STREAM); + created = 1; + } + + stream *s = key->value->ptr; + if (s->last_id.ms == UINT64_MAX && s->last_id.seq == UINT64_MAX) { + /* The stream has reached the last possible ID */ + errno = EFBIG; + return REDISMODULE_ERR; + } + + streamID added_id; + streamID use_id; + streamID *use_id_ptr = NULL; + if (!(flags & REDISMODULE_STREAM_ADD_AUTOID)) { + use_id.ms = id->ms; + use_id.seq = id->seq; + use_id_ptr = &use_id; + } + if (streamAppendItem(s, argv, numfields, &added_id, use_id_ptr) == C_ERR) { + /* ID not greater than all existing IDs in the stream */ + errno = EDOM; + return REDISMODULE_ERR; + } + /* Postponed signalKeyAsReady(). Done implicitly by moduleCreateEmptyKey() + * so not needed if the stream has just been created. */ + if (!created) key->u.stream.signalready = 1; + + if (id != NULL) { + id->ms = added_id.ms; + id->seq = added_id.seq; + } + + return REDISMODULE_OK; +} + +/* Deletes an entry from a stream. + * + * - `key`: A key opened for writing, with no stream iterator started. + * - `id`: The stream ID of the entry to delete. + * + * Returns REDISMODULE_OK on success. On failure, REDISMODULE_ERR is returned + * and `errno` is set as follows: + * + * - EINVAL if called with invalid arguments + * - ENOTSUP if the key refers to a value of a type other than stream or if the + * key is empty + * - EBADF if the key was not opened for writing or if a stream iterator is + * associated with the key + * - ENOENT if no entry with the given stream ID exists + * + * See also RM_StreamIteratorDelete() for deleting the current entry while + * iterating using a stream iterator. + */ +int RM_StreamDelete(RedisModuleKey *key, RedisModuleStreamID *id) { + if (!key || !id) { + errno = EINVAL; + return REDISMODULE_ERR; + } else if (!key->value || key->value->type != OBJ_STREAM) { + errno = ENOTSUP; /* wrong type */ + return REDISMODULE_ERR; + } else if (!(key->mode & REDISMODULE_WRITE) || + key->iter != NULL) { + errno = EBADF; /* key not opened for writing or iterator started */ + return REDISMODULE_ERR; + } + stream *s = key->value->ptr; + streamID streamid = {id->ms, id->seq}; + if (streamDeleteItem(s, &streamid)) { + return REDISMODULE_OK; + } else { + errno = ENOENT; /* no entry with this id */ + return REDISMODULE_ERR; + } +} + +/* Sets up a stream iterator. + * + * - `key`: The stream key opened for reading using RedisModule_OpenKey(). + * - `flags`: + * - `REDISMODULE_STREAM_ITERATOR_EXCLUSIVE`: Don't include `start` and `end` + * in the iterated range. + * - `REDISMODULE_STREAM_ITERATOR_REVERSE`: Iterate in reverse order, starting + * from the `end` of the range. + * - `start`: The lower bound of the range. Use NULL for the beginning of the + * stream. + * - `end`: The upper bound of the range. Use NULL for the end of the stream. + * + * Returns REDISMODULE_OK on success. On failure, REDISMODULE_ERR is returned + * and `errno` is set as follows: + * + * - EINVAL if called with invalid arguments + * - ENOTSUP if the key refers to a value of a type other than stream or if the + * key is empty + * - EBADF if the key was not opened for writing or if a stream iterator is + * already associated with the key + * - EDOM if `start` or `end` is outside the valid range + * + * Returns REDISMODULE_OK on success and REDISMODULE_ERR if the key doesn't + * refer to a stream or if invalid arguments were given. + * + * The stream IDs are retrieved using RedisModule_StreamIteratorNextID() and + * for each stream ID, the fields and values are retrieved using + * RedisModule_StreamIteratorNextField(). The iterator is freed by calling + * RedisModule_StreamIteratorStop(). + * + * Example (error handling omitted): + * + * RedisModule_StreamIteratorStart(key, 0, startid_ptr, endid_ptr); + * RedisModuleStreamID id; + * long numfields; + * while (RedisModule_StreamIteratorNextID(key, &id, &numfields) == + * REDISMODULE_OK) { + * RedisModuleString *field, *value; + * while (RedisModule_StreamIteratorNextField(key, &field, &value) == + * REDISMODULE_OK) { + * // + * // ... Do stuff ... + * // + * RedisModule_Free(field); + * RedisModule_Free(value); + * } + * } + * RedisModule_StreamIteratorStop(key); + */ +int RM_StreamIteratorStart(RedisModuleKey *key, int flags, RedisModuleStreamID *start, RedisModuleStreamID *end) { + /* check args */ + if (!key || + (flags & ~(REDISMODULE_STREAM_ITERATOR_EXCLUSIVE | + REDISMODULE_STREAM_ITERATOR_REVERSE))) { + errno = EINVAL; /* key missing or invalid flags */ + return REDISMODULE_ERR; + } else if (!key->value || key->value->type != OBJ_STREAM) { + errno = ENOTSUP; + return REDISMODULE_ERR; /* not a stream */ + } else if (key->iter) { + errno = EBADF; /* iterator already started */ + return REDISMODULE_ERR; + } + + /* define range for streamIteratorStart() */ + streamID lower, upper; + if (start) lower = (streamID){start->ms, start->seq}; + if (end) upper = (streamID){end->ms, end->seq}; + if (flags & REDISMODULE_STREAM_ITERATOR_EXCLUSIVE) { + if ((start && streamIncrID(&lower) != C_OK) || + (end && streamDecrID(&upper) != C_OK)) { + errno = EDOM; /* end is 0-0 or start is MAX-MAX? */ + return REDISMODULE_ERR; + } + } + + /* create iterator */ + stream *s = key->value->ptr; + int rev = flags & REDISMODULE_STREAM_ITERATOR_REVERSE; + streamIterator *si = zmalloc(sizeof(*si)); + streamIteratorStart(si, s, start ? &lower : NULL, end ? &upper : NULL, rev); + key->iter = si; + key->u.stream.currentid.ms = 0; /* for RM_StreamIteratorDelete() */ + key->u.stream.currentid.seq = 0; + key->u.stream.numfieldsleft = 0; /* for RM_StreamIteratorNextField() */ + return REDISMODULE_OK; +} + +/* Stops a stream iterator created using RedisModule_StreamIteratorStart() and + * reclaims its memory. + * + * Returns REDISMODULE_OK on success. On failure, REDISMODULE_ERR is returned + * and `errno` is set as follows: + * + * - EINVAL if called with a NULL key + * - ENOTSUP if the key refers to a value of a type other than stream or if the + * key is empty + * - EBADF if the key was not opened for writing or if no stream iterator is + * associated with the key + */ +int RM_StreamIteratorStop(RedisModuleKey *key) { + if (!key) { + errno = EINVAL; + return REDISMODULE_ERR; + } else if (!key->value || key->value->type != OBJ_STREAM) { + errno = ENOTSUP; + return REDISMODULE_ERR; + } else if (!key->iter) { + errno = EBADF; + return REDISMODULE_ERR; + } + zfree(key->iter); + key->iter = NULL; + return REDISMODULE_OK; +} + +/* Finds the next stream entry and returns its stream ID and the number of + * fields. + * + * - `key`: Key for which a stream iterator has been started using + * RedisModule_StreamIteratorStart(). + * - `id`: The stream ID returned. NULL if you don't care. + * - `numfields`: The number of fields in the found stream entry. NULL if you + * don't care. + * + * Returns REDISMODULE_OK and sets `*id` and `*numfields` if an entry was found. + * On failure, REDISMODULE_ERR is returned and `errno` is set as follows: + * + * - EINVAL if called with a NULL key + * - ENOTSUP if the key refers to a value of a type other than stream or if the + * key is empty + * - EBADF if no stream iterator is associated with the key + * - ENOENT if there are no more entries in the range of the iterator + * + * In practice, if RM_StreamIteratorNextID() is called after a successful call + * to RM_StreamIteratorStart() and with the same key, it is safe to assume that + * an REDISMODULE_ERR return value means that there are no more entries. + * + * Use RedisModule_StreamIteratorNextField() to retrieve the fields and values. + * See the example at RedisModule_StreamIteratorStart(). + */ +int RM_StreamIteratorNextID(RedisModuleKey *key, RedisModuleStreamID *id, long *numfields) { + if (!key) { + errno = EINVAL; + return REDISMODULE_ERR; + } else if (!key->value || key->value->type != OBJ_STREAM) { + errno = ENOTSUP; + return REDISMODULE_ERR; + } else if (!key->iter) { + errno = EBADF; + return REDISMODULE_ERR; + } + streamIterator *si = key->iter; + int64_t *num_ptr = &key->u.stream.numfieldsleft; + streamID *streamid_ptr = &key->u.stream.currentid; + if (streamIteratorGetID(si, streamid_ptr, num_ptr)) { + if (id) { + id->ms = streamid_ptr->ms; + id->seq = streamid_ptr->seq; + } + if (numfields) *numfields = *num_ptr; + return REDISMODULE_OK; + } else { + /* No entry found. */ + key->u.stream.currentid.ms = 0; /* for RM_StreamIteratorDelete() */ + key->u.stream.currentid.seq = 0; + key->u.stream.numfieldsleft = 0; /* for RM_StreamIteratorNextField() */ + errno = ENOENT; + return REDISMODULE_ERR; + } +} + +/* Retrieves the next field of the current stream ID and its corresponding value + * in a stream iteration. This function should be called repeatedly after calling + * RedisModule_StreamIteratorNextID() to fetch each field-value pair. + * + * - `key`: Key where a stream iterator has been started. + * - `field_ptr`: This is where the field is returned. + * - `value_ptr`: This is where the value is returned. + * + * Returns REDISMODULE_OK and points `*field_ptr` and `*value_ptr` to freshly + * allocated RedisModuleString objects. The string objects are freed + * automatically when the callback finishes if automatic memory is enabled. On + * failure, REDISMODULE_ERR is returned and `errno` is set as follows: + * + * - EINVAL if called with a NULL key + * - ENOTSUP if the key refers to a value of a type other than stream or if the + * key is empty + * - EBADF if no stream iterator is associated with the key + * - ENOENT if there are no more fields in the current stream entry + * + * In practice, if RM_StreamIteratorNextField() is called after a successful + * call to RM_StreamIteratorNextID() and with the same key, it is safe to assume + * that an REDISMODULE_ERR return value means that there are no more fields. + * + * See the example at RedisModule_StreamIteratorStart(). + */ +int RM_StreamIteratorNextField(RedisModuleKey *key, RedisModuleString **field_ptr, RedisModuleString **value_ptr) { + if (!key) { + errno = EINVAL; + return REDISMODULE_ERR; + } else if (!key->value || key->value->type != OBJ_STREAM) { + errno = ENOTSUP; + return REDISMODULE_ERR; + } else if (!key->iter) { + errno = EBADF; + return REDISMODULE_ERR; + } else if (key->u.stream.numfieldsleft <= 0) { + errno = ENOENT; + return REDISMODULE_ERR; + } + streamIterator *si = key->iter; + unsigned char *field, *value; + int64_t field_len, value_len; + streamIteratorGetField(si, &field, &value, &field_len, &value_len); + if (field_ptr) { + *field_ptr = createRawStringObject((char *)field, field_len); + autoMemoryAdd(key->ctx, REDISMODULE_AM_STRING, *field_ptr); + } + if (value_ptr) { + *value_ptr = createRawStringObject((char *)value, value_len); + autoMemoryAdd(key->ctx, REDISMODULE_AM_STRING, *value_ptr); + } + key->u.stream.numfieldsleft--; + return REDISMODULE_OK; +} + +/* Deletes the current stream entry while iterating. + * + * This function can be called after RM_StreamIteratorNextID() or after any + * calls to RM_StreamIteratorNextField(). + * + * Returns REDISMODULE_OK on success. On failure, REDISMODULE_ERR is returned + * and `errno` is set as follows: + * + * - EINVAL if key is NULL + * - ENOTSUP if the key is empty or is of another type than stream + * - EBADF if the key is not opened for writing, if no iterator has been started + * - ENOENT if the iterator has no current stream entry + */ +int RM_StreamIteratorDelete(RedisModuleKey *key) { + if (!key) { + errno = EINVAL; + return REDISMODULE_ERR; + } else if (!key->value || key->value->type != OBJ_STREAM) { + errno = ENOTSUP; + return REDISMODULE_ERR; + } else if (!(key->mode & REDISMODULE_WRITE) || !key->iter) { + errno = EBADF; + return REDISMODULE_ERR; + } else if (key->u.stream.currentid.ms == 0 && + key->u.stream.currentid.seq == 0) { + errno = ENOENT; + return REDISMODULE_ERR; + } + streamIterator *si = key->iter; + streamIteratorRemoveEntry(si, &key->u.stream.currentid); + key->u.stream.currentid.ms = 0; /* Make sure repeated Delete() fails */ + key->u.stream.currentid.seq = 0; + key->u.stream.numfieldsleft = 0; /* Make sure NextField() fails */ + return REDISMODULE_OK; +} + +/* Trim a stream by length, similar to XTRIM with MAXLEN. + * + * - `key`: Key opened for writing. + * - `flags`: A bitfield of + * - `REDISMODULE_STREAM_TRIM_APPROX`: Trim less if it improves performance, + * like XTRIM with `~`. + * - `length`: The number of stream entries to keep after trimming. + * + * Returns the number of entries deleted. On failure, a negative value is + * returned and `errno` is set as follows: + * + * - EINVAL if called with invalid arguments + * - ENOTSUP if the key is empty or of a type other than stream + * - EBADF if the key is not opened for writing + */ +long long RM_StreamTrimByLength(RedisModuleKey *key, int flags, long long length) { + if (!key || (flags & ~(REDISMODULE_STREAM_TRIM_APPROX)) || length < 0) { + errno = EINVAL; + return -1; + } else if (!key->value || key->value->type != OBJ_STREAM) { + errno = ENOTSUP; + return -1; + } else if (!(key->mode & REDISMODULE_WRITE)) { + errno = EBADF; + return -1; + } + int approx = flags & REDISMODULE_STREAM_TRIM_APPROX ? 1 : 0; + return streamTrimByLength((stream *)key->value->ptr, length, approx); +} + +/* Trim a stream by ID, similar to XTRIM with MINID. + * + * - `key`: Key opened for writing. + * - `flags`: A bitfield of + * - `REDISMODULE_STREAM_TRIM_APPROX`: Trim less if it improves performance, + * like XTRIM with `~`. + * - `id`: The smallest stream ID to keep after trimming. + * + * Returns the number of entries deleted. On failure, a negative value is + * returned and `errno` is set as follows: + * + * - EINVAL if called with invalid arguments + * - ENOTSUP if the key is empty or of a type other than stream + * - EBADF if the key is not opened for writing + */ +long long RM_StreamTrimByID(RedisModuleKey *key, int flags, RedisModuleStreamID *id) { + if (!key || (flags & ~(REDISMODULE_STREAM_TRIM_APPROX)) || !id) { + errno = EINVAL; + return -1; + } else if (!key->value || key->value->type != OBJ_STREAM) { + errno = ENOTSUP; + return -1; + } else if (!(key->mode & REDISMODULE_WRITE)) { + errno = EBADF; + return -1; + } + int approx = flags & REDISMODULE_STREAM_TRIM_APPROX ? 1 : 0; + streamID minid = (streamID){id->ms, id->seq}; + return streamTrimByID((stream *)key->value->ptr, minid, approx); +} + +/* -------------------------------------------------------------------------- * Redis <-> Modules generic Call() API * -------------------------------------------------------------------------- */ @@ -3162,9 +3709,8 @@ void moduleParseCallReply_Array(RedisModuleCallReply *reply) { reply->type = REDISMODULE_REPLY_ARRAY; } -/* Free a Call reply and all the nested replies it contains if it's an - * array. */ -void RM_FreeCallReply_Rec(RedisModuleCallReply *reply, int freenested){ +/* Recursive free reply function. */ +void moduleFreeCallReplyRec(RedisModuleCallReply *reply, int freenested){ /* Don't free nested replies by default: the user must always free the * toplevel reply. However be gentle and don't crash if the module * misuses the API. */ @@ -3174,7 +3720,7 @@ void RM_FreeCallReply_Rec(RedisModuleCallReply *reply, int freenested){ if (reply->type == REDISMODULE_REPLY_ARRAY) { size_t j; for (j = 0; j < reply->len; j++) - RM_FreeCallReply_Rec(reply->val.array+j,1); + moduleFreeCallReplyRec(reply->val.array+j,1); zfree(reply->val.array); } } @@ -3189,13 +3735,14 @@ void RM_FreeCallReply_Rec(RedisModuleCallReply *reply, int freenested){ } } -/* Wrapper for the recursive free reply function. This is needed in order - * to have the first level function to return on nested replies, but only - * if called by the module API. */ +/* Free a Call reply and all the nested replies it contains if it's an + * array. */ void RM_FreeCallReply(RedisModuleCallReply *reply) { - + /* This is a wrapper for the recursive free reply function. This is needed + * in order to have the first level function to return on nested replies, + * but only if called by the module API. */ RedisModuleCtx *ctx = reply->ctx; - RM_FreeCallReply_Rec(reply,0); + moduleFreeCallReplyRec(reply,0); autoMemoryFreed(ctx,REDISMODULE_AM_REPLY,reply); } @@ -3347,30 +3894,31 @@ fmterr: * * * **cmdname**: The Redis command to call. * * **fmt**: A format specifier string for the command's arguments. Each - * of the arguments should be specified by a valid type specification: - * b The argument is a buffer and is immediately followed by another - * argument that is the buffer's length. - * c The argument is a pointer to a plain C string (null-terminated). - * l The argument is long long integer. - * s The argument is a RedisModuleString. - * v The argument(s) is a vector of RedisModuleString. - * - * The format specifier can also include modifiers: - * ! Sends the Redis command and its arguments to replicas and AOF. - * A Suppress AOF propagation, send only to replicas (requires `!`). - * R Suppress replicas propagation, send only to AOF (requires `!`). + * of the arguments should be specified by a valid type specification. The + * format specifier can also contain the modifiers `!`, `A` and `R` which + * don't have a corresponding argument. + * + * * `b` -- The argument is a buffer and is immediately followed by another + * argument that is the buffer's length. + * * `c` -- The argument is a pointer to a plain C string (null-terminated). + * * `l` -- The argument is long long integer. + * * `s` -- The argument is a RedisModuleString. + * * `v` -- The argument(s) is a vector of RedisModuleString. + * * `!` -- Sends the Redis command and its arguments to replicas and AOF. + * * `A` -- Suppress AOF propagation, send only to replicas (requires `!`). + * * `R` -- Suppress replicas propagation, send only to AOF (requires `!`). * * **...**: The actual arguments to the Redis command. * * On success a RedisModuleCallReply object is returned, otherwise * NULL is returned and errno is set to the following values: * - * EBADF: wrong format specifier. - * EINVAL: wrong command arity. - * ENOENT: command does not exist. - * EPERM: operation in Cluster instance with key in non local slot. - * EROFS: operation in Cluster instance when a write command is sent - * in a readonly state. - * ENETDOWN: operation in Cluster instance when cluster is down. + * * EBADF: wrong format specifier. + * * EINVAL: wrong command arity. + * * ENOENT: command does not exist. + * * EPERM: operation in Cluster instance with key in non local slot. + * * EROFS: operation in Cluster instance when a write command is sent + * in a readonly state. + * * ENETDOWN: operation in Cluster instance when cluster is down. * * Example code fragment: * @@ -3682,27 +4230,28 @@ robj *moduleTypeDupOrReply(client *c, robj *fromkey, robj *tokey, robj *value) { * still load old data produced by an older version if the rdb_load * callback is able to check the encver value and act accordingly. * The encver must be a positive value between 0 and 1023. + * * * **typemethods_ptr** is a pointer to a RedisModuleTypeMethods structure * that should be populated with the methods callbacks and structure * version, like in the following example: * - * RedisModuleTypeMethods tm = { - * .version = REDISMODULE_TYPE_METHOD_VERSION, - * .rdb_load = myType_RDBLoadCallBack, - * .rdb_save = myType_RDBSaveCallBack, - * .aof_rewrite = myType_AOFRewriteCallBack, - * .free = myType_FreeCallBack, - * - * // Optional fields - * .digest = myType_DigestCallBack, - * .mem_usage = myType_MemUsageCallBack, - * .aux_load = myType_AuxRDBLoadCallBack, - * .aux_save = myType_AuxRDBSaveCallBack, - * .free_effort = myType_FreeEffortCallBack, - * .unlink = myType_UnlinkCallBack, - * .copy = myType_CopyCallback, - * .defrag = myType_DefragCallback - * } + * RedisModuleTypeMethods tm = { + * .version = REDISMODULE_TYPE_METHOD_VERSION, + * .rdb_load = myType_RDBLoadCallBack, + * .rdb_save = myType_RDBSaveCallBack, + * .aof_rewrite = myType_AOFRewriteCallBack, + * .free = myType_FreeCallBack, + * + * // Optional fields + * .digest = myType_DigestCallBack, + * .mem_usage = myType_MemUsageCallBack, + * .aux_load = myType_AuxRDBLoadCallBack, + * .aux_save = myType_AuxRDBSaveCallBack, + * .free_effort = myType_FreeEffortCallBack, + * .unlink = myType_UnlinkCallBack, + * .copy = myType_CopyCallback, + * .defrag = myType_DefragCallback + * } * * * **rdb_load**: A callback function pointer that loads data from RDB files. * * **rdb_save**: A callback function pointer that saves data to RDB files. @@ -3740,7 +4289,7 @@ robj *moduleTypeDupOrReply(client *c, robj *fromkey, robj *tokey, robj *value) { * a time limit and provides cursor support is used only for keys that are determined * to have significant internal complexity. To determine this, the defrag mechanism * uses the free_effort callback and the 'active-defrag-max-scan-fields' config directive. - * NOTE: The value is passed as a void** and the function is expected to update the + * NOTE: The value is passed as a `void**` and the function is expected to update the * pointer if the top-level value pointer is defragmented and consequentially changes. * * Note: the module name "AAAAAAAAA" is reserved and produces an error, it @@ -3900,7 +4449,7 @@ int moduleAllDatatypesHandleErrors() { } /* Returns true if any previous IO API failed. - * for Load* APIs the REDISMODULE_OPTIONS_HANDLE_IO_ERRORS flag must be set with + * for `Load*` APIs the REDISMODULE_OPTIONS_HANDLE_IO_ERRORS flag must be set with * RedisModule_SetModuleOptions first. */ int RM_IsIOError(RedisModuleIO *io) { return io->error; @@ -3926,7 +4475,7 @@ saveerr: } /* Load an unsigned 64 bit value from the RDB file. This function should only - * be called in the context of the rdb_load method of modules implementing + * be called in the context of the `rdb_load` method of modules implementing * new data types. */ uint64_t RM_LoadUnsigned(RedisModuleIO *io) { if (io->error) return 0; @@ -4242,7 +4791,6 @@ void RM_DigestEndSequence(RedisModuleDigest *md) { * If this is NOT done, Redis will handle corrupted (or just truncated) serialized * data by producing an error message and terminating the process. */ - void *RM_LoadDataTypeFromString(const RedisModuleString *str, const moduleType *mt) { rio payload; RedisModuleIO io; @@ -4270,7 +4818,6 @@ void *RM_LoadDataTypeFromString(const RedisModuleString *str, const moduleType * * implement in order to allow a module to arbitrarily serialize/de-serialize * keys, similar to how the Redis 'DUMP' and 'RESTORE' commands are implemented. */ - RedisModuleString *RM_SaveDataTypeToString(RedisModuleCtx *ctx, void *data, const moduleType *mt) { rio payload; RedisModuleIO io; @@ -4368,7 +4915,7 @@ const RedisModuleString *RM_GetKeyNameFromIO(RedisModuleIO *io) { return io->key; } -/* Returns a RedisModuleString with the name of the key from RedisModuleKey */ +/* Returns a RedisModuleString with the name of the key from RedisModuleKey. */ const RedisModuleString *RM_GetKeyNameFromModuleKey(RedisModuleKey *key) { return key ? key->key : NULL; } @@ -4383,7 +4930,7 @@ const RedisModuleString *RM_GetKeyNameFromModuleKey(RedisModuleKey *key) { * RM_LogIOError() * */ -void RM_LogRaw(RedisModule *module, const char *levelstr, const char *fmt, va_list ap) { +void moduleLogRaw(RedisModule *module, const char *levelstr, const char *fmt, va_list ap) { char msg[LOG_MAX_LEN]; size_t name_len; int level; @@ -4422,7 +4969,7 @@ void RM_LogRaw(RedisModule *module, const char *levelstr, const char *fmt, va_li void RM_Log(RedisModuleCtx *ctx, const char *levelstr, const char *fmt, ...) { va_list ap; va_start(ap, fmt); - RM_LogRaw(ctx? ctx->module: NULL,levelstr,fmt,ap); + moduleLogRaw(ctx? ctx->module: NULL,levelstr,fmt,ap); va_end(ap); } @@ -4434,12 +4981,15 @@ void RM_Log(RedisModuleCtx *ctx, const char *levelstr, const char *fmt, ...) { void RM_LogIOError(RedisModuleIO *io, const char *levelstr, const char *fmt, ...) { va_list ap; va_start(ap, fmt); - RM_LogRaw(io->type->module,levelstr,fmt,ap); + moduleLogRaw(io->type->module,levelstr,fmt,ap); va_end(ap); } /* Redis-like assert function. * + * The macro `RedisModule_Assert(expression)` is recommended, rather than + * calling this function directly. + * * A failed assertion will shut down the server and produce logging information * that looks identical to information generated by Redis itself. */ @@ -4570,6 +5120,7 @@ RedisModuleBlockedClient *moduleBlockClient(RedisModuleCtx *ctx, RedisModuleCmdF bc->dbid = c->db->id; bc->blocked_on_keys = keys != NULL; bc->unblocked = 0; + bc->background_duration = 0; c->bpop.timeout = timeout; if (islua || ismulti) { @@ -4643,6 +5194,11 @@ int moduleTryServeClientBlockedOnKey(client *c, robj *key) { * * In these cases, a call to RedisModule_BlockClient() will **not** block the * client, but instead produce a specific error reply. + * + * Measuring background time: By default the time spent in the blocked command + * is not account for the total command duration. To include such time you should + * use RM_BlockedClientMeasureTimeStart() and RM_BlockedClientMeasureTimeEnd() one, + * or multiple times within the blocking command background work. */ RedisModuleBlockedClient *RM_BlockClient(RedisModuleCtx *ctx, RedisModuleCmdFunc reply_callback, RedisModuleCmdFunc timeout_callback, void (*free_privdata)(RedisModuleCtx*,void*), long long timeout_ms) { return moduleBlockClient(ctx,reply_callback,timeout_callback,free_privdata,timeout_ms, NULL,0,NULL); @@ -4673,7 +5229,7 @@ RedisModuleBlockedClient *RM_BlockClient(RedisModuleCtx *ctx, RedisModuleCmdFunc * key, or a client in queue before this one can be served, modifying the key * as well and making it empty again. So when a client is blocked with * RedisModule_BlockClientOnKeys() the reply callback is not called after - * RM_UnblockCLient() is called, but every time a key is signaled as ready: + * RM_UnblockClient() is called, but every time a key is signaled as ready: * if the reply callback can serve the client, it returns REDISMODULE_OK * and the client is unblocked, otherwise it will return REDISMODULE_ERR * and we'll try again later. @@ -4837,6 +5393,7 @@ void moduleHandleBlockedClients(void) { * was blocked on keys (RM_BlockClientOnKeys()), because we already * called such callback in moduleTryServeClientBlockedOnKey() when * the key was signaled as ready. */ + uint64_t reply_us = 0; if (c && !bc->blocked_on_keys && bc->reply_callback) { RedisModuleCtx ctx = REDISMODULE_CTX_INIT; ctx.flags |= REDISMODULE_CTX_BLOCKED_REPLY; @@ -4845,9 +5402,19 @@ void moduleHandleBlockedClients(void) { ctx.module = bc->module; ctx.client = bc->client; ctx.blocked_client = bc; + monotime replyTimer; + elapsedStart(&replyTimer); bc->reply_callback(&ctx,(void**)c->argv,c->argc); + reply_us = elapsedUs(replyTimer); moduleFreeContext(&ctx); } + /* Update stats now that we've finished the blocking operation. + * This needs to be out of the reply callback above given that a + * module might not define any callback and still do blocking ops. + */ + if (c && !bc->blocked_on_keys) { + updateStatsOnUnblock(c, bc->background_duration, reply_us); + } /* Free privdata if any. */ if (bc->privdata && bc->free_privdata) { @@ -4911,6 +5478,9 @@ void moduleBlockedClientTimedOut(client *c) { ctx.blocked_privdata = bc->privdata; bc->timeout_callback(&ctx,(void**)c->argv,c->argc); moduleFreeContext(&ctx); + if (!bc->blocked_on_keys) { + updateStatsOnUnblock(c, bc->background_duration, 0); + } /* For timeout events, we do not want to call the disconnect callback, * because the blocked client will be automatically disconnected in * this case, and the user can still hook using the timeout callback. */ @@ -5103,9 +5673,9 @@ void moduleReleaseGIL(void) { * * The subscriber signature is: * - * int (*RedisModuleNotificationFunc) (RedisModuleCtx *ctx, int type, - * const char *event, - * RedisModuleString *key); + * int (*RedisModuleNotificationFunc) (RedisModuleCtx *ctx, int type, + * const char *event, + * RedisModuleString *key); * * `type` is the event type bit, that must match the mask given at registration * time. The event string is the actual command being executed, and key is the @@ -5369,28 +5939,27 @@ size_t RM_GetClusterSize(void) { return dictSize(server.cluster->nodes); } +clusterNode *clusterLookupNode(const char *name); /* We need access to internals */ + /* Populate the specified info for the node having as ID the specified 'id', * then returns REDISMODULE_OK. Otherwise if the node ID does not exist from * the POV of this local node, REDISMODULE_ERR is returned. * - * The arguments ip, master_id, port and flags can be NULL in case we don't - * need to populate back certain info. If an ip and master_id (only populated + * The arguments `ip`, `master_id`, `port` and `flags` can be NULL in case we don't + * need to populate back certain info. If an `ip` and `master_id` (only populated * if the instance is a slave) are specified, they point to buffers holding - * at least REDISMODULE_NODE_ID_LEN bytes. The strings written back as ip - * and master_id are not null terminated. + * at least REDISMODULE_NODE_ID_LEN bytes. The strings written back as `ip` + * and `master_id` are not null terminated. * * The list of flags reported is the following: * - * * REDISMODULE_NODE_MYSELF This node - * * REDISMODULE_NODE_MASTER The node is a master - * * REDISMODULE_NODE_SLAVE The node is a replica - * * REDISMODULE_NODE_PFAIL We see the node as failing - * * REDISMODULE_NODE_FAIL The cluster agrees the node is failing - * * REDISMODULE_NODE_NOFAILOVER The slave is configured to never failover + * * REDISMODULE_NODE_MYSELF: This node + * * REDISMODULE_NODE_MASTER: The node is a master + * * REDISMODULE_NODE_SLAVE: The node is a replica + * * REDISMODULE_NODE_PFAIL: We see the node as failing + * * REDISMODULE_NODE_FAIL: The cluster agrees the node is failing + * * REDISMODULE_NODE_NOFAILOVER: The slave is configured to never failover */ - -clusterNode *clusterLookupNode(const char *name); /* We need access to internals */ - int RM_GetClusterNodeInfo(RedisModuleCtx *ctx, const char *id, char *ip, char *master_id, int *port, int *flags) { UNUSED(ctx); @@ -5434,18 +6003,18 @@ int RM_GetClusterNodeInfo(RedisModuleCtx *ctx, const char *id, char *ip, char *m * a different distributed system, but still want to use the Redis Cluster * message bus. Flags that can be set: * - * CLUSTER_MODULE_FLAG_NO_FAILOVER - * CLUSTER_MODULE_FLAG_NO_REDIRECTION + * * CLUSTER_MODULE_FLAG_NO_FAILOVER + * * CLUSTER_MODULE_FLAG_NO_REDIRECTION * * With the following effects: * - * NO_FAILOVER: prevent Redis Cluster slaves to failover a failing master. - * Also disables the replica migration feature. + * * NO_FAILOVER: prevent Redis Cluster slaves to failover a failing master. + * Also disables the replica migration feature. * - * NO_REDIRECTION: Every node will accept any key, without trying to perform - * partitioning according to the user Redis Cluster algorithm. - * Slots informations will still be propagated across the - * cluster, but without effects. */ + * * NO_REDIRECTION: Every node will accept any key, without trying to perform + * partitioning according to the user Redis Cluster algorithm. + * Slots informations will still be propagated across the + * cluster, but without effects. */ void RM_SetClusterFlags(RedisModuleCtx *ctx, uint64_t flags) { UNUSED(ctx); if (flags & REDISMODULE_CLUSTER_FLAG_NO_FAILOVER) @@ -5964,15 +6533,15 @@ int RM_DictDel(RedisModuleDict *d, RedisModuleString *key, void *oldval) { * comparison operator to use in order to seek the first element. The * operators available are: * - * "^" -- Seek the first (lexicographically smaller) key. - * "$" -- Seek the last (lexicographically biffer) key. - * ">" -- Seek the first element greater than the specified key. - * ">=" -- Seek the first element greater or equal than the specified key. - * "<" -- Seek the first element smaller than the specified key. - * "<=" -- Seek the first element smaller or equal than the specified key. - * "==" -- Seek the first element matching exactly the specified key. + * * `^` -- Seek the first (lexicographically smaller) key. + * * `$` -- Seek the last (lexicographically biffer) key. + * * `>` -- Seek the first element greater than the specified key. + * * `>=` -- Seek the first element greater or equal than the specified key. + * * `<` -- Seek the first element smaller than the specified key. + * * `<=` -- Seek the first element smaller or equal than the specified key. + * * `==` -- Seek the first element matching exactly the specified key. * - * Note that for "^" and "$" the passed key is not used, and the user may + * Note that for `^` and `$` the passed key is not used, and the user may * just pass NULL with a length of 0. * * If the element to start the iteration cannot be seeked based on the @@ -6017,11 +6586,11 @@ int RM_DictIteratorReseek(RedisModuleDictIter *di, const char *op, RedisModuleSt return RM_DictIteratorReseekC(di,op,key->ptr,sdslen(key->ptr)); } -/* Return the current item of the dictionary iterator 'di' and steps to the +/* Return the current item of the dictionary iterator `di` and steps to the * next element. If the iterator already yield the last element and there * are no other elements to return, NULL is returned, otherwise a pointer - * to a string representing the key is provided, and the '*keylen' length - * is set by reference (if keylen is not NULL). The '*dataptr', if not NULL + * to a string representing the key is provided, and the `*keylen` length + * is set by reference (if keylen is not NULL). The `*dataptr`, if not NULL * is set to the value of the pointer stored at the returned key as auxiliary * data (as set by the RedisModule_DictSet API). * @@ -6035,7 +6604,7 @@ int RM_DictIteratorReseek(RedisModuleDictIter *di, const char *op, RedisModuleSt * } * * The returned pointer is of type void because sometimes it makes sense - * to cast it to a char* sometimes to an unsigned char* depending on the + * to cast it to a `char*` sometimes to an unsigned `char*` depending on the * fact it contains or not binary data, so this API ends being more * comfortable to use. * @@ -6119,8 +6688,8 @@ int RM_DictCompare(RedisModuleDictIter *di, const char *op, RedisModuleString *k int RM_InfoEndDictField(RedisModuleInfoCtx *ctx); /* Used to start a new section, before adding any fields. the section name will - * be prefixed by "<modulename>_" and must only include A-Z,a-z,0-9. - * NULL or empty string indicates the default section (only <modulename>) is used. + * be prefixed by `<modulename>_` and must only include A-Z,a-z,0-9. + * NULL or empty string indicates the default section (only `<modulename>`) is used. * When return value is REDISMODULE_ERR, the section should and will be skipped. */ int RM_InfoAddSection(RedisModuleInfoCtx *ctx, char *name) { sds full_name = sdsdup(ctx->module->name); @@ -6180,8 +6749,8 @@ int RM_InfoEndDictField(RedisModuleInfoCtx *ctx) { } /* Used by RedisModuleInfoFunc to add info fields. - * Each field will be automatically prefixed by "<modulename>_". - * Field names or values must not include \r\n of ":" */ + * Each field will be automatically prefixed by `<modulename>_`. + * Field names or values must not include `\r\n` or `:`. */ int RM_InfoAddFieldString(RedisModuleInfoCtx *ctx, char *field, RedisModuleString *value) { if (!ctx->in_section) return REDISMODULE_ERR; @@ -6200,6 +6769,7 @@ int RM_InfoAddFieldString(RedisModuleInfoCtx *ctx, char *field, RedisModuleStrin return REDISMODULE_OK; } +/* See RedisModule_InfoAddFieldString(). */ int RM_InfoAddFieldCString(RedisModuleInfoCtx *ctx, char *field, char *value) { if (!ctx->in_section) return REDISMODULE_ERR; @@ -6218,6 +6788,7 @@ int RM_InfoAddFieldCString(RedisModuleInfoCtx *ctx, char *field, char *value) { return REDISMODULE_OK; } +/* See RedisModule_InfoAddFieldString(). */ int RM_InfoAddFieldDouble(RedisModuleInfoCtx *ctx, char *field, double value) { if (!ctx->in_section) return REDISMODULE_ERR; @@ -6236,6 +6807,7 @@ int RM_InfoAddFieldDouble(RedisModuleInfoCtx *ctx, char *field, double value) { return REDISMODULE_OK; } +/* See RedisModule_InfoAddFieldString(). */ int RM_InfoAddFieldLongLong(RedisModuleInfoCtx *ctx, char *field, long long value) { if (!ctx->in_section) return REDISMODULE_ERR; @@ -6254,6 +6826,7 @@ int RM_InfoAddFieldLongLong(RedisModuleInfoCtx *ctx, char *field, long long valu return REDISMODULE_OK; } +/* See RedisModule_InfoAddFieldString(). */ int RM_InfoAddFieldULongLong(RedisModuleInfoCtx *ctx, char *field, unsigned long long value) { if (!ctx->in_section) return REDISMODULE_ERR; @@ -6272,6 +6845,8 @@ int RM_InfoAddFieldULongLong(RedisModuleInfoCtx *ctx, char *field, unsigned long return REDISMODULE_OK; } +/* Registers callback for the INFO command. The callback should add INFO fields + * by calling the `RedisModule_InfoAddField*()` functions. */ int RM_RegisterInfoFunc(RedisModuleCtx *ctx, RedisModuleInfoFunc cb) { ctx->module->info_cb = cb; return REDISMODULE_OK; @@ -6711,7 +7286,6 @@ const RedisModuleString *RM_CommandFilterArgGet(RedisModuleCommandFilterCtx *fct * after the filter context is destroyed, so it must not be auto-memory * allocated, freed or used elsewhere. */ - int RM_CommandFilterArgInsert(RedisModuleCommandFilterCtx *fctx, int pos, RedisModuleString *arg) { int i; @@ -6733,7 +7307,6 @@ int RM_CommandFilterArgInsert(RedisModuleCommandFilterCtx *fctx, int pos, RedisM * filter context is destroyed, so it must not be auto-memory allocated, freed * or used elsewhere. */ - int RM_CommandFilterArgReplace(RedisModuleCommandFilterCtx *fctx, int pos, RedisModuleString *arg) { if (pos < 0 || pos >= fctx->argc) return REDISMODULE_ERR; @@ -6774,10 +7347,10 @@ size_t RM_MallocSize(void* ptr){ /* Return the a number between 0 to 1 indicating the amount of memory * currently used, relative to the Redis "maxmemory" configuration. * - * 0 - No memory limit configured. - * Between 0 and 1 - The percentage of the memory used normalized in 0-1 range. - * Exactly 1 - Memory limit reached. - * Greater 1 - More memory used than the configured limit. + * * 0 - No memory limit configured. + * * Between 0 and 1 - The percentage of the memory used normalized in 0-1 range. + * * Exactly 1 - Memory limit reached. + * * Greater 1 - More memory used than the configured limit. */ float RM_GetUsedMemoryRatio(){ float level; @@ -6840,21 +7413,22 @@ void RM_ScanCursorDestroy(RedisModuleScanCursor *cursor) { * the selected db. * * Callback for scan implementation. - * void scan_callback(RedisModuleCtx *ctx, RedisModuleString *keyname, - * RedisModuleKey *key, void *privdata); - * ctx - the redis module context provided to for the scan. - * keyname - owned by the caller and need to be retained if used after this - * function. * - * key - holds info on the key and value, it is provided as best effort, in - * some cases it might be NULL, in which case the user should (can) use - * RedisModule_OpenKey (and CloseKey too). - * when it is provided, it is owned by the caller and will be free when the - * callback returns. + * void scan_callback(RedisModuleCtx *ctx, RedisModuleString *keyname, + * RedisModuleKey *key, void *privdata); * - * privdata - the user data provided to RedisModule_Scan. + * - `ctx`: the redis module context provided to for the scan. + * - `keyname`: owned by the caller and need to be retained if used after this + * function. + * - `key`: holds info on the key and value, it is provided as best effort, in + * some cases it might be NULL, in which case the user should (can) use + * RedisModule_OpenKey() (and CloseKey too). + * when it is provided, it is owned by the caller and will be free when the + * callback returns. + * - `privdata`: the user data provided to RedisModule_Scan(). * * The way it should be used: + * * RedisModuleCursor *c = RedisModule_ScanCursorCreate(); * while(RedisModule_Scan(ctx, c, callback, privateData)); * RedisModule_ScanCursorDestroy(c); @@ -6938,7 +7512,9 @@ static void moduleScanKeyCallback(void *privdata, const dictEntry *de) { /* Scan api that allows a module to scan the elements in a hash, set or sorted set key * * Callback for scan implementation. - * void scan_callback(RedisModuleKey *key, RedisModuleString* field, RedisModuleString* value, void *privdata); + * + * void scan_callback(RedisModuleKey *key, RedisModuleString* field, RedisModuleString* value, void *privdata); + * * - key - the redis key context provided to for the scan. * - field - field name, owned by the caller and need to be retained if used * after this function. @@ -6947,6 +7523,7 @@ static void moduleScanKeyCallback(void *privdata, const dictEntry *de) { * - privdata - the user data provided to RedisModule_ScanKey. * * The way it should be used: + * * RedisModuleCursor *c = RedisModule_ScanCursorCreate(); * RedisModuleKey *key = RedisModule_OpenKey(...) * while(RedisModule_ScanKey(key, c, callback, privateData)); @@ -6955,6 +7532,7 @@ static void moduleScanKeyCallback(void *privdata, const dictEntry *de) { * * It is also possible to use this API from another thread while the lock is acquired during * the actuall call to RM_ScanKey, and re-opening the key each time: + * * RedisModuleCursor *c = RedisModule_ScanCursorCreate(); * RedisModule_ThreadSafeContextLock(ctx); * RedisModuleKey *key = RedisModule_OpenKey(...) @@ -7159,10 +7737,10 @@ void ModuleForkDoneHandler(int exitcode, int bysignal) { * * The callback must be of this type: * - * int (*RedisModuleEventCallback)(RedisModuleCtx *ctx, - * RedisModuleEvent eid, - * uint64_t subevent, - * void *data); + * int (*RedisModuleEventCallback)(RedisModuleCtx *ctx, + * RedisModuleEvent eid, + * uint64_t subevent, + * void *data); * * The 'ctx' is a normal Redis module context that the callback can use in * order to call other modules APIs. The 'eid' is the event itself, this @@ -7176,201 +7754,207 @@ void ModuleForkDoneHandler(int exitcode, int bysignal) { * * Here is a list of events you can use as 'eid' and related sub events: * - * RedisModuleEvent_ReplicationRoleChanged + * * RedisModuleEvent_ReplicationRoleChanged: + * + * This event is called when the instance switches from master + * to replica or the other way around, however the event is + * also called when the replica remains a replica but starts to + * replicate with a different master. * - * This event is called when the instance switches from master - * to replica or the other way around, however the event is - * also called when the replica remains a replica but starts to - * replicate with a different master. + * The following sub events are available: * - * The following sub events are available: + * * `REDISMODULE_SUBEVENT_REPLROLECHANGED_NOW_MASTER` + * * `REDISMODULE_SUBEVENT_REPLROLECHANGED_NOW_REPLICA` * - * REDISMODULE_SUBEVENT_REPLROLECHANGED_NOW_MASTER - * REDISMODULE_SUBEVENT_REPLROLECHANGED_NOW_REPLICA + * The 'data' field can be casted by the callback to a + * `RedisModuleReplicationInfo` structure with the following fields: * - * The 'data' field can be casted by the callback to a - * RedisModuleReplicationInfo structure with the following fields: + * int master; // true if master, false if replica + * char *masterhost; // master instance hostname for NOW_REPLICA + * int masterport; // master instance port for NOW_REPLICA + * char *replid1; // Main replication ID + * char *replid2; // Secondary replication ID + * uint64_t repl1_offset; // Main replication offset + * uint64_t repl2_offset; // Offset of replid2 validity * - * int master; // true if master, false if replica - * char *masterhost; // master instance hostname for NOW_REPLICA - * int masterport; // master instance port for NOW_REPLICA - * char *replid1; // Main replication ID - * char *replid2; // Secondary replication ID - * uint64_t repl1_offset; // Main replication offset - * uint64_t repl2_offset; // Offset of replid2 validity + * * RedisModuleEvent_Persistence * - * RedisModuleEvent_Persistence + * This event is called when RDB saving or AOF rewriting starts + * and ends. The following sub events are available: * - * This event is called when RDB saving or AOF rewriting starts - * and ends. The following sub events are available: + * * `REDISMODULE_SUBEVENT_PERSISTENCE_RDB_START` + * * `REDISMODULE_SUBEVENT_PERSISTENCE_AOF_START` + * * `REDISMODULE_SUBEVENT_PERSISTENCE_SYNC_RDB_START` + * * `REDISMODULE_SUBEVENT_PERSISTENCE_ENDED` + * * `REDISMODULE_SUBEVENT_PERSISTENCE_FAILED` * - * REDISMODULE_SUBEVENT_PERSISTENCE_RDB_START - * REDISMODULE_SUBEVENT_PERSISTENCE_AOF_START - * REDISMODULE_SUBEVENT_PERSISTENCE_SYNC_RDB_START - * REDISMODULE_SUBEVENT_PERSISTENCE_ENDED - * REDISMODULE_SUBEVENT_PERSISTENCE_FAILED + * The above events are triggered not just when the user calls the + * relevant commands like BGSAVE, but also when a saving operation + * or AOF rewriting occurs because of internal server triggers. + * The SYNC_RDB_START sub events are happening in the forground due to + * SAVE command, FLUSHALL, or server shutdown, and the other RDB and + * AOF sub events are executed in a background fork child, so any + * action the module takes can only affect the generated AOF or RDB, + * but will not be reflected in the parent process and affect connected + * clients and commands. Also note that the AOF_START sub event may end + * up saving RDB content in case of an AOF with rdb-preamble. * - * The above events are triggered not just when the user calls the - * relevant commands like BGSAVE, but also when a saving operation - * or AOF rewriting occurs because of internal server triggers. - * The SYNC_RDB_START sub events are happening in the forground due to - * SAVE command, FLUSHALL, or server shutdown, and the other RDB and - * AOF sub events are executed in a background fork child, so any - * action the module takes can only affect the generated AOF or RDB, - * but will not be reflected in the parent process and affect connected - * clients and commands. Also note that the AOF_START sub event may end - * up saving RDB content in case of an AOF with rdb-preamble. + * * RedisModuleEvent_FlushDB * - * RedisModuleEvent_FlushDB + * The FLUSHALL, FLUSHDB or an internal flush (for instance + * because of replication, after the replica synchronization) + * happened. The following sub events are available: * - * The FLUSHALL, FLUSHDB or an internal flush (for instance - * because of replication, after the replica synchronization) - * happened. The following sub events are available: + * * `REDISMODULE_SUBEVENT_FLUSHDB_START` + * * `REDISMODULE_SUBEVENT_FLUSHDB_END` * - * REDISMODULE_SUBEVENT_FLUSHDB_START - * REDISMODULE_SUBEVENT_FLUSHDB_END + * The data pointer can be casted to a RedisModuleFlushInfo + * structure with the following fields: * - * The data pointer can be casted to a RedisModuleFlushInfo - * structure with the following fields: + * int32_t async; // True if the flush is done in a thread. + * // See for instance FLUSHALL ASYNC. + * // In this case the END callback is invoked + * // immediately after the database is put + * // in the free list of the thread. + * int32_t dbnum; // Flushed database number, -1 for all the DBs + * // in the case of the FLUSHALL operation. * - * int32_t async; // True if the flush is done in a thread. - * See for instance FLUSHALL ASYNC. - * In this case the END callback is invoked - * immediately after the database is put - * in the free list of the thread. - * int32_t dbnum; // Flushed database number, -1 for all the DBs - * in the case of the FLUSHALL operation. + * The start event is called *before* the operation is initated, thus + * allowing the callback to call DBSIZE or other operation on the + * yet-to-free keyspace. * - * The start event is called *before* the operation is initated, thus - * allowing the callback to call DBSIZE or other operation on the - * yet-to-free keyspace. + * * RedisModuleEvent_Loading * - * RedisModuleEvent_Loading + * Called on loading operations: at startup when the server is + * started, but also after a first synchronization when the + * replica is loading the RDB file from the master. + * The following sub events are available: * - * Called on loading operations: at startup when the server is - * started, but also after a first synchronization when the - * replica is loading the RDB file from the master. - * The following sub events are available: + * * `REDISMODULE_SUBEVENT_LOADING_RDB_START` + * * `REDISMODULE_SUBEVENT_LOADING_AOF_START` + * * `REDISMODULE_SUBEVENT_LOADING_REPL_START` + * * `REDISMODULE_SUBEVENT_LOADING_ENDED` + * * `REDISMODULE_SUBEVENT_LOADING_FAILED` * - * REDISMODULE_SUBEVENT_LOADING_RDB_START - * REDISMODULE_SUBEVENT_LOADING_AOF_START - * REDISMODULE_SUBEVENT_LOADING_REPL_START - * REDISMODULE_SUBEVENT_LOADING_ENDED - * REDISMODULE_SUBEVENT_LOADING_FAILED + * Note that AOF loading may start with an RDB data in case of + * rdb-preamble, in which case you'll only receive an AOF_START event. * - * Note that AOF loading may start with an RDB data in case of - * rdb-preamble, in which case you'll only receive an AOF_START event. + * * RedisModuleEvent_ClientChange * + * Called when a client connects or disconnects. + * The data pointer can be casted to a RedisModuleClientInfo + * structure, documented in RedisModule_GetClientInfoById(). + * The following sub events are available: * - * RedisModuleEvent_ClientChange + * * `REDISMODULE_SUBEVENT_CLIENT_CHANGE_CONNECTED` + * * `REDISMODULE_SUBEVENT_CLIENT_CHANGE_DISCONNECTED` * - * Called when a client connects or disconnects. - * The data pointer can be casted to a RedisModuleClientInfo - * structure, documented in RedisModule_GetClientInfoById(). - * The following sub events are available: + * * RedisModuleEvent_Shutdown * - * REDISMODULE_SUBEVENT_CLIENT_CHANGE_CONNECTED - * REDISMODULE_SUBEVENT_CLIENT_CHANGE_DISCONNECTED + * The server is shutting down. No subevents are available. * - * RedisModuleEvent_Shutdown + * * RedisModuleEvent_ReplicaChange * - * The server is shutting down. No subevents are available. + * This event is called when the instance (that can be both a + * master or a replica) get a new online replica, or lose a + * replica since it gets disconnected. + * The following sub events are available: * - * RedisModuleEvent_ReplicaChange + * * `REDISMODULE_SUBEVENT_REPLICA_CHANGE_ONLINE` + * * `REDISMODULE_SUBEVENT_REPLICA_CHANGE_OFFLINE` * - * This event is called when the instance (that can be both a - * master or a replica) get a new online replica, or lose a - * replica since it gets disconnected. - * The following sub events are available: + * No additional information is available so far: future versions + * of Redis will have an API in order to enumerate the replicas + * connected and their state. * - * REDISMODULE_SUBEVENT_REPLICA_CHANGE_ONLINE - * REDISMODULE_SUBEVENT_REPLICA_CHANGE_OFFLINE + * * RedisModuleEvent_CronLoop * - * No additional information is available so far: future versions - * of Redis will have an API in order to enumerate the replicas - * connected and their state. + * This event is called every time Redis calls the serverCron() + * function in order to do certain bookkeeping. Modules that are + * required to do operations from time to time may use this callback. + * Normally Redis calls this function 10 times per second, but + * this changes depending on the "hz" configuration. + * No sub events are available. * - * RedisModuleEvent_CronLoop + * The data pointer can be casted to a RedisModuleCronLoop + * structure with the following fields: * - * This event is called every time Redis calls the serverCron() - * function in order to do certain bookkeeping. Modules that are - * required to do operations from time to time may use this callback. - * Normally Redis calls this function 10 times per second, but - * this changes depending on the "hz" configuration. - * No sub events are available. + * int32_t hz; // Approximate number of events per second. * - * The data pointer can be casted to a RedisModuleCronLoop - * structure with the following fields: + * * RedisModuleEvent_MasterLinkChange * - * int32_t hz; // Approximate number of events per second. + * This is called for replicas in order to notify when the + * replication link becomes functional (up) with our master, + * or when it goes down. Note that the link is not considered + * up when we just connected to the master, but only if the + * replication is happening correctly. + * The following sub events are available: * - * RedisModuleEvent_MasterLinkChange + * * `REDISMODULE_SUBEVENT_MASTER_LINK_UP` + * * `REDISMODULE_SUBEVENT_MASTER_LINK_DOWN` * - * This is called for replicas in order to notify when the - * replication link becomes functional (up) with our master, - * or when it goes down. Note that the link is not considered - * up when we just connected to the master, but only if the - * replication is happening correctly. - * The following sub events are available: + * * RedisModuleEvent_ModuleChange * - * REDISMODULE_SUBEVENT_MASTER_LINK_UP - * REDISMODULE_SUBEVENT_MASTER_LINK_DOWN + * This event is called when a new module is loaded or one is unloaded. + * The following sub events are available: * - * RedisModuleEvent_ModuleChange + * * `REDISMODULE_SUBEVENT_MODULE_LOADED` + * * `REDISMODULE_SUBEVENT_MODULE_UNLOADED` * - * This event is called when a new module is loaded or one is unloaded. - * The following sub events are available: + * The data pointer can be casted to a RedisModuleModuleChange + * structure with the following fields: * - * REDISMODULE_SUBEVENT_MODULE_LOADED - * REDISMODULE_SUBEVENT_MODULE_UNLOADED + * const char* module_name; // Name of module loaded or unloaded. + * int32_t module_version; // Module version. * - * The data pointer can be casted to a RedisModuleModuleChange - * structure with the following fields: + * * RedisModuleEvent_LoadingProgress * - * const char* module_name; // Name of module loaded or unloaded. - * int32_t module_version; // Module version. + * This event is called repeatedly called while an RDB or AOF file + * is being loaded. + * The following sub events are availble: * - * RedisModuleEvent_LoadingProgress + * * `REDISMODULE_SUBEVENT_LOADING_PROGRESS_RDB` + * * `REDISMODULE_SUBEVENT_LOADING_PROGRESS_AOF` * - * This event is called repeatedly called while an RDB or AOF file - * is being loaded. - * The following sub events are availble: + * The data pointer can be casted to a RedisModuleLoadingProgress + * structure with the following fields: * - * REDISMODULE_SUBEVENT_LOADING_PROGRESS_RDB - * REDISMODULE_SUBEVENT_LOADING_PROGRESS_AOF + * int32_t hz; // Approximate number of events per second. + * int32_t progress; // Approximate progress between 0 and 1024, + * // or -1 if unknown. * - * The data pointer can be casted to a RedisModuleLoadingProgress - * structure with the following fields: + * * RedisModuleEvent_SwapDB * - * int32_t hz; // Approximate number of events per second. - * int32_t progress; // Approximate progress between 0 and 1024, - * or -1 if unknown. + * This event is called when a SWAPDB command has been successfully + * Executed. + * For this event call currently there is no subevents available. * - * RedisModuleEvent_SwapDB + * The data pointer can be casted to a RedisModuleSwapDbInfo + * structure with the following fields: * - * This event is called when a SWAPDB command has been successfully - * Executed. - * For this event call currently there is no subevents available. + * int32_t dbnum_first; // Swap Db first dbnum + * int32_t dbnum_second; // Swap Db second dbnum * - * The data pointer can be casted to a RedisModuleSwapDbInfo - * structure with the following fields: + * * RedisModuleEvent_ReplBackup * - * int32_t dbnum_first; // Swap Db first dbnum - * int32_t dbnum_second; // Swap Db second dbnum + * Called when diskless-repl-load config is set to swapdb, + * And redis needs to backup the the current database for the + * possibility to be restored later. A module with global data and + * maybe with aux_load and aux_save callbacks may need to use this + * notification to backup / restore / discard its globals. + * The following sub events are available: * - * RedisModuleEvent_ReplBackup + * * `REDISMODULE_SUBEVENT_REPL_BACKUP_CREATE` + * * `REDISMODULE_SUBEVENT_REPL_BACKUP_RESTORE` + * * `REDISMODULE_SUBEVENT_REPL_BACKUP_DISCARD` * - * Called when diskless-repl-load config is set to swapdb, - * And redis needs to backup the the current database for the - * possibility to be restored later. A module with global data and - * maybe with aux_load and aux_save callbacks may need to use this - * notification to backup / restore / discard its globals. - * The following sub events are available: + * * RedisModuleEvent_ForkChild * - * REDISMODULE_SUBEVENT_REPL_BACKUP_CREATE - * REDISMODULE_SUBEVENT_REPL_BACKUP_RESTORE - * REDISMODULE_SUBEVENT_REPL_BACKUP_DISCARD + * Called when a fork child (AOFRW, RDBSAVE, module fork...) is born/dies + * The following sub events are available: * + * * `REDISMODULE_SUBEVENT_FORK_CHILD_BORN` + * * `REDISMODULE_SUBEVENT_FORK_CHILD_DIED` * * The function returns REDISMODULE_OK if the module was successfully subscribed * for the specified event. If the API is called from a wrong context or unsupported event @@ -7444,6 +8028,8 @@ int RM_IsSubEventSupported(RedisModuleEvent event, int64_t subevent) { return subevent < _REDISMODULE_SUBEVENT_SWAPDB_NEXT; case REDISMODULE_EVENT_REPL_BACKUP: return subevent < _REDISMODULE_SUBEVENT_REPL_BACKUP_NEXT; + case REDISMODULE_EVENT_FORK_CHILD: + return subevent < _REDISMODULE_SUBEVENT_FORK_CHILD_NEXT; default: break; } @@ -7659,6 +8245,11 @@ void moduleInitModulesSystem(void) { anetNonBlock(NULL,server.module_blocked_pipe[0]); anetNonBlock(NULL,server.module_blocked_pipe[1]); + /* Enable close-on-exec flag on pipes in case of the fork-exec system calls in + * sentinels or redis servers. */ + anetCloexec(server.module_blocked_pipe[0]); + anetCloexec(server.module_blocked_pipe[1]); + /* Create the timers radix tree. */ Timers = raxNew(); @@ -8064,7 +8655,8 @@ int RM_GetLFU(RedisModuleKey *key, long long *lfu_freq) { * the module can check if a certain set of flags are supported * by the redis server version in use. * Example: - * int supportedFlags = RM_GetContextFlagsAll() + * + * int supportedFlags = RM_GetContextFlagsAll(); * if (supportedFlags & REDISMODULE_CTX_FLAGS_MULTI) { * // REDISMODULE_CTX_FLAGS_MULTI is supported * } else{ @@ -8080,7 +8672,8 @@ int RM_GetContextFlagsAll() { * the module can check if a certain set of flags are supported * by the redis server version in use. * Example: - * int supportedFlags = RM_GetKeyspaceNotificationFlagsAll() + * + * int supportedFlags = RM_GetKeyspaceNotificationFlagsAll(); * if (supportedFlags & REDISMODULE_NOTIFY_LOADED) { * // REDISMODULE_NOTIFY_LOADED is supported * } else{ @@ -8150,8 +8743,8 @@ int RM_ModuleTypeReplaceValue(RedisModuleKey *key, moduleType *mt, void *new_val * an error condition. Error conditions are indicated by setting errno * as folllows: * - * ENOENT: Specified command does not exist. - * EINVAL: Invalid command arity specified. + * * ENOENT: Specified command does not exist. + * * EINVAL: Invalid command arity specified. * * NOTE: The returned array is not a Redis Module object so it does not * get automatically freed even when auto-memory is used. The caller @@ -8247,11 +8840,11 @@ int RM_DefragShouldStop(RedisModuleDefragCtx *ctx) { * data type. * * This behavior is reserved to cases where late defrag is performed. Late - * defrag is selected for keys that implement the free_effort callback and - * return a free_effort value that is larger than the defrag + * defrag is selected for keys that implement the `free_effort` callback and + * return a `free_effort` value that is larger than the defrag * 'active-defrag-max-scan-fields' configuration directive. * - * Smaller keys, keys that do not implement free_effort or the global + * Smaller keys, keys that do not implement `free_effort` or the global * defrag callback are not called in late-defrag mode. In those cases, a * call to this function will return REDISMODULE_ERR. * @@ -8273,7 +8866,7 @@ int RM_DefragCursorSet(RedisModuleDefragCtx *ctx, unsigned long cursor) { /* Fetch a cursor value that has been previously stored using RM_DefragCursorSet(). * * If not called for a late defrag operation, REDISMODULE_ERR will be returned and - * the cursor should be ignored. See DM_DefragCursorSet() for more details on + * the cursor should be ignored. See RM_DefragCursorSet() for more details on * defrag cursors. */ int RM_DefragCursorGet(RedisModuleDefragCtx *ctx, unsigned long *cursor) { @@ -8445,6 +9038,7 @@ void moduleRegisterCoreAPI(void) { REGISTER_API(StringToLongLong); REGISTER_API(StringToDouble); REGISTER_API(StringToLongDouble); + REGISTER_API(StringToStreamID); REGISTER_API(Call); REGISTER_API(CallReplyProto); REGISTER_API(FreeCallReply); @@ -8459,6 +9053,7 @@ void moduleRegisterCoreAPI(void) { REGISTER_API(CreateStringFromDouble); REGISTER_API(CreateStringFromLongDouble); REGISTER_API(CreateStringFromString); + REGISTER_API(CreateStringFromStreamID); REGISTER_API(CreateStringPrintf); REGISTER_API(FreeString); REGISTER_API(StringPtrLen); @@ -8490,6 +9085,15 @@ void moduleRegisterCoreAPI(void) { REGISTER_API(ZsetRangeEndReached); REGISTER_API(HashSet); REGISTER_API(HashGet); + REGISTER_API(StreamAdd); + REGISTER_API(StreamDelete); + REGISTER_API(StreamIteratorStart); + REGISTER_API(StreamIteratorStop); + REGISTER_API(StreamIteratorNextID); + REGISTER_API(StreamIteratorNextField); + REGISTER_API(StreamIteratorDelete); + REGISTER_API(StreamTrimByLength); + REGISTER_API(StreamTrimByID); REGISTER_API(IsKeysPositionRequest); REGISTER_API(KeyAtPos); REGISTER_API(GetClientId); @@ -8539,6 +9143,8 @@ void moduleRegisterCoreAPI(void) { REGISTER_API(GetBlockedClientPrivateData); REGISTER_API(AbortBlock); REGISTER_API(Milliseconds); + REGISTER_API(BlockedClientMeasureTimeStart); + REGISTER_API(BlockedClientMeasureTimeEnd); REGISTER_API(GetThreadSafeContext); REGISTER_API(GetDetachedThreadSafeContext); REGISTER_API(FreeThreadSafeContext); |