summaryrefslogtreecommitdiff
path: root/src/module.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/module.c')
-rw-r--r--src/module.c262
1 files changed, 224 insertions, 38 deletions
diff --git a/src/module.c b/src/module.c
index a8df08bbb..7277e3e72 100644
--- a/src/module.c
+++ b/src/module.c
@@ -58,6 +58,7 @@
#include "monotonic.h"
#include "script.h"
#include "call_reply.h"
+#include "hdr_histogram.h"
#include <dlfcn.h>
#include <sys/stat.h>
#include <sys/wait.h>
@@ -516,13 +517,20 @@ void moduleCreateContext(RedisModuleCtx *out_ctx, RedisModule *module, int ctx_f
* You should avoid using malloc().
* This function panics if unable to allocate enough memory. */
void *RM_Alloc(size_t bytes) {
- return zmalloc(bytes);
+ /* Use 'zmalloc_usable()' instead of 'zmalloc()' to allow the compiler
+ * to recognize the additional memory size, which means that modules can
+ * use the memory reported by 'RM_MallocUsableSize()' safely. In theory this
+ * isn't really needed since this API can't be inlined (not even for embedded
+ * modules like TLS (we use function pointers for module APIs), and the API doesn't
+ * have the malloc_size attribute, but it's hard to predict how smart future compilers
+ * will be, so better safe than sorry. */
+ return zmalloc_usable(bytes,NULL);
}
/* Similar to RM_Alloc, but returns NULL in case of allocation failure, instead
* of panicking. */
void *RM_TryAlloc(size_t bytes) {
- return ztrymalloc(bytes);
+ return ztrymalloc_usable(bytes,NULL);
}
/* Use like calloc(). Memory allocated with this function is reported in
@@ -530,12 +538,12 @@ void *RM_TryAlloc(size_t bytes) {
* and in general is taken into account as memory allocated by Redis.
* You should avoid using calloc() directly. */
void *RM_Calloc(size_t nmemb, size_t size) {
- return zcalloc(nmemb*size);
+ return zcalloc_usable(nmemb*size,NULL);
}
/* Use like realloc() for memory obtained with RedisModule_Alloc(). */
void* RM_Realloc(void *ptr, size_t bytes) {
- return zrealloc(ptr,bytes);
+ return zrealloc_usable(ptr,bytes,NULL);
}
/* Use like free() for memory obtained by RedisModule_Alloc() and
@@ -647,6 +655,7 @@ void moduleReleaseTempClient(client *c) {
clearClientConnectionState(c);
listEmpty(c->reply);
c->reply_bytes = 0;
+ c->duration = 0;
resetClient(c);
c->bufpos = 0;
c->flags = CLIENT_MODULE;
@@ -780,7 +789,7 @@ int RM_GetApi(const char *funcname, void **targetPtrPtr) {
return REDISMODULE_OK;
}
-void modulePostExecutionUnitOperations() {
+void modulePostExecutionUnitOperations(void) {
if (server.execution_nesting)
return;
@@ -1294,10 +1303,9 @@ RedisModuleCommand *moduleCreateCommandProxy(struct RedisModule *module, sds dec
cp->rediscmd->proc = RedisModuleCommandDispatcher;
cp->rediscmd->flags = flags | CMD_MODULE;
cp->rediscmd->module_cmd = cp;
- cp->rediscmd->key_specs_max = STATIC_KEY_SPECS_NUM;
- cp->rediscmd->key_specs = cp->rediscmd->key_specs_static;
if (firstkey != 0) {
cp->rediscmd->key_specs_num = 1;
+ cp->rediscmd->key_specs = zcalloc(sizeof(keySpec));
cp->rediscmd->key_specs[0].flags = CMD_KEY_FULL_ACCESS;
if (flags & CMD_MODULE_GETKEYS)
cp->rediscmd->key_specs[0].flags |= CMD_KEY_VARIABLE_FLAGS;
@@ -1309,6 +1317,7 @@ RedisModuleCommand *moduleCreateCommandProxy(struct RedisModule *module, sds dec
cp->rediscmd->key_specs[0].fk.range.limit = 0;
} else {
cp->rediscmd->key_specs_num = 0;
+ cp->rediscmd->key_specs = NULL;
}
populateCommandLegacyRangeSpec(cp->rediscmd);
cp->rediscmd->microseconds = 0;
@@ -1424,6 +1433,21 @@ moduleCmdArgAt(const RedisModuleCommandInfoVersion *version,
return (RedisModuleCommandArg *)((char *)(args) + offset);
}
+/* Recursively populate the args structure (setting num_args to the number of
+ * subargs) and return the number of args. */
+int populateArgsStructure(struct redisCommandArg *args) {
+ if (!args)
+ return 0;
+ int count = 0;
+ while (args->name) {
+ serverAssert(count < INT_MAX);
+ args->num_args = populateArgsStructure(args->subargs);
+ count++;
+ args++;
+ }
+ return count;
+}
+
/* Helper for categoryFlagsFromString(). Attempts to find an acl flag representing the provided flag string
* and adds that flag to acl_categories_flags if a match is found.
*
@@ -1796,7 +1820,7 @@ int RM_SetCommandInfo(RedisModuleCommand *command, const RedisModuleCommandInfo
cmd->tips || cmd->args ||
!(cmd->key_specs_num == 0 ||
/* Allow key spec populated from legacy (first,last,step) to exist. */
- (cmd->key_specs_num == 1 && cmd->key_specs == cmd->key_specs_static &&
+ (cmd->key_specs_num == 1 &&
cmd->key_specs[0].begin_search_type == KSPEC_BS_INDEX &&
cmd->key_specs[0].find_keys_type == KSPEC_FK_RANGE))) {
errno = EEXIST;
@@ -1847,13 +1871,8 @@ int RM_SetCommandInfo(RedisModuleCommand *command, const RedisModuleCommandInfo
while (moduleCmdKeySpecAt(version, info->key_specs, count)->begin_search_type)
count++;
serverAssert(count < INT_MAX);
- if (count <= STATIC_KEY_SPECS_NUM) {
- cmd->key_specs_max = STATIC_KEY_SPECS_NUM;
- cmd->key_specs = cmd->key_specs_static;
- } else {
- cmd->key_specs_max = count;
- cmd->key_specs = zmalloc(sizeof(keySpec) * count);
- }
+ zfree(cmd->key_specs);
+ cmd->key_specs = zmalloc(sizeof(keySpec) * count);
/* Copy the contents of the RedisModuleCommandKeySpec array. */
cmd->key_specs_num = count;
@@ -2257,7 +2276,7 @@ uint64_t RM_MonotonicMicroseconds(void) {
}
/* Return the current UNIX time in microseconds */
-ustime_t RM_Microseconds() {
+ustime_t RM_Microseconds(void) {
return ustime();
}
@@ -2267,7 +2286,7 @@ ustime_t RM_Microseconds() {
* key space notification, causing a module to execute a RedisModule_Call,
* causing another notification, etc.
* It makes sense that all this callbacks would use the same clock. */
-ustime_t RM_CachedMicroseconds() {
+ustime_t RM_CachedMicroseconds(void) {
return server.ustime;
}
@@ -2978,6 +2997,32 @@ int RM_ReplyWithError(RedisModuleCtx *ctx, const char *err) {
return REDISMODULE_OK;
}
+/* Reply with the error create from a printf format and arguments.
+ *
+ * If the error code is already passed in the string 'fmt', the error
+ * code provided is used, otherwise the string "-ERR " for the generic
+ * error code is automatically added.
+ *
+ * The usage is, for example:
+ *
+ * RedisModule_ReplyWithErrorFormat(ctx, "An error: %s", "foo");
+ *
+ * RedisModule_ReplyWithErrorFormat(ctx, "-WRONGTYPE Wrong Type: %s", "foo");
+ *
+ * The function always returns REDISMODULE_OK.
+ */
+int RM_ReplyWithErrorFormat(RedisModuleCtx *ctx, const char *fmt, ...) {
+ client *c = moduleGetReplyClient(ctx);
+ if (c == NULL) return REDISMODULE_OK;
+
+ va_list ap;
+ va_start(ap, fmt);
+ addReplyErrorFormatInternal(c, 0, fmt, ap);
+ va_end(ap);
+
+ return REDISMODULE_OK;
+}
+
/* Reply with a simple string (`+... \r\n` in RESP protocol). This replies
* are suitable only when sending a small non-binary string with small
* overhead, like "OK" or similar replies.
@@ -3868,7 +3913,7 @@ int RM_GetContextFlags(RedisModuleCtx *ctx) {
* garbage collection tasks, or that do writes and replicate such writes
* periodically in timer callbacks or other periodic callbacks.
*/
-int RM_AvoidReplicaTraffic() {
+int RM_AvoidReplicaTraffic(void) {
return !!(isPausedActionsWithUpdate(PAUSE_ACTION_REPLICA));
}
@@ -3978,7 +4023,7 @@ RedisModuleKey *RM_OpenKey(RedisModuleCtx *ctx, robj *keyname, int mode) {
* // REDISMODULE_OPEN_KEY_NOTOUCH is not supported
* }
*/
-int RM_GetOpenKeyModesAll() {
+int RM_GetOpenKeyModesAll(void) {
return _REDISMODULE_OPEN_KEY_ALL;
}
@@ -4650,6 +4695,7 @@ int RM_ZsetAdd(RedisModuleKey *key, double score, RedisModuleString *ele, int *f
if (flagsptr) in_flags = moduleZsetAddFlagsToCoreFlags(*flagsptr);
if (zsetAdd(key->value,score,ele->ptr,in_flags,&out_flags,NULL) == 0) {
if (flagsptr) *flagsptr = 0;
+ moduleDelKeyIfEmpty(key);
return REDISMODULE_ERR;
}
if (flagsptr) *flagsptr = moduleZsetAddFlagsFromCoreFlags(out_flags);
@@ -4678,6 +4724,7 @@ int RM_ZsetIncrby(RedisModuleKey *key, double score, RedisModuleString *ele, int
in_flags |= ZADD_IN_INCR;
if (zsetAdd(key->value,score,ele->ptr,in_flags,&out_flags,newscore) == 0) {
if (flagsptr) *flagsptr = 0;
+ moduleDelKeyIfEmpty(key);
return REDISMODULE_ERR;
}
if (flagsptr) *flagsptr = moduleZsetAddFlagsFromCoreFlags(out_flags);
@@ -5360,6 +5407,7 @@ int RM_StreamAdd(RedisModuleKey *key, int flags, RedisModuleStreamID *id, RedisM
/* Either the ID not greater than all existing IDs in the stream, or
* the elements are too large to be stored. either way, errno is already
* set by streamAppendItem. */
+ if (created) moduleDelKeyIfEmpty(key);
return REDISMODULE_ERR;
}
/* Postponed signalKeyAsReady(). Done implicitly by moduleCreateEmptyKey()
@@ -6905,7 +6953,7 @@ void moduleRDBLoadError(RedisModuleIO *io) {
/* Returns 0 if there's at least one registered data type that did not declare
* REDISMODULE_OPTIONS_HANDLE_IO_ERRORS, in which case diskless loading should
* be avoided since it could cause data loss. */
-int moduleAllDatatypesHandleErrors() {
+int moduleAllDatatypesHandleErrors(void) {
dictIterator *di = dictGetIterator(modules);
dictEntry *de;
@@ -6925,7 +6973,7 @@ int moduleAllDatatypesHandleErrors() {
/* Returns 0 if module did not declare REDISMODULE_OPTIONS_HANDLE_REPL_ASYNC_LOAD, in which case
* diskless async loading should be avoided because module doesn't know there can be traffic during
* database full resynchronization. */
-int moduleAllModulesHandleReplAsyncLoad() {
+int moduleAllModulesHandleReplAsyncLoad(void) {
dictIterator *di = dictGetIterator(modules);
dictEntry *de;
@@ -8373,7 +8421,7 @@ void RM_FreeThreadSafeContext(RedisModuleCtx *ctx) {
zfree(ctx);
}
-void moduleGILAfterLock() {
+void moduleGILAfterLock(void) {
/* We should never get here if we already inside a module
* code block which already opened a context. */
serverAssert(server.execution_nesting == 0);
@@ -8409,7 +8457,7 @@ int RM_ThreadSafeContextTryLock(RedisModuleCtx *ctx) {
return REDISMODULE_OK;
}
-void moduleGILBeforeUnlock() {
+void moduleGILBeforeUnlock(void) {
/* We should never get here if we already inside a module
* code block which already opened a context, except
* the bump-up from moduleGILAcquired. */
@@ -8509,7 +8557,7 @@ void moduleReleaseGIL(void) {
* that the notification code will be executed in the middle on Redis logic
* (commands logic, eviction, expire). Changing the key space while the logic
* runs is dangerous and discouraged. In order to react to key space events with
- * write actions, please refer to `RM_AddPostExecutionUnitJob`.
+ * write actions, please refer to `RM_AddPostNotificationJob`.
*
* See https://redis.io/topics/notifications for more information.
*/
@@ -8524,7 +8572,7 @@ int RM_SubscribeToKeyspaceEvents(RedisModuleCtx *ctx, int types, RedisModuleNoti
return REDISMODULE_OK;
}
-void firePostExecutionUnitJobs() {
+void firePostExecutionUnitJobs(void) {
/* Avoid propagation of commands.
* In that way, postExecutionUnitOperations will prevent
* recursive calls to firePostExecutionUnitJobs.
@@ -8577,7 +8625,7 @@ int RM_AddPostNotificationJob(RedisModuleCtx *ctx, RedisModulePostNotificationJo
/* Get the configured bitmap of notify-keyspace-events (Could be used
* for additional filtering in RedisModuleNotificationFunc) */
-int RM_GetNotifyKeyspaceEvents() {
+int RM_GetNotifyKeyspaceEvents(void) {
return server.notify_keyspace_events;
}
@@ -9292,7 +9340,7 @@ int RM_EventLoopAddOneShot(RedisModuleEventLoopOneShotFunc func, void *user_data
/* This function will check the moduleEventLoopOneShots queue in order to
* call the callback for the registered oneshot events. */
-static void eventLoopHandleOneShotEvents() {
+static void eventLoopHandleOneShotEvents(void) {
pthread_mutex_lock(&moduleEventLoopMutex);
if (moduleEventLoopOneShots) {
while (listLength(moduleEventLoopOneShots)) {
@@ -10393,7 +10441,7 @@ int RM_ExportSharedAPI(RedisModuleCtx *ctx, const char *apiname, void *func) {
*
* Here is an example:
*
- * int ... myCommandImplementation() {
+ * int ... myCommandImplementation(void) {
* if (getExternalAPIs() == 0) {
* reply with an error here if we cannot have the APIs
* }
@@ -10693,6 +10741,9 @@ size_t RM_MallocSize(void* ptr) {
/* Similar to RM_MallocSize, the difference is that RM_MallocUsableSize
* returns the usable size of memory by the module. */
size_t RM_MallocUsableSize(void *ptr) {
+ /* It is safe to use 'zmalloc_usable_size()' to manipulate additional
+ * memory space, as we guarantee that the compiler can recognize this
+ * after 'RM_Alloc', 'RM_TryAlloc', 'RM_Realloc', or 'RM_Calloc'. */
return zmalloc_usable_size(ptr);
}
@@ -10723,7 +10774,7 @@ size_t RM_MallocSizeDict(RedisModuleDict* dict) {
* * Exactly 1 - Memory limit reached.
* * Greater 1 - More memory used than the configured limit.
*/
-float RM_GetUsedMemoryRatio(){
+float RM_GetUsedMemoryRatio(void){
float level;
getMaxmemoryState(NULL, NULL, NULL, &level);
return level;
@@ -10762,7 +10813,7 @@ static void moduleScanCallback(void *privdata, const dictEntry *de) {
}
/* Create a new cursor to be used with RedisModule_Scan */
-RedisModuleScanCursor *RM_ScanCursorCreate() {
+RedisModuleScanCursor *RM_ScanCursorCreate(void) {
RedisModuleScanCursor* cursor = zmalloc(sizeof(*cursor));
cursor->cursor = 0;
cursor->done = 0;
@@ -11645,7 +11696,7 @@ void moduleNotifyKeyUnlink(robj *key, robj *val, int dbid, int flags) {
} else if (flags & DB_FLAG_KEY_OVERWRITE) {
subevent = REDISMODULE_SUBEVENT_KEY_OVERWRITTEN;
}
- KeyInfo info = {dbid, key, val, REDISMODULE_WRITE};
+ KeyInfo info = {dbid, key, val, REDISMODULE_READ};
moduleFireServerEvent(REDISMODULE_EVENT_KEY, subevent, &info);
if (val->type == OBJ_MODULE) {
@@ -11925,8 +11976,7 @@ int moduleFreeCommand(struct RedisModule *module, struct redisCommand *cmd) {
if (cmd->key_specs[j].begin_search_type == KSPEC_BS_KEYWORD)
zfree((char *)cmd->key_specs[j].bs.keyword.keyword);
}
- if (cmd->key_specs != cmd->key_specs_static)
- zfree(cmd->key_specs);
+ zfree(cmd->key_specs);
for (int j = 0; cmd->tips && cmd->tips[j]; j++)
zfree((char *)cmd->tips[j]);
zfree(cmd->tips);
@@ -12741,6 +12791,137 @@ int RM_LoadConfigs(RedisModuleCtx *ctx) {
return REDISMODULE_OK;
}
+/* --------------------------------------------------------------------------
+ * ## RDB load/save API
+ * -------------------------------------------------------------------------- */
+
+#define REDISMODULE_RDB_STREAM_FILE 1
+
+typedef struct RedisModuleRdbStream {
+ int type;
+
+ union {
+ char *filename;
+ } data;
+} RedisModuleRdbStream;
+
+/* Create a stream object to save/load RDB to/from a file.
+ *
+ * This function returns a pointer to RedisModuleRdbStream which is owned
+ * by the caller. It requires a call to RM_RdbStreamFree() to free
+ * the object. */
+RedisModuleRdbStream *RM_RdbStreamCreateFromFile(const char *filename) {
+ RedisModuleRdbStream *stream = zmalloc(sizeof(*stream));
+ stream->type = REDISMODULE_RDB_STREAM_FILE;
+ stream->data.filename = zstrdup(filename);
+ return stream;
+}
+
+/* Release an RDB stream object. */
+void RM_RdbStreamFree(RedisModuleRdbStream *stream) {
+ switch (stream->type) {
+ case REDISMODULE_RDB_STREAM_FILE:
+ zfree(stream->data.filename);
+ break;
+ default:
+ serverAssert(0);
+ break;
+ }
+ zfree(stream);
+}
+
+/* Load RDB file from the `stream`. Dataset will be cleared first and then RDB
+ * file will be loaded.
+ *
+ * `flags` must be zero. This parameter is for future use.
+ *
+ * On success REDISMODULE_OK is returned, otherwise REDISMODULE_ERR is returned
+ * and errno is set accordingly.
+ *
+ * Example:
+ *
+ * RedisModuleRdbStream *s = RedisModule_RdbStreamCreateFromFile("exp.rdb");
+ * RedisModule_RdbLoad(ctx, s, 0);
+ * RedisModule_RdbStreamFree(s);
+ */
+int RM_RdbLoad(RedisModuleCtx *ctx, RedisModuleRdbStream *stream, int flags) {
+ UNUSED(ctx);
+
+ if (!stream || flags != 0) {
+ errno = EINVAL;
+ return REDISMODULE_ERR;
+ }
+
+ /* Not allowed on replicas. */
+ if (server.masterhost != NULL) {
+ errno = ENOTSUP;
+ return REDISMODULE_ERR;
+ }
+
+ /* Drop replicas if exist. */
+ disconnectSlaves();
+ freeReplicationBacklog();
+
+ if (server.aof_state != AOF_OFF) stopAppendOnly();
+
+ /* Kill existing RDB fork as it is saving outdated data. Also killing it
+ * will prevent COW memory issue. */
+ if (server.child_type == CHILD_TYPE_RDB) killRDBChild();
+
+ emptyData(-1,EMPTYDB_NO_FLAGS,NULL);
+
+ /* rdbLoad() can go back to the networking and process network events. If
+ * RM_RdbLoad() is called inside a command callback, we don't want to
+ * process the current client. Otherwise, we may free the client or try to
+ * process next message while we are already in the command callback. */
+ if (server.current_client) protectClient(server.current_client);
+
+ serverAssert(stream->type == REDISMODULE_RDB_STREAM_FILE);
+ int ret = rdbLoad(stream->data.filename,NULL,RDBFLAGS_NONE);
+
+ if (server.current_client) unprotectClient(server.current_client);
+ if (server.aof_state != AOF_OFF) startAppendOnly();
+
+ if (ret != RDB_OK) {
+ errno = (ret == RDB_NOT_EXIST) ? ENOENT : EIO;
+ return REDISMODULE_ERR;
+ }
+
+ errno = 0;
+ return REDISMODULE_OK;
+}
+
+/* Save dataset to the RDB stream.
+ *
+ * `flags` must be zero. This parameter is for future use.
+ *
+ * On success REDISMODULE_OK is returned, otherwise REDISMODULE_ERR is returned
+ * and errno is set accordingly.
+ *
+ * Example:
+ *
+ * RedisModuleRdbStream *s = RedisModule_RdbStreamCreateFromFile("exp.rdb");
+ * RedisModule_RdbSave(ctx, s, 0);
+ * RedisModule_RdbStreamFree(s);
+ */
+int RM_RdbSave(RedisModuleCtx *ctx, RedisModuleRdbStream *stream, int flags) {
+ UNUSED(ctx);
+
+ if (!stream || flags != 0) {
+ errno = EINVAL;
+ return REDISMODULE_ERR;
+ }
+
+ serverAssert(stream->type == REDISMODULE_RDB_STREAM_FILE);
+
+ if (rdbSaveToFile(stream->data.filename) != C_OK) {
+ return REDISMODULE_ERR;
+ }
+
+ errno = 0;
+ return REDISMODULE_OK;
+}
+
/* Redis MODULE command.
*
* MODULE LIST
@@ -12890,7 +13071,7 @@ int RM_GetLFU(RedisModuleKey *key, long long *lfu_freq) {
* // REDISMODULE_OPTIONS_ALLOW_NESTED_KEYSPACE_NOTIFICATIONS is not supported
* }
*/
-int RM_GetModuleOptionsAll() {
+int RM_GetModuleOptionsAll(void) {
return _REDISMODULE_OPTIONS_FLAGS_NEXT - 1;
}
@@ -12907,7 +13088,7 @@ int RM_GetModuleOptionsAll() {
* // REDISMODULE_CTX_FLAGS_MULTI is not supported
* }
*/
-int RM_GetContextFlagsAll() {
+int RM_GetContextFlagsAll(void) {
return _REDISMODULE_CTX_FLAGS_NEXT - 1;
}
@@ -12924,7 +13105,7 @@ int RM_GetContextFlagsAll() {
* // REDISMODULE_NOTIFY_LOADED is not supported
* }
*/
-int RM_GetKeyspaceNotificationFlagsAll() {
+int RM_GetKeyspaceNotificationFlagsAll(void) {
return _REDISMODULE_NOTIFY_NEXT - 1;
}
@@ -12932,7 +13113,7 @@ int RM_GetKeyspaceNotificationFlagsAll() {
* Return the redis version in format of 0x00MMmmpp.
* Example for 6.0.7 the return value will be 0x00060007.
*/
-int RM_GetServerVersion() {
+int RM_GetServerVersion(void) {
return REDIS_VERSION_NUM;
}
@@ -12941,7 +13122,7 @@ int RM_GetServerVersion() {
* You can use that when calling RM_CreateDataType to know which fields of
* RedisModuleTypeMethods are gonna be supported and which will be ignored.
*/
-int RM_GetTypeMethodVersion() {
+int RM_GetTypeMethodVersion(void) {
return REDISMODULE_TYPE_METHOD_VERSION;
}
@@ -13287,6 +13468,7 @@ void moduleRegisterCoreAPI(void) {
REGISTER_API(WrongArity);
REGISTER_API(ReplyWithLongLong);
REGISTER_API(ReplyWithError);
+ REGISTER_API(ReplyWithErrorFormat);
REGISTER_API(ReplyWithSimpleString);
REGISTER_API(ReplyWithArray);
REGISTER_API(ReplyWithMap);
@@ -13617,4 +13799,8 @@ void moduleRegisterCoreAPI(void) {
REGISTER_API(RegisterEnumConfig);
REGISTER_API(LoadConfigs);
REGISTER_API(RegisterAuthCallback);
+ REGISTER_API(RdbStreamCreateFromFile);
+ REGISTER_API(RdbStreamFree);
+ REGISTER_API(RdbLoad);
+ REGISTER_API(RdbSave);
}