diff options
Diffstat (limited to 'src/module.c')
-rw-r--r-- | src/module.c | 262 |
1 files changed, 224 insertions, 38 deletions
diff --git a/src/module.c b/src/module.c index a8df08bbb..7277e3e72 100644 --- a/src/module.c +++ b/src/module.c @@ -58,6 +58,7 @@ #include "monotonic.h" #include "script.h" #include "call_reply.h" +#include "hdr_histogram.h" #include <dlfcn.h> #include <sys/stat.h> #include <sys/wait.h> @@ -516,13 +517,20 @@ void moduleCreateContext(RedisModuleCtx *out_ctx, RedisModule *module, int ctx_f * You should avoid using malloc(). * This function panics if unable to allocate enough memory. */ void *RM_Alloc(size_t bytes) { - return zmalloc(bytes); + /* Use 'zmalloc_usable()' instead of 'zmalloc()' to allow the compiler + * to recognize the additional memory size, which means that modules can + * use the memory reported by 'RM_MallocUsableSize()' safely. In theory this + * isn't really needed since this API can't be inlined (not even for embedded + * modules like TLS (we use function pointers for module APIs), and the API doesn't + * have the malloc_size attribute, but it's hard to predict how smart future compilers + * will be, so better safe than sorry. */ + return zmalloc_usable(bytes,NULL); } /* Similar to RM_Alloc, but returns NULL in case of allocation failure, instead * of panicking. */ void *RM_TryAlloc(size_t bytes) { - return ztrymalloc(bytes); + return ztrymalloc_usable(bytes,NULL); } /* Use like calloc(). Memory allocated with this function is reported in @@ -530,12 +538,12 @@ void *RM_TryAlloc(size_t bytes) { * and in general is taken into account as memory allocated by Redis. * You should avoid using calloc() directly. */ void *RM_Calloc(size_t nmemb, size_t size) { - return zcalloc(nmemb*size); + return zcalloc_usable(nmemb*size,NULL); } /* Use like realloc() for memory obtained with RedisModule_Alloc(). */ void* RM_Realloc(void *ptr, size_t bytes) { - return zrealloc(ptr,bytes); + return zrealloc_usable(ptr,bytes,NULL); } /* Use like free() for memory obtained by RedisModule_Alloc() and @@ -647,6 +655,7 @@ void moduleReleaseTempClient(client *c) { clearClientConnectionState(c); listEmpty(c->reply); c->reply_bytes = 0; + c->duration = 0; resetClient(c); c->bufpos = 0; c->flags = CLIENT_MODULE; @@ -780,7 +789,7 @@ int RM_GetApi(const char *funcname, void **targetPtrPtr) { return REDISMODULE_OK; } -void modulePostExecutionUnitOperations() { +void modulePostExecutionUnitOperations(void) { if (server.execution_nesting) return; @@ -1294,10 +1303,9 @@ RedisModuleCommand *moduleCreateCommandProxy(struct RedisModule *module, sds dec cp->rediscmd->proc = RedisModuleCommandDispatcher; cp->rediscmd->flags = flags | CMD_MODULE; cp->rediscmd->module_cmd = cp; - cp->rediscmd->key_specs_max = STATIC_KEY_SPECS_NUM; - cp->rediscmd->key_specs = cp->rediscmd->key_specs_static; if (firstkey != 0) { cp->rediscmd->key_specs_num = 1; + cp->rediscmd->key_specs = zcalloc(sizeof(keySpec)); cp->rediscmd->key_specs[0].flags = CMD_KEY_FULL_ACCESS; if (flags & CMD_MODULE_GETKEYS) cp->rediscmd->key_specs[0].flags |= CMD_KEY_VARIABLE_FLAGS; @@ -1309,6 +1317,7 @@ RedisModuleCommand *moduleCreateCommandProxy(struct RedisModule *module, sds dec cp->rediscmd->key_specs[0].fk.range.limit = 0; } else { cp->rediscmd->key_specs_num = 0; + cp->rediscmd->key_specs = NULL; } populateCommandLegacyRangeSpec(cp->rediscmd); cp->rediscmd->microseconds = 0; @@ -1424,6 +1433,21 @@ moduleCmdArgAt(const RedisModuleCommandInfoVersion *version, return (RedisModuleCommandArg *)((char *)(args) + offset); } +/* Recursively populate the args structure (setting num_args to the number of + * subargs) and return the number of args. */ +int populateArgsStructure(struct redisCommandArg *args) { + if (!args) + return 0; + int count = 0; + while (args->name) { + serverAssert(count < INT_MAX); + args->num_args = populateArgsStructure(args->subargs); + count++; + args++; + } + return count; +} + /* Helper for categoryFlagsFromString(). Attempts to find an acl flag representing the provided flag string * and adds that flag to acl_categories_flags if a match is found. * @@ -1796,7 +1820,7 @@ int RM_SetCommandInfo(RedisModuleCommand *command, const RedisModuleCommandInfo cmd->tips || cmd->args || !(cmd->key_specs_num == 0 || /* Allow key spec populated from legacy (first,last,step) to exist. */ - (cmd->key_specs_num == 1 && cmd->key_specs == cmd->key_specs_static && + (cmd->key_specs_num == 1 && cmd->key_specs[0].begin_search_type == KSPEC_BS_INDEX && cmd->key_specs[0].find_keys_type == KSPEC_FK_RANGE))) { errno = EEXIST; @@ -1847,13 +1871,8 @@ int RM_SetCommandInfo(RedisModuleCommand *command, const RedisModuleCommandInfo while (moduleCmdKeySpecAt(version, info->key_specs, count)->begin_search_type) count++; serverAssert(count < INT_MAX); - if (count <= STATIC_KEY_SPECS_NUM) { - cmd->key_specs_max = STATIC_KEY_SPECS_NUM; - cmd->key_specs = cmd->key_specs_static; - } else { - cmd->key_specs_max = count; - cmd->key_specs = zmalloc(sizeof(keySpec) * count); - } + zfree(cmd->key_specs); + cmd->key_specs = zmalloc(sizeof(keySpec) * count); /* Copy the contents of the RedisModuleCommandKeySpec array. */ cmd->key_specs_num = count; @@ -2257,7 +2276,7 @@ uint64_t RM_MonotonicMicroseconds(void) { } /* Return the current UNIX time in microseconds */ -ustime_t RM_Microseconds() { +ustime_t RM_Microseconds(void) { return ustime(); } @@ -2267,7 +2286,7 @@ ustime_t RM_Microseconds() { * key space notification, causing a module to execute a RedisModule_Call, * causing another notification, etc. * It makes sense that all this callbacks would use the same clock. */ -ustime_t RM_CachedMicroseconds() { +ustime_t RM_CachedMicroseconds(void) { return server.ustime; } @@ -2978,6 +2997,32 @@ int RM_ReplyWithError(RedisModuleCtx *ctx, const char *err) { return REDISMODULE_OK; } +/* Reply with the error create from a printf format and arguments. + * + * If the error code is already passed in the string 'fmt', the error + * code provided is used, otherwise the string "-ERR " for the generic + * error code is automatically added. + * + * The usage is, for example: + * + * RedisModule_ReplyWithErrorFormat(ctx, "An error: %s", "foo"); + * + * RedisModule_ReplyWithErrorFormat(ctx, "-WRONGTYPE Wrong Type: %s", "foo"); + * + * The function always returns REDISMODULE_OK. + */ +int RM_ReplyWithErrorFormat(RedisModuleCtx *ctx, const char *fmt, ...) { + client *c = moduleGetReplyClient(ctx); + if (c == NULL) return REDISMODULE_OK; + + va_list ap; + va_start(ap, fmt); + addReplyErrorFormatInternal(c, 0, fmt, ap); + va_end(ap); + + return REDISMODULE_OK; +} + /* Reply with a simple string (`+... \r\n` in RESP protocol). This replies * are suitable only when sending a small non-binary string with small * overhead, like "OK" or similar replies. @@ -3868,7 +3913,7 @@ int RM_GetContextFlags(RedisModuleCtx *ctx) { * garbage collection tasks, or that do writes and replicate such writes * periodically in timer callbacks or other periodic callbacks. */ -int RM_AvoidReplicaTraffic() { +int RM_AvoidReplicaTraffic(void) { return !!(isPausedActionsWithUpdate(PAUSE_ACTION_REPLICA)); } @@ -3978,7 +4023,7 @@ RedisModuleKey *RM_OpenKey(RedisModuleCtx *ctx, robj *keyname, int mode) { * // REDISMODULE_OPEN_KEY_NOTOUCH is not supported * } */ -int RM_GetOpenKeyModesAll() { +int RM_GetOpenKeyModesAll(void) { return _REDISMODULE_OPEN_KEY_ALL; } @@ -4650,6 +4695,7 @@ int RM_ZsetAdd(RedisModuleKey *key, double score, RedisModuleString *ele, int *f if (flagsptr) in_flags = moduleZsetAddFlagsToCoreFlags(*flagsptr); if (zsetAdd(key->value,score,ele->ptr,in_flags,&out_flags,NULL) == 0) { if (flagsptr) *flagsptr = 0; + moduleDelKeyIfEmpty(key); return REDISMODULE_ERR; } if (flagsptr) *flagsptr = moduleZsetAddFlagsFromCoreFlags(out_flags); @@ -4678,6 +4724,7 @@ int RM_ZsetIncrby(RedisModuleKey *key, double score, RedisModuleString *ele, int in_flags |= ZADD_IN_INCR; if (zsetAdd(key->value,score,ele->ptr,in_flags,&out_flags,newscore) == 0) { if (flagsptr) *flagsptr = 0; + moduleDelKeyIfEmpty(key); return REDISMODULE_ERR; } if (flagsptr) *flagsptr = moduleZsetAddFlagsFromCoreFlags(out_flags); @@ -5360,6 +5407,7 @@ int RM_StreamAdd(RedisModuleKey *key, int flags, RedisModuleStreamID *id, RedisM /* Either the ID not greater than all existing IDs in the stream, or * the elements are too large to be stored. either way, errno is already * set by streamAppendItem. */ + if (created) moduleDelKeyIfEmpty(key); return REDISMODULE_ERR; } /* Postponed signalKeyAsReady(). Done implicitly by moduleCreateEmptyKey() @@ -6905,7 +6953,7 @@ void moduleRDBLoadError(RedisModuleIO *io) { /* Returns 0 if there's at least one registered data type that did not declare * REDISMODULE_OPTIONS_HANDLE_IO_ERRORS, in which case diskless loading should * be avoided since it could cause data loss. */ -int moduleAllDatatypesHandleErrors() { +int moduleAllDatatypesHandleErrors(void) { dictIterator *di = dictGetIterator(modules); dictEntry *de; @@ -6925,7 +6973,7 @@ int moduleAllDatatypesHandleErrors() { /* Returns 0 if module did not declare REDISMODULE_OPTIONS_HANDLE_REPL_ASYNC_LOAD, in which case * diskless async loading should be avoided because module doesn't know there can be traffic during * database full resynchronization. */ -int moduleAllModulesHandleReplAsyncLoad() { +int moduleAllModulesHandleReplAsyncLoad(void) { dictIterator *di = dictGetIterator(modules); dictEntry *de; @@ -8373,7 +8421,7 @@ void RM_FreeThreadSafeContext(RedisModuleCtx *ctx) { zfree(ctx); } -void moduleGILAfterLock() { +void moduleGILAfterLock(void) { /* We should never get here if we already inside a module * code block which already opened a context. */ serverAssert(server.execution_nesting == 0); @@ -8409,7 +8457,7 @@ int RM_ThreadSafeContextTryLock(RedisModuleCtx *ctx) { return REDISMODULE_OK; } -void moduleGILBeforeUnlock() { +void moduleGILBeforeUnlock(void) { /* We should never get here if we already inside a module * code block which already opened a context, except * the bump-up from moduleGILAcquired. */ @@ -8509,7 +8557,7 @@ void moduleReleaseGIL(void) { * that the notification code will be executed in the middle on Redis logic * (commands logic, eviction, expire). Changing the key space while the logic * runs is dangerous and discouraged. In order to react to key space events with - * write actions, please refer to `RM_AddPostExecutionUnitJob`. + * write actions, please refer to `RM_AddPostNotificationJob`. * * See https://redis.io/topics/notifications for more information. */ @@ -8524,7 +8572,7 @@ int RM_SubscribeToKeyspaceEvents(RedisModuleCtx *ctx, int types, RedisModuleNoti return REDISMODULE_OK; } -void firePostExecutionUnitJobs() { +void firePostExecutionUnitJobs(void) { /* Avoid propagation of commands. * In that way, postExecutionUnitOperations will prevent * recursive calls to firePostExecutionUnitJobs. @@ -8577,7 +8625,7 @@ int RM_AddPostNotificationJob(RedisModuleCtx *ctx, RedisModulePostNotificationJo /* Get the configured bitmap of notify-keyspace-events (Could be used * for additional filtering in RedisModuleNotificationFunc) */ -int RM_GetNotifyKeyspaceEvents() { +int RM_GetNotifyKeyspaceEvents(void) { return server.notify_keyspace_events; } @@ -9292,7 +9340,7 @@ int RM_EventLoopAddOneShot(RedisModuleEventLoopOneShotFunc func, void *user_data /* This function will check the moduleEventLoopOneShots queue in order to * call the callback for the registered oneshot events. */ -static void eventLoopHandleOneShotEvents() { +static void eventLoopHandleOneShotEvents(void) { pthread_mutex_lock(&moduleEventLoopMutex); if (moduleEventLoopOneShots) { while (listLength(moduleEventLoopOneShots)) { @@ -10393,7 +10441,7 @@ int RM_ExportSharedAPI(RedisModuleCtx *ctx, const char *apiname, void *func) { * * Here is an example: * - * int ... myCommandImplementation() { + * int ... myCommandImplementation(void) { * if (getExternalAPIs() == 0) { * reply with an error here if we cannot have the APIs * } @@ -10693,6 +10741,9 @@ size_t RM_MallocSize(void* ptr) { /* Similar to RM_MallocSize, the difference is that RM_MallocUsableSize * returns the usable size of memory by the module. */ size_t RM_MallocUsableSize(void *ptr) { + /* It is safe to use 'zmalloc_usable_size()' to manipulate additional + * memory space, as we guarantee that the compiler can recognize this + * after 'RM_Alloc', 'RM_TryAlloc', 'RM_Realloc', or 'RM_Calloc'. */ return zmalloc_usable_size(ptr); } @@ -10723,7 +10774,7 @@ size_t RM_MallocSizeDict(RedisModuleDict* dict) { * * Exactly 1 - Memory limit reached. * * Greater 1 - More memory used than the configured limit. */ -float RM_GetUsedMemoryRatio(){ +float RM_GetUsedMemoryRatio(void){ float level; getMaxmemoryState(NULL, NULL, NULL, &level); return level; @@ -10762,7 +10813,7 @@ static void moduleScanCallback(void *privdata, const dictEntry *de) { } /* Create a new cursor to be used with RedisModule_Scan */ -RedisModuleScanCursor *RM_ScanCursorCreate() { +RedisModuleScanCursor *RM_ScanCursorCreate(void) { RedisModuleScanCursor* cursor = zmalloc(sizeof(*cursor)); cursor->cursor = 0; cursor->done = 0; @@ -11645,7 +11696,7 @@ void moduleNotifyKeyUnlink(robj *key, robj *val, int dbid, int flags) { } else if (flags & DB_FLAG_KEY_OVERWRITE) { subevent = REDISMODULE_SUBEVENT_KEY_OVERWRITTEN; } - KeyInfo info = {dbid, key, val, REDISMODULE_WRITE}; + KeyInfo info = {dbid, key, val, REDISMODULE_READ}; moduleFireServerEvent(REDISMODULE_EVENT_KEY, subevent, &info); if (val->type == OBJ_MODULE) { @@ -11925,8 +11976,7 @@ int moduleFreeCommand(struct RedisModule *module, struct redisCommand *cmd) { if (cmd->key_specs[j].begin_search_type == KSPEC_BS_KEYWORD) zfree((char *)cmd->key_specs[j].bs.keyword.keyword); } - if (cmd->key_specs != cmd->key_specs_static) - zfree(cmd->key_specs); + zfree(cmd->key_specs); for (int j = 0; cmd->tips && cmd->tips[j]; j++) zfree((char *)cmd->tips[j]); zfree(cmd->tips); @@ -12741,6 +12791,137 @@ int RM_LoadConfigs(RedisModuleCtx *ctx) { return REDISMODULE_OK; } +/* -------------------------------------------------------------------------- + * ## RDB load/save API + * -------------------------------------------------------------------------- */ + +#define REDISMODULE_RDB_STREAM_FILE 1 + +typedef struct RedisModuleRdbStream { + int type; + + union { + char *filename; + } data; +} RedisModuleRdbStream; + +/* Create a stream object to save/load RDB to/from a file. + * + * This function returns a pointer to RedisModuleRdbStream which is owned + * by the caller. It requires a call to RM_RdbStreamFree() to free + * the object. */ +RedisModuleRdbStream *RM_RdbStreamCreateFromFile(const char *filename) { + RedisModuleRdbStream *stream = zmalloc(sizeof(*stream)); + stream->type = REDISMODULE_RDB_STREAM_FILE; + stream->data.filename = zstrdup(filename); + return stream; +} + +/* Release an RDB stream object. */ +void RM_RdbStreamFree(RedisModuleRdbStream *stream) { + switch (stream->type) { + case REDISMODULE_RDB_STREAM_FILE: + zfree(stream->data.filename); + break; + default: + serverAssert(0); + break; + } + zfree(stream); +} + +/* Load RDB file from the `stream`. Dataset will be cleared first and then RDB + * file will be loaded. + * + * `flags` must be zero. This parameter is for future use. + * + * On success REDISMODULE_OK is returned, otherwise REDISMODULE_ERR is returned + * and errno is set accordingly. + * + * Example: + * + * RedisModuleRdbStream *s = RedisModule_RdbStreamCreateFromFile("exp.rdb"); + * RedisModule_RdbLoad(ctx, s, 0); + * RedisModule_RdbStreamFree(s); + */ +int RM_RdbLoad(RedisModuleCtx *ctx, RedisModuleRdbStream *stream, int flags) { + UNUSED(ctx); + + if (!stream || flags != 0) { + errno = EINVAL; + return REDISMODULE_ERR; + } + + /* Not allowed on replicas. */ + if (server.masterhost != NULL) { + errno = ENOTSUP; + return REDISMODULE_ERR; + } + + /* Drop replicas if exist. */ + disconnectSlaves(); + freeReplicationBacklog(); + + if (server.aof_state != AOF_OFF) stopAppendOnly(); + + /* Kill existing RDB fork as it is saving outdated data. Also killing it + * will prevent COW memory issue. */ + if (server.child_type == CHILD_TYPE_RDB) killRDBChild(); + + emptyData(-1,EMPTYDB_NO_FLAGS,NULL); + + /* rdbLoad() can go back to the networking and process network events. If + * RM_RdbLoad() is called inside a command callback, we don't want to + * process the current client. Otherwise, we may free the client or try to + * process next message while we are already in the command callback. */ + if (server.current_client) protectClient(server.current_client); + + serverAssert(stream->type == REDISMODULE_RDB_STREAM_FILE); + int ret = rdbLoad(stream->data.filename,NULL,RDBFLAGS_NONE); + + if (server.current_client) unprotectClient(server.current_client); + if (server.aof_state != AOF_OFF) startAppendOnly(); + + if (ret != RDB_OK) { + errno = (ret == RDB_NOT_EXIST) ? ENOENT : EIO; + return REDISMODULE_ERR; + } + + errno = 0; + return REDISMODULE_OK; +} + +/* Save dataset to the RDB stream. + * + * `flags` must be zero. This parameter is for future use. + * + * On success REDISMODULE_OK is returned, otherwise REDISMODULE_ERR is returned + * and errno is set accordingly. + * + * Example: + * + * RedisModuleRdbStream *s = RedisModule_RdbStreamCreateFromFile("exp.rdb"); + * RedisModule_RdbSave(ctx, s, 0); + * RedisModule_RdbStreamFree(s); + */ +int RM_RdbSave(RedisModuleCtx *ctx, RedisModuleRdbStream *stream, int flags) { + UNUSED(ctx); + + if (!stream || flags != 0) { + errno = EINVAL; + return REDISMODULE_ERR; + } + + serverAssert(stream->type == REDISMODULE_RDB_STREAM_FILE); + + if (rdbSaveToFile(stream->data.filename) != C_OK) { + return REDISMODULE_ERR; + } + + errno = 0; + return REDISMODULE_OK; +} + /* Redis MODULE command. * * MODULE LIST @@ -12890,7 +13071,7 @@ int RM_GetLFU(RedisModuleKey *key, long long *lfu_freq) { * // REDISMODULE_OPTIONS_ALLOW_NESTED_KEYSPACE_NOTIFICATIONS is not supported * } */ -int RM_GetModuleOptionsAll() { +int RM_GetModuleOptionsAll(void) { return _REDISMODULE_OPTIONS_FLAGS_NEXT - 1; } @@ -12907,7 +13088,7 @@ int RM_GetModuleOptionsAll() { * // REDISMODULE_CTX_FLAGS_MULTI is not supported * } */ -int RM_GetContextFlagsAll() { +int RM_GetContextFlagsAll(void) { return _REDISMODULE_CTX_FLAGS_NEXT - 1; } @@ -12924,7 +13105,7 @@ int RM_GetContextFlagsAll() { * // REDISMODULE_NOTIFY_LOADED is not supported * } */ -int RM_GetKeyspaceNotificationFlagsAll() { +int RM_GetKeyspaceNotificationFlagsAll(void) { return _REDISMODULE_NOTIFY_NEXT - 1; } @@ -12932,7 +13113,7 @@ int RM_GetKeyspaceNotificationFlagsAll() { * Return the redis version in format of 0x00MMmmpp. * Example for 6.0.7 the return value will be 0x00060007. */ -int RM_GetServerVersion() { +int RM_GetServerVersion(void) { return REDIS_VERSION_NUM; } @@ -12941,7 +13122,7 @@ int RM_GetServerVersion() { * You can use that when calling RM_CreateDataType to know which fields of * RedisModuleTypeMethods are gonna be supported and which will be ignored. */ -int RM_GetTypeMethodVersion() { +int RM_GetTypeMethodVersion(void) { return REDISMODULE_TYPE_METHOD_VERSION; } @@ -13287,6 +13468,7 @@ void moduleRegisterCoreAPI(void) { REGISTER_API(WrongArity); REGISTER_API(ReplyWithLongLong); REGISTER_API(ReplyWithError); + REGISTER_API(ReplyWithErrorFormat); REGISTER_API(ReplyWithSimpleString); REGISTER_API(ReplyWithArray); REGISTER_API(ReplyWithMap); @@ -13617,4 +13799,8 @@ void moduleRegisterCoreAPI(void) { REGISTER_API(RegisterEnumConfig); REGISTER_API(LoadConfigs); REGISTER_API(RegisterAuthCallback); + REGISTER_API(RdbStreamCreateFromFile); + REGISTER_API(RdbStreamFree); + REGISTER_API(RdbLoad); + REGISTER_API(RdbSave); } |