summaryrefslogtreecommitdiff
path: root/src/module.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/module.c')
-rw-r--r--src/module.c1582
1 files changed, 1496 insertions, 86 deletions
diff --git a/src/module.c b/src/module.c
index ab614c529..3a161c05f 100644
--- a/src/module.c
+++ b/src/module.c
@@ -31,9 +31,7 @@
#include "cluster.h"
#include "rdb.h"
#include <dlfcn.h>
-
-#define REDISMODULE_CORE 1
-#include "redismodule.h"
+#include <sys/wait.h>
/* --------------------------------------------------------------------------
* Private data structures used by the modules system. Those are data
@@ -41,6 +39,17 @@
* pointers that have an API the module can call with them)
* -------------------------------------------------------------------------- */
+typedef struct RedisModuleInfoCtx {
+ struct RedisModule *module;
+ sds requested_section;
+ sds info; /* info string we collected so far */
+ int sections; /* number of sections we collected so far */
+ int in_section; /* indication if we're in an active section or not */
+ int in_dict_field; /* indication that we're curreintly appending to a dict */
+} RedisModuleInfoCtx;
+
+typedef void (*RedisModuleInfoFunc)(RedisModuleInfoCtx *ctx, int for_crash_report);
+
/* This structure represents a module inside the system. */
struct RedisModule {
void *handle; /* Module dlopen() handle. */
@@ -52,6 +61,10 @@ struct RedisModule {
list *using; /* List of modules we use some APIs of. */
list *filters; /* List of filters the module has registered. */
int in_call; /* RM_Call() nesting level */
+ int in_hook; /* Hooks callback nesting level for this module (0 or 1). */
+ int options; /* Module options and capabilities. */
+ int blocked_clients; /* Count of RedisModuleBlockedClient in this module. */
+ RedisModuleInfoFunc info_cb; /* Callback for module to add INFO fields. */
};
typedef struct RedisModule RedisModule;
@@ -127,16 +140,23 @@ struct RedisModuleCtx {
void **postponed_arrays; /* To set with RM_ReplySetArrayLength(). */
int postponed_arrays_count; /* Number of entries in postponed_arrays. */
void *blocked_privdata; /* Privdata set when unblocking a client. */
+ RedisModuleString *blocked_ready_key; /* Key ready when the reply callback
+ gets called for clients blocked
+ on keys. */
/* Used if there is the REDISMODULE_CTX_KEYS_POS_REQUEST flag set. */
int *keys_pos;
int keys_count;
struct RedisModulePoolAllocBlock *pa_head;
+ redisOpArray saved_oparray; /* When propagating commands in a callback
+ we reallocate the "also propagate" op
+ array. Here we save the old one to
+ restore it later. */
};
typedef struct RedisModuleCtx RedisModuleCtx;
-#define REDISMODULE_CTX_INIT {(void*)(unsigned long)&RM_GetApi, NULL, NULL, NULL, NULL, 0, 0, 0, NULL, 0, NULL, NULL, 0, NULL}
+#define REDISMODULE_CTX_INIT {(void*)(unsigned long)&RM_GetApi, NULL, NULL, NULL, NULL, 0, 0, 0, NULL, 0, NULL, NULL, NULL, 0, NULL, {0}}
#define REDISMODULE_CTX_MULTI_EMITTED (1<<0)
#define REDISMODULE_CTX_AUTO_MEMORY (1<<1)
#define REDISMODULE_CTX_KEYS_POS_REQUEST (1<<2)
@@ -144,6 +164,7 @@ typedef struct RedisModuleCtx RedisModuleCtx;
#define REDISMODULE_CTX_BLOCKED_TIMEOUT (1<<4)
#define REDISMODULE_CTX_THREAD_SAFE (1<<5)
#define REDISMODULE_CTX_BLOCKED_DISCONNECTED (1<<6)
+#define REDISMODULE_CTX_MODULE_COMMAND_CALL (1<<7)
/* This represents a Redis key opened with RM_OpenKey(). */
struct RedisModuleKey {
@@ -227,6 +248,8 @@ typedef struct RedisModuleBlockedClient {
client *reply_client; /* Fake client used to accumulate replies
in thread safe contexts. */
int dbid; /* Database number selected by the original client. */
+ int blocked_on_keys; /* If blocked via RM_BlockClientOnKeys(). */
+ int unblocked; /* Already on the moduleUnblocked list. */
} RedisModuleBlockedClient;
static pthread_mutex_t moduleUnblockedClientsMutex = PTHREAD_MUTEX_INITIALIZER;
@@ -292,6 +315,40 @@ typedef struct RedisModuleCommandFilter {
/* Registered filters */
static list *moduleCommandFilters;
+typedef void (*RedisModuleForkDoneHandler) (int exitcode, int bysignal, void *user_data);
+
+static struct RedisModuleForkInfo {
+ RedisModuleForkDoneHandler done_handler;
+ void* done_handler_user_data;
+} moduleForkInfo = {0};
+
+/* Flags for moduleCreateArgvFromUserFormat(). */
+#define REDISMODULE_ARGV_REPLICATE (1<<0)
+#define REDISMODULE_ARGV_NO_AOF (1<<1)
+#define REDISMODULE_ARGV_NO_REPLICAS (1<<2)
+
+/* Determine whether Redis should signalModifiedKey implicitly.
+ * In case 'ctx' has no 'module' member (and therefore no module->options),
+ * we assume default behavior, that is, Redis signals.
+ * (see RM_GetThreadSafeContext) */
+#define SHOULD_SIGNAL_MODIFIED_KEYS(ctx) \
+ ctx->module? !(ctx->module->options & REDISMODULE_OPTION_NO_IMPLICIT_SIGNAL_MODIFIED) : 1
+
+/* Server events hooks data structures and defines: this modules API
+ * allow modules to subscribe to certain events in Redis, such as
+ * the start and end of an RDB or AOF save, the change of role in replication,
+ * and similar other events. */
+
+typedef struct RedisModuleEventListener {
+ RedisModule *module;
+ RedisModuleEvent event;
+ RedisModuleEventCallback callback;
+} RedisModuleEventListener;
+
+list *RedisModule_EventListeners; /* Global list of all the active events. */
+unsigned long long ModulesInHooks = 0; /* Total number of modules in hooks
+ callbacks right now. */
+
/* --------------------------------------------------------------------------
* Prototypes
* -------------------------------------------------------------------------- */
@@ -497,8 +554,47 @@ int RM_GetApi(const char *funcname, void **targetPtrPtr) {
return REDISMODULE_OK;
}
+/* Helper function for when a command callback is called, in order to handle
+ * details needed to correctly replicate commands. */
+void moduleHandlePropagationAfterCommandCallback(RedisModuleCtx *ctx) {
+ client *c = ctx->client;
+
+ /* We don't need to do anything here if the context was never used
+ * in order to propagate commands. */
+ if (!(ctx->flags & REDISMODULE_CTX_MULTI_EMITTED)) return;
+
+ if (c->flags & CLIENT_LUA) return;
+
+ /* Handle the replication of the final EXEC, since whatever a command
+ * emits is always wrapped around MULTI/EXEC. */
+ robj *propargv[1];
+ propargv[0] = createStringObject("EXEC",4);
+ alsoPropagate(server.execCommand,c->db->id,propargv,1,
+ PROPAGATE_AOF|PROPAGATE_REPL);
+ decrRefCount(propargv[0]);
+
+ /* If this is not a module command context (but is instead a simple
+ * callback context), we have to handle directly the "also propagate"
+ * array and emit it. In a module command call this will be handled
+ * directly by call(). */
+ if (!(ctx->flags & REDISMODULE_CTX_MODULE_COMMAND_CALL) &&
+ server.also_propagate.numops)
+ {
+ for (int j = 0; j < server.also_propagate.numops; j++) {
+ redisOp *rop = &server.also_propagate.ops[j];
+ int target = rop->target;
+ if (target)
+ propagate(rop->cmd,rop->dbid,rop->argv,rop->argc,target);
+ }
+ redisOpArrayFree(&server.also_propagate);
+ /* Restore the previous oparray in case of nexted use of the API. */
+ server.also_propagate = ctx->saved_oparray;
+ }
+}
+
/* Free the context after the user function was called. */
void moduleFreeContext(RedisModuleCtx *ctx) {
+ moduleHandlePropagationAfterCommandCallback(ctx);
autoMemoryCollect(ctx);
poolAllocRelease(ctx);
if (ctx->postponed_arrays) {
@@ -514,34 +610,16 @@ void moduleFreeContext(RedisModuleCtx *ctx) {
if (ctx->flags & REDISMODULE_CTX_THREAD_SAFE) freeClient(ctx->client);
}
-/* Helper function for when a command callback is called, in order to handle
- * details needed to correctly replicate commands. */
-void moduleHandlePropagationAfterCommandCallback(RedisModuleCtx *ctx) {
- client *c = ctx->client;
-
- if (c->flags & CLIENT_LUA) return;
-
- /* Handle the replication of the final EXEC, since whatever a command
- * emits is always wrapped around MULTI/EXEC. */
- if (ctx->flags & REDISMODULE_CTX_MULTI_EMITTED) {
- robj *propargv[1];
- propargv[0] = createStringObject("EXEC",4);
- alsoPropagate(server.execCommand,c->db->id,propargv,1,
- PROPAGATE_AOF|PROPAGATE_REPL);
- decrRefCount(propargv[0]);
- }
-}
-
/* This Redis command binds the normal Redis command invocation with commands
* exported by modules. */
void RedisModuleCommandDispatcher(client *c) {
RedisModuleCommandProxy *cp = (void*)(unsigned long)c->cmd->getkeys_proc;
RedisModuleCtx ctx = REDISMODULE_CTX_INIT;
+ ctx.flags |= REDISMODULE_CTX_MODULE_COMMAND_CALL;
ctx.module = cp->module;
ctx.client = c;
cp->func(&ctx,(void**)c->argv,c->argc);
- moduleHandlePropagationAfterCommandCallback(&ctx);
moduleFreeContext(&ctx);
/* In some cases processMultibulkBuffer uses sdsMakeRoomFor to
@@ -755,6 +833,8 @@ void RM_SetModuleAttribs(RedisModuleCtx *ctx, const char *name, int ver, int api
module->using = listCreate();
module->filters = listCreate();
module->in_call = 0;
+ module->in_hook = 0;
+ module->options = 0;
ctx->module = module;
}
@@ -772,6 +852,25 @@ long long RM_Milliseconds(void) {
return mstime();
}
+/* Set flags defining capabilities or behavior bit flags.
+ *
+ * REDISMODULE_OPTIONS_HANDLE_IO_ERRORS:
+ * Generally, modules don't need to bother with this, as the process will just
+ * terminate if a read error happens, however, setting this flag would allow
+ * repl-diskless-load to work if enabled.
+ * The module should use RedisModule_IsIOError after reads, before using the
+ * data that was read, and in case of error, propagate it upwards, and also be
+ * able to release the partially populated value and all it's allocations. */
+void RM_SetModuleOptions(RedisModuleCtx *ctx, int options) {
+ ctx->module->options = options;
+}
+
+/* Signals that the key is modified from user's perspective (i.e. invalidate WATCH). */
+int RM_SignalModifiedKey(RedisModuleCtx *ctx, RedisModuleString *keyname) {
+ signalModifiedKey(ctx->client->db,keyname);
+ return REDISMODULE_OK;
+}
+
/* --------------------------------------------------------------------------
* Automatic memory management for modules
* -------------------------------------------------------------------------- */
@@ -1126,10 +1225,9 @@ int RM_ReplyWithLongLong(RedisModuleCtx *ctx, long long ll) {
int replyWithStatus(RedisModuleCtx *ctx, const char *msg, char *prefix) {
client *c = moduleGetReplyClient(ctx);
if (c == NULL) return REDISMODULE_OK;
- sds strmsg = sdsnewlen(prefix,1);
- strmsg = sdscat(strmsg,msg);
- strmsg = sdscatlen(strmsg,"\r\n",2);
- addReplySds(c,strmsg);
+ addReplyProto(c,prefix,strlen(prefix));
+ addReplyProto(c,msg,strlen(msg));
+ addReplyProto(c,"\r\n",2);
return REDISMODULE_OK;
}
@@ -1186,6 +1284,27 @@ int RM_ReplyWithArray(RedisModuleCtx *ctx, long len) {
return REDISMODULE_OK;
}
+/* Reply to the client with a null array, simply null in RESP3
+ * null array in RESP2.
+ *
+ * The function always returns REDISMODULE_OK. */
+int RM_ReplyWithNullArray(RedisModuleCtx *ctx) {
+ client *c = moduleGetReplyClient(ctx);
+ if (c == NULL) return REDISMODULE_OK;
+ addReplyNullArray(c);
+ return REDISMODULE_OK;
+}
+
+/* Reply to the client with an empty array.
+ *
+ * The function always returns REDISMODULE_OK. */
+int RM_ReplyWithEmptyArray(RedisModuleCtx *ctx) {
+ client *c = moduleGetReplyClient(ctx);
+ if (c == NULL) return REDISMODULE_OK;
+ addReply(c,shared.emptyarray);
+ return REDISMODULE_OK;
+}
+
/* When RedisModule_ReplyWithArray() is used with the argument
* REDISMODULE_POSTPONED_ARRAY_LEN, because we don't know beforehand the number
* of items we are going to output as elements of the array, this function
@@ -1264,6 +1383,27 @@ int RM_ReplyWithString(RedisModuleCtx *ctx, RedisModuleString *str) {
return REDISMODULE_OK;
}
+/* Reply with an empty string.
+ *
+ * The function always returns REDISMODULE_OK. */
+int RM_ReplyWithEmptyString(RedisModuleCtx *ctx) {
+ client *c = moduleGetReplyClient(ctx);
+ if (c == NULL) return REDISMODULE_OK;
+ addReplyBulkCBuffer(c, "", 0);
+ return REDISMODULE_OK;
+}
+
+/* Reply with a binary safe string, which should not be escaped or filtered
+ * taking in input a C buffer pointer and length.
+ *
+ * The function always returns REDISMODULE_OK. */
+int RM_ReplyWithVerbatimString(RedisModuleCtx *ctx, const char *buf, size_t len) {
+ client *c = moduleGetReplyClient(ctx);
+ if (c == NULL) return REDISMODULE_OK;
+ addReplyVerbatim(c, buf, len, "txt");
+ return REDISMODULE_OK;
+}
+
/* Reply to the client with a NULL. In the RESP protocol a NULL is encoded
* as the string "$-1\r\n".
*
@@ -1316,9 +1456,16 @@ void moduleReplicateMultiIfNeeded(RedisModuleCtx *ctx) {
/* If we already emitted MULTI return ASAP. */
if (ctx->flags & REDISMODULE_CTX_MULTI_EMITTED) return;
/* If this is a thread safe context, we do not want to wrap commands
- * executed into MUTLI/EXEC, they are executed as single commands
+ * executed into MULTI/EXEC, they are executed as single commands
* from an external client in essence. */
if (ctx->flags & REDISMODULE_CTX_THREAD_SAFE) return;
+ /* If this is a callback context, and not a module command execution
+ * context, we have to setup the op array for the "also propagate" API
+ * so that RM_Replicate() will work. */
+ if (!(ctx->flags & REDISMODULE_CTX_MODULE_COMMAND_CALL)) {
+ ctx->saved_oparray = server.also_propagate;
+ redisOpArrayInit(&server.also_propagate);
+ }
execCommandPropagateMulti(ctx->client);
ctx->flags |= REDISMODULE_CTX_MULTI_EMITTED;
}
@@ -1340,6 +1487,24 @@ void moduleReplicateMultiIfNeeded(RedisModuleCtx *ctx) {
*
* Please refer to RedisModule_Call() for more information.
*
+ * Using the special "A" and "R" modifiers, the caller can exclude either
+ * the AOF or the replicas from the propagation of the specified command.
+ * Otherwise, by default, the command will be propagated in both channels.
+ *
+ * ## Note about calling this function from a thread safe context:
+ *
+ * Normally when you call this function from the callback implementing a
+ * module command, or any other callback provided by the Redis Module API,
+ * Redis will accumulate all the calls to this function in the context of
+ * the callback, and will propagate all the commands wrapped in a MULTI/EXEC
+ * transaction. However when calling this function from a threaded safe context
+ * that can live an undefined amount of time, and can be locked/unlocked in
+ * at will, the behavior is different: MULTI/EXEC wrapper is not emitted
+ * and the command specified is inserted in the AOF and replication stream
+ * immediately.
+ *
+ * ## Return value
+ *
* The command returns REDISMODULE_ERR if the format specifiers are invalid
* or the command name does not belong to a known command. */
int RM_Replicate(RedisModuleCtx *ctx, const char *cmdname, const char *fmt, ...) {
@@ -1357,10 +1522,23 @@ int RM_Replicate(RedisModuleCtx *ctx, const char *cmdname, const char *fmt, ...)
va_end(ap);
if (argv == NULL) return REDISMODULE_ERR;
- /* Replicate! */
- moduleReplicateMultiIfNeeded(ctx);
- alsoPropagate(cmd,ctx->client->db->id,argv,argc,
- PROPAGATE_AOF|PROPAGATE_REPL);
+ /* Select the propagation target. Usually is AOF + replicas, however
+ * the caller can exclude one or the other using the "A" or "R"
+ * modifiers. */
+ int target = 0;
+ if (!(flags & REDISMODULE_ARGV_NO_AOF)) target |= PROPAGATE_AOF;
+ if (!(flags & REDISMODULE_ARGV_NO_REPLICAS)) target |= PROPAGATE_REPL;
+
+ /* Replicate! When we are in a threaded context, we want to just insert
+ * the replicated command ASAP, since it is not clear when the context
+ * will stop being used, so accumulating stuff does not make much sense,
+ * nor we could easily use the alsoPropagate() API from threads. */
+ if (ctx->flags & REDISMODULE_CTX_THREAD_SAFE) {
+ propagate(cmd,ctx->client->db->id,argv,argc,target);
+ } else {
+ moduleReplicateMultiIfNeeded(ctx);
+ alsoPropagate(cmd,ctx->client->db->id,argv,argc,target);
+ }
/* Release the argv. */
for (j = 0; j < argc; j++) decrRefCount(argv[j]);
@@ -1402,12 +1580,133 @@ int RM_ReplicateVerbatim(RedisModuleCtx *ctx) {
* are guaranteed to get IDs greater than any past ID previously seen.
*
* Valid IDs are from 1 to 2^64-1. If 0 is returned it means there is no way
- * to fetch the ID in the context the function was currently called. */
+ * to fetch the ID in the context the function was currently called.
+ *
+ * After obtaining the ID, it is possible to check if the command execution
+ * is actually happening in the context of AOF loading, using this macro:
+ *
+ * if (RedisModule_IsAOFClient(RedisModule_GetClientId(ctx)) {
+ * // Handle it differently.
+ * }
+ */
unsigned long long RM_GetClientId(RedisModuleCtx *ctx) {
if (ctx->client == NULL) return 0;
return ctx->client->id;
}
+/* This is an helper for RM_GetClientInfoById() and other functions: given
+ * a client, it populates the client info structure with the appropriate
+ * fields depending on the version provided. If the version is not valid
+ * then REDISMODULE_ERR is returned. Otherwise the function returns
+ * REDISMODULE_OK and the structure pointed by 'ci' gets populated. */
+
+int modulePopulateClientInfoStructure(void *ci, client *client, int structver) {
+ if (structver != 1) return REDISMODULE_ERR;
+
+ RedisModuleClientInfoV1 *ci1 = ci;
+ memset(ci1,0,sizeof(*ci1));
+ ci1->version = structver;
+ if (client->flags & CLIENT_MULTI)
+ ci1->flags |= REDISMODULE_CLIENTINFO_FLAG_MULTI;
+ if (client->flags & CLIENT_PUBSUB)
+ ci1->flags |= REDISMODULE_CLIENTINFO_FLAG_PUBSUB;
+ if (client->flags & CLIENT_UNIX_SOCKET)
+ ci1->flags |= REDISMODULE_CLIENTINFO_FLAG_UNIXSOCKET;
+ if (client->flags & CLIENT_TRACKING)
+ ci1->flags |= REDISMODULE_CLIENTINFO_FLAG_TRACKING;
+ if (client->flags & CLIENT_BLOCKED)
+ ci1->flags |= REDISMODULE_CLIENTINFO_FLAG_BLOCKED;
+
+ int port;
+ connPeerToString(client->conn,ci1->addr,sizeof(ci1->addr),&port);
+ ci1->port = port;
+ ci1->db = client->db->id;
+ ci1->id = client->id;
+ return REDISMODULE_OK;
+}
+
+/* This is an helper for moduleFireServerEvent() and other functions:
+ * It populates the replication info structure with the appropriate
+ * fields depending on the version provided. If the version is not valid
+ * then REDISMODULE_ERR is returned. Otherwise the function returns
+ * REDISMODULE_OK and the structure pointed by 'ri' gets populated. */
+int modulePopulateReplicationInfoStructure(void *ri, int structver) {
+ if (structver != 1) return REDISMODULE_ERR;
+
+ RedisModuleReplicationInfoV1 *ri1 = ri;
+ memset(ri1,0,sizeof(*ri1));
+ ri1->version = structver;
+ ri1->master = server.masterhost==NULL;
+ ri1->masterhost = server.masterhost? server.masterhost: "";
+ ri1->masterport = server.masterport;
+ ri1->replid1 = server.replid;
+ ri1->replid2 = server.replid2;
+ ri1->repl1_offset = server.master_repl_offset;
+ ri1->repl2_offset = server.second_replid_offset;
+ return REDISMODULE_OK;
+}
+
+/* Return information about the client with the specified ID (that was
+ * previously obtained via the RedisModule_GetClientId() API). If the
+ * client exists, REDISMODULE_OK is returned, otherwise REDISMODULE_ERR
+ * is returned.
+ *
+ * When the client exist and the `ci` pointer is not NULL, but points to
+ * a structure of type RedisModuleClientInfo, previously initialized with
+ * the correct REDISMODULE_CLIENTINFO_INITIALIZER, the structure is populated
+ * with the following fields:
+ *
+ * uint64_t flags; // REDISMODULE_CLIENTINFO_FLAG_*
+ * uint64_t id; // Client ID
+ * char addr[46]; // IPv4 or IPv6 address.
+ * uint16_t port; // TCP port.
+ * uint16_t db; // Selected DB.
+ *
+ * Note: the client ID is useless in the context of this call, since we
+ * already know, however the same structure could be used in other
+ * contexts where we don't know the client ID, yet the same structure
+ * is returned.
+ *
+ * With flags having the following meaning:
+ *
+ * REDISMODULE_CLIENTINFO_FLAG_SSL Client using SSL connection.
+ * REDISMODULE_CLIENTINFO_FLAG_PUBSUB Client in Pub/Sub mode.
+ * REDISMODULE_CLIENTINFO_FLAG_BLOCKED Client blocked in command.
+ * REDISMODULE_CLIENTINFO_FLAG_TRACKING Client with keys tracking on.
+ * REDISMODULE_CLIENTINFO_FLAG_UNIXSOCKET Client using unix domain socket.
+ * REDISMODULE_CLIENTINFO_FLAG_MULTI Client in MULTI state.
+ *
+ * However passing NULL is a way to just check if the client exists in case
+ * we are not interested in any additional information.
+ *
+ * This is the correct usage when we want the client info structure
+ * returned:
+ *
+ * RedisModuleClientInfo ci = REDISMODULE_CLIENTINFO_INITIALIZER;
+ * int retval = RedisModule_GetClientInfoById(&ci,client_id);
+ * if (retval == REDISMODULE_OK) {
+ * printf("Address: %s\n", ci.addr);
+ * }
+ */
+int RM_GetClientInfoById(void *ci, uint64_t id) {
+ client *client = lookupClientByID(id);
+ if (client == NULL) return REDISMODULE_ERR;
+ if (ci == NULL) return REDISMODULE_OK;
+
+ /* Fill the info structure if passed. */
+ uint64_t structver = ((uint64_t*)ci)[0];
+ return modulePopulateClientInfoStructure(ci,client,structver);
+}
+
+/* Publish a message to subscribers (see PUBLISH command). */
+int RM_PublishMessage(RedisModuleCtx *ctx, RedisModuleString *channel, RedisModuleString *message) {
+ UNUSED(ctx);
+ int receivers = pubsubPublishMessage(channel, message);
+ if (server.cluster_enabled)
+ clusterPropagatePublish(channel, message);
+ return receivers;
+}
+
/* Return the currently selected DB. */
int RM_GetSelectedDb(RedisModuleCtx *ctx) {
return ctx->client->db->id;
@@ -1449,6 +1748,21 @@ int RM_GetSelectedDb(RedisModuleCtx *ctx) {
*
* * REDISMODULE_CTX_FLAGS_OOM_WARNING: Less than 25% of memory remains before
* reaching the maxmemory level.
+ *
+ * * REDISMODULE_CTX_FLAGS_REPLICA_IS_STALE: No active link with the master.
+ *
+ * * REDISMODULE_CTX_FLAGS_REPLICA_IS_CONNECTING: The replica is trying to
+ * connect with the master.
+ *
+ * * REDISMODULE_CTX_FLAGS_REPLICA_IS_TRANSFERRING: Master -> Replica RDB
+ * transfer is in progress.
+ *
+ * * REDISMODULE_CTX_FLAGS_REPLICA_IS_ONLINE: The replica has an active link
+ * with its master. This is the
+ * contrary of STALE state.
+ *
+ * * REDISMODULE_CTX_FLAGS_ACTIVE_CHILD: There is currently some background
+ * process active (RDB, AUX or module).
*/
int RM_GetContextFlags(RedisModuleCtx *ctx) {
@@ -1491,6 +1805,20 @@ int RM_GetContextFlags(RedisModuleCtx *ctx) {
flags |= REDISMODULE_CTX_FLAGS_SLAVE;
if (server.repl_slave_ro)
flags |= REDISMODULE_CTX_FLAGS_READONLY;
+
+ /* Replica state flags. */
+ if (server.repl_state == REPL_STATE_CONNECT ||
+ server.repl_state == REPL_STATE_CONNECTING)
+ {
+ flags |= REDISMODULE_CTX_FLAGS_REPLICA_IS_CONNECTING;
+ } else if (server.repl_state == REPL_STATE_TRANSFER) {
+ flags |= REDISMODULE_CTX_FLAGS_REPLICA_IS_TRANSFERRING;
+ } else if (server.repl_state == REPL_STATE_CONNECTED) {
+ flags |= REDISMODULE_CTX_FLAGS_REPLICA_IS_ONLINE;
+ }
+
+ if (server.repl_state != REPL_STATE_CONNECTED)
+ flags |= REDISMODULE_CTX_FLAGS_REPLICA_IS_STALE;
}
/* OOM flag. */
@@ -1499,6 +1827,9 @@ int RM_GetContextFlags(RedisModuleCtx *ctx) {
if (retval == C_ERR) flags |= REDISMODULE_CTX_FLAGS_OOM;
if (level > 0.75) flags |= REDISMODULE_CTX_FLAGS_OOM_WARNING;
+ /* Presence of children processes. */
+ if (hasActiveChildProcess()) flags |= REDISMODULE_CTX_FLAGS_ACTIVE_CHILD;
+
return flags;
}
@@ -1534,11 +1865,12 @@ int RM_SelectDb(RedisModuleCtx *ctx, int newid) {
void *RM_OpenKey(RedisModuleCtx *ctx, robj *keyname, int mode) {
RedisModuleKey *kp;
robj *value;
+ int flags = mode & REDISMODULE_OPEN_KEY_NOTOUCH? LOOKUP_NOTOUCH: 0;
if (mode & REDISMODULE_WRITE) {
- value = lookupKeyWrite(ctx->client->db,keyname);
+ value = lookupKeyWriteWithFlags(ctx->client->db,keyname, flags);
} else {
- value = lookupKeyRead(ctx->client->db,keyname);
+ value = lookupKeyReadWithFlags(ctx->client->db,keyname, flags);
if (value == NULL) {
return NULL;
}
@@ -1561,7 +1893,9 @@ void *RM_OpenKey(RedisModuleCtx *ctx, robj *keyname, int mode) {
/* Close a key handle. */
void RM_CloseKey(RedisModuleKey *key) {
if (key == NULL) return;
- if (key->mode & REDISMODULE_WRITE) signalModifiedKey(key->db,key->key);
+ int signal = SHOULD_SIGNAL_MODIFIED_KEYS(key->ctx);
+ if ((key->mode & REDISMODULE_WRITE) && signal)
+ signalModifiedKey(key->db,key->key);
/* TODO: if (key->iter) RM_KeyIteratorStop(kp); */
RM_ZsetRangeStop(key);
decrRefCount(key->key);
@@ -1661,6 +1995,28 @@ int RM_SetExpire(RedisModuleKey *key, mstime_t expire) {
return REDISMODULE_OK;
}
+/* Performs similar operation to FLUSHALL, and optionally start a new AOF file (if enabled)
+ * If restart_aof is true, you must make sure the command that triggered this call is not
+ * propagated to the AOF file.
+ * When async is set to true, db contents will be freed by a background thread. */
+void RM_ResetDataset(int restart_aof, int async) {
+ if (restart_aof && server.aof_state != AOF_OFF) stopAppendOnly();
+ flushAllDataAndResetRDB(async? EMPTYDB_ASYNC: EMPTYDB_NO_FLAGS);
+ if (server.aof_enabled && restart_aof) restartAOFAfterSYNC();
+}
+
+/* Returns the number of keys in the current db. */
+unsigned long long RM_DbSize(RedisModuleCtx *ctx) {
+ return dictSize(ctx->client->db->dict);
+}
+
+/* Returns a name of a random key, or NULL if current db is empty. */
+RedisModuleString *RM_RandomKey(RedisModuleCtx *ctx) {
+ robj *key = dbRandomKey(ctx->client->db);
+ autoMemoryAdd(ctx,REDISMODULE_AM_STRING,key);
+ return key;
+}
+
/* --------------------------------------------------------------------------
* Key API for String type
* -------------------------------------------------------------------------- */
@@ -2389,7 +2745,7 @@ int RM_HashSet(RedisModuleKey *key, int flags, ...) {
*
* REDISMODULE_HASH_EXISTS: instead of setting the value of the field
* expecting a RedisModuleString pointer to pointer, the function just
- * reports if the field esists or not and expects an integer pointer
+ * reports if the field exists or not and expects an integer pointer
* as the second element of each pair.
*
* Example of REDISMODULE_HASH_CFIELD:
@@ -2678,12 +3034,11 @@ RedisModuleString *RM_CreateStringFromCallReply(RedisModuleCallReply *reply) {
* to special modifiers in "fmt". For now only one exists:
*
* "!" -> REDISMODULE_ARGV_REPLICATE
+ * "A" -> REDISMODULE_ARGV_NO_AOF
+ * "R" -> REDISMODULE_ARGV_NO_REPLICAS
*
* On error (format specifier error) NULL is returned and nothing is
* allocated. On success the argument vector is returned. */
-
-#define REDISMODULE_ARGV_REPLICATE (1<<0)
-
robj **moduleCreateArgvFromUserFormat(const char *cmdname, const char *fmt, int *argcp, int *flags, va_list ap) {
int argc = 0, argv_size, j;
robj **argv = NULL;
@@ -2712,7 +3067,7 @@ robj **moduleCreateArgvFromUserFormat(const char *cmdname, const char *fmt, int
size_t len = va_arg(ap,size_t);
argv[argc++] = createStringObject(buf,len);
} else if (*p == 'l') {
- long ll = va_arg(ap,long long);
+ long long ll = va_arg(ap,long long);
argv[argc++] = createObject(OBJ_STRING,sdsfromlonglong(ll));
} else if (*p == 'v') {
/* A vector of strings */
@@ -2732,6 +3087,10 @@ robj **moduleCreateArgvFromUserFormat(const char *cmdname, const char *fmt, int
}
} else if (*p == '!') {
if (flags) (*flags) |= REDISMODULE_ARGV_REPLICATE;
+ } else if (*p == 'A') {
+ if (flags) (*flags) |= REDISMODULE_ARGV_NO_AOF;
+ } else if (*p == 'R') {
+ if (flags) (*flags) |= REDISMODULE_ARGV_NO_REPLICAS;
} else {
goto fmterr;
}
@@ -2752,7 +3111,10 @@ fmterr:
* NULL is returned and errno is set to the following values:
*
* EINVAL: command non existing, wrong arity, wrong format specifier.
- * EPERM: operation in Cluster instance with key in non local slot. */
+ * EPERM: operation in Cluster instance with key in non local slot.
+ *
+ * This API is documented here: https://redis.io/topics/modules-intro
+ */
RedisModuleCallReply *RM_Call(RedisModuleCtx *ctx, const char *cmdname, const char *fmt, ...) {
struct redisCommand *cmd;
client *c = NULL;
@@ -2764,7 +3126,7 @@ RedisModuleCallReply *RM_Call(RedisModuleCtx *ctx, const char *cmdname, const ch
/* Create the client and dispatch the command. */
va_start(ap, fmt);
- c = createClient(-1);
+ c = createClient(NULL);
c->user = NULL; /* Root user. */
argv = moduleCreateArgvFromUserFormat(cmdname,fmt,&argc,&flags,ap);
replicate = flags & REDISMODULE_ARGV_REPLICATE;
@@ -2779,7 +3141,10 @@ RedisModuleCallReply *RM_Call(RedisModuleCtx *ctx, const char *cmdname, const ch
/* We handle the above format error only when the client is setup so that
* we can free it normally. */
- if (argv == NULL) goto cleanup;
+ if (argv == NULL) {
+ errno = EINVAL;
+ goto cleanup;
+ }
/* Call command filters */
moduleCallCommandFilters(c);
@@ -2823,8 +3188,10 @@ RedisModuleCallReply *RM_Call(RedisModuleCtx *ctx, const char *cmdname, const ch
/* Run the command */
int call_flags = CMD_CALL_SLOWLOG | CMD_CALL_STATS;
if (replicate) {
- call_flags |= CMD_CALL_PROPAGATE_AOF;
- call_flags |= CMD_CALL_PROPAGATE_REPL;
+ if (!(flags & REDISMODULE_ARGV_NO_AOF))
+ call_flags |= CMD_CALL_PROPAGATE_AOF;
+ if (!(flags & REDISMODULE_ARGV_NO_REPLICAS))
+ call_flags |= CMD_CALL_PROPAGATE_REPL;
}
call(c,call_flags);
@@ -3150,9 +3517,14 @@ void *RM_ModuleTypeGetValue(RedisModuleKey *key) {
* RDB loading and saving functions
* -------------------------------------------------------------------------- */
-/* Called when there is a load error in the context of a module. This cannot
- * be recovered like for the built-in types. */
+/* Called when there is a load error in the context of a module. On some
+ * modules this cannot be recovered, but if the module declared capability
+ * to handle errors, we'll raise a flag rather than exiting. */
void moduleRDBLoadError(RedisModuleIO *io) {
+ if (io->type->module->options & REDISMODULE_OPTIONS_HANDLE_IO_ERRORS) {
+ io->error = 1;
+ return;
+ }
serverLog(LL_WARNING,
"Error loading data from RDB (short read or EOF). "
"Read performed by module '%s' about type '%s' "
@@ -3163,6 +3535,33 @@ void moduleRDBLoadError(RedisModuleIO *io) {
exit(1);
}
+/* Returns 0 if there's at least one registered data type that did not declare
+ * REDISMODULE_OPTIONS_HANDLE_IO_ERRORS, in which case diskless loading should
+ * be avoided since it could cause data loss. */
+int moduleAllDatatypesHandleErrors() {
+ dictIterator *di = dictGetIterator(modules);
+ dictEntry *de;
+
+ while ((de = dictNext(di)) != NULL) {
+ struct RedisModule *module = dictGetVal(de);
+ if (listLength(module->types) &&
+ !(module->options & REDISMODULE_OPTIONS_HANDLE_IO_ERRORS))
+ {
+ dictReleaseIterator(di);
+ return 0;
+ }
+ }
+ dictReleaseIterator(di);
+ return 1;
+}
+
+/* Returns true if any previous IO API failed.
+ * for Load* APIs the REDISMODULE_OPTIONS_HANDLE_IO_ERRORS flag must be set with
+ * RediModule_SetModuleOptions first. */
+int RM_IsIOError(RedisModuleIO *io) {
+ return io->error;
+}
+
/* Save an unsigned 64 bit value into the RDB file. This function should only
* be called in the context of the rdb_save method of modules implementing new
* data types. */
@@ -3186,6 +3585,7 @@ saveerr:
* be called in the context of the rdb_load method of modules implementing
* new data types. */
uint64_t RM_LoadUnsigned(RedisModuleIO *io) {
+ if (io->error) return 0;
if (io->ver == 2) {
uint64_t opcode = rdbLoadLen(io->rio,NULL);
if (opcode != RDB_MODULE_OPCODE_UINT) goto loaderr;
@@ -3197,7 +3597,7 @@ uint64_t RM_LoadUnsigned(RedisModuleIO *io) {
loaderr:
moduleRDBLoadError(io);
- return 0; /* Never reached. */
+ return 0;
}
/* Like RedisModule_SaveUnsigned() but for signed 64 bit values. */
@@ -3256,6 +3656,7 @@ saveerr:
/* Implements RM_LoadString() and RM_LoadStringBuffer() */
void *moduleLoadString(RedisModuleIO *io, int plain, size_t *lenptr) {
+ if (io->error) return NULL;
if (io->ver == 2) {
uint64_t opcode = rdbLoadLen(io->rio,NULL);
if (opcode != RDB_MODULE_OPCODE_STRING) goto loaderr;
@@ -3267,7 +3668,7 @@ void *moduleLoadString(RedisModuleIO *io, int plain, size_t *lenptr) {
loaderr:
moduleRDBLoadError(io);
- return NULL; /* Never reached. */
+ return NULL;
}
/* In the context of the rdb_load method of a module data type, loads a string
@@ -3288,7 +3689,7 @@ RedisModuleString *RM_LoadString(RedisModuleIO *io) {
* RedisModule_Realloc() or RedisModule_Free().
*
* The size of the string is stored at '*lenptr' if not NULL.
- * The returned string is not automatically NULL termianted, it is loaded
+ * The returned string is not automatically NULL terminated, it is loaded
* exactly as it was stored inisde the RDB file. */
char *RM_LoadStringBuffer(RedisModuleIO *io, size_t *lenptr) {
return moduleLoadString(io,1,lenptr);
@@ -3316,6 +3717,7 @@ saveerr:
/* In the context of the rdb_save method of a module data type, loads back the
* double value saved by RedisModule_SaveDouble(). */
double RM_LoadDouble(RedisModuleIO *io) {
+ if (io->error) return 0;
if (io->ver == 2) {
uint64_t opcode = rdbLoadLen(io->rio,NULL);
if (opcode != RDB_MODULE_OPCODE_DOUBLE) goto loaderr;
@@ -3327,7 +3729,7 @@ double RM_LoadDouble(RedisModuleIO *io) {
loaderr:
moduleRDBLoadError(io);
- return 0; /* Never reached. */
+ return 0;
}
/* In the context of the rdb_save method of a module data type, saves a float
@@ -3352,6 +3754,7 @@ saveerr:
/* In the context of the rdb_save method of a module data type, loads back the
* float value saved by RedisModule_SaveFloat(). */
float RM_LoadFloat(RedisModuleIO *io) {
+ if (io->error) return 0;
if (io->ver == 2) {
uint64_t opcode = rdbLoadLen(io->rio,NULL);
if (opcode != RDB_MODULE_OPCODE_FLOAT) goto loaderr;
@@ -3363,7 +3766,7 @@ float RM_LoadFloat(RedisModuleIO *io) {
loaderr:
moduleRDBLoadError(io);
- return 0; /* Never reached. */
+ return 0;
}
/* Iterate over modules, and trigger rdb aux saving for the ones modules types
@@ -3533,6 +3936,11 @@ const RedisModuleString *RM_GetKeyNameFromIO(RedisModuleIO *io) {
return io->key;
}
+/* Returns a RedisModuleString with the name of the key from RedisModuleKey */
+const RedisModuleString *RM_GetKeyNameFromModuleKey(RedisModuleKey *key) {
+ return key ? key->key : NULL;
+}
+
/* --------------------------------------------------------------------------
* Logging
* -------------------------------------------------------------------------- */
@@ -3598,6 +4006,23 @@ void RM_LogIOError(RedisModuleIO *io, const char *levelstr, const char *fmt, ...
va_end(ap);
}
+/* Redis-like assert function.
+ *
+ * A failed assertion will shut down the server and produce logging information
+ * that looks identical to information generated by Redis itself.
+ */
+void RM__Assert(const char *estr, const char *file, int line) {
+ _serverAssert(estr, file, line);
+}
+
+/* Allows adding event to the latency monitor to be observed by the LATENCY
+ * command. The call is skipped if the latency is smaller than the configured
+ * latency-monitor-threshold. */
+void RM_LatencyAddSample(const char *event, mstime_t latency) {
+ if (latency >= server.latency_monitor_threshold)
+ latencyAddSample(event, latency);
+}
+
/* --------------------------------------------------------------------------
* Blocking clients from modules
* -------------------------------------------------------------------------- */
@@ -3646,45 +4071,52 @@ void unblockClientFromModule(client *c) {
resetClient(c);
}
-/* Block a client in the context of a blocking command, returning an handle
- * which will be used, later, in order to unblock the client with a call to
- * RedisModule_UnblockClient(). The arguments specify callback functions
- * and a timeout after which the client is unblocked.
- *
- * The callbacks are called in the following contexts:
+/* Block a client in the context of a module: this function implements both
+ * RM_BlockClient() and RM_BlockClientOnKeys() depending on the fact the
+ * keys are passed or not.
*
- * reply_callback: called after a successful RedisModule_UnblockClient()
- * call in order to reply to the client and unblock it.
+ * When not blocking for keys, the keys, numkeys, and privdata parameters are
+ * not needed. The privdata in that case must be NULL, since later is
+ * RM_UnblockClient() that will provide some private data that the reply
+ * callback will receive.
*
- * reply_timeout: called when the timeout is reached in order to send an
- * error to the client.
+ * Instead when blocking for keys, normally RM_UnblockClient() will not be
+ * called (because the client will unblock when the key is modified), so
+ * 'privdata' should be provided in that case, so that once the client is
+ * unlocked and the reply callback is called, it will receive its associated
+ * private data.
*
- * free_privdata: called in order to free the private data that is passed
- * by RedisModule_UnblockClient() call.
+ * Even when blocking on keys, RM_UnblockClient() can be called however, but
+ * in that case the privdata argument is disregarded, because we pass the
+ * reply callback the privdata that is set here while blocking.
*/
-RedisModuleBlockedClient *RM_BlockClient(RedisModuleCtx *ctx, RedisModuleCmdFunc reply_callback, RedisModuleCmdFunc timeout_callback, void (*free_privdata)(RedisModuleCtx*,void*), long long timeout_ms) {
+RedisModuleBlockedClient *moduleBlockClient(RedisModuleCtx *ctx, RedisModuleCmdFunc reply_callback, RedisModuleCmdFunc timeout_callback, void (*free_privdata)(RedisModuleCtx*,void*), long long timeout_ms, RedisModuleString **keys, int numkeys, void *privdata) {
client *c = ctx->client;
int islua = c->flags & CLIENT_LUA;
int ismulti = c->flags & CLIENT_MULTI;
c->bpop.module_blocked_handle = zmalloc(sizeof(RedisModuleBlockedClient));
RedisModuleBlockedClient *bc = c->bpop.module_blocked_handle;
+ ctx->module->blocked_clients++;
/* We need to handle the invalid operation of calling modules blocking
* commands from Lua or MULTI. We actually create an already aborted
* (client set to NULL) blocked client handle, and actually reply with
* an error. */
+ mstime_t timeout = timeout_ms ? (mstime()+timeout_ms) : 0;
bc->client = (islua || ismulti) ? NULL : c;
bc->module = ctx->module;
bc->reply_callback = reply_callback;
bc->timeout_callback = timeout_callback;
bc->disconnect_callback = NULL; /* Set by RM_SetDisconnectCallback() */
bc->free_privdata = free_privdata;
- bc->privdata = NULL;
- bc->reply_client = createClient(-1);
+ bc->privdata = privdata;
+ bc->reply_client = createClient(NULL);
bc->reply_client->flags |= CLIENT_MODULE;
bc->dbid = c->db->id;
- c->bpop.timeout = timeout_ms ? (mstime()+timeout_ms) : 0;
+ bc->blocked_on_keys = keys != NULL;
+ bc->unblocked = 0;
+ c->bpop.timeout = timeout;
if (islua || ismulti) {
c->bpop.module_blocked_handle = NULL;
@@ -3692,11 +4124,150 @@ RedisModuleBlockedClient *RM_BlockClient(RedisModuleCtx *ctx, RedisModuleCmdFunc
"Blocking module command called from Lua script" :
"Blocking module command called from transaction");
} else {
- blockClient(c,BLOCKED_MODULE);
+ if (keys) {
+ blockForKeys(c,BLOCKED_MODULE,keys,numkeys,timeout,NULL,NULL);
+ } else {
+ blockClient(c,BLOCKED_MODULE);
+ }
}
return bc;
}
+/* This function is called from module.c in order to check if a module
+ * blocked for BLOCKED_MODULE and subtype 'on keys' (bc->blocked_on_keys true)
+ * can really be unblocked, since the module was able to serve the client.
+ * If the callback returns REDISMODULE_OK, then the client can be unblocked,
+ * otherwise the client remains blocked and we'll retry again when one of
+ * the keys it blocked for becomes "ready" again. */
+int moduleTryServeClientBlockedOnKey(client *c, robj *key) {
+ int served = 0;
+ RedisModuleBlockedClient *bc = c->bpop.module_blocked_handle;
+ /* Protect against re-processing: don't serve clients that are already
+ * in the unblocking list for any reason (including RM_UnblockClient()
+ * explicit call). */
+ if (bc->unblocked) return REDISMODULE_ERR;
+ RedisModuleCtx ctx = REDISMODULE_CTX_INIT;
+ ctx.flags |= REDISMODULE_CTX_BLOCKED_REPLY;
+ ctx.blocked_ready_key = key;
+ ctx.blocked_privdata = bc->privdata;
+ ctx.module = bc->module;
+ ctx.client = bc->client;
+ ctx.blocked_client = bc;
+ if (bc->reply_callback(&ctx,(void**)c->argv,c->argc) == REDISMODULE_OK)
+ served = 1;
+ moduleFreeContext(&ctx);
+ return served;
+}
+
+/* Block a client in the context of a blocking command, returning an handle
+ * which will be used, later, in order to unblock the client with a call to
+ * RedisModule_UnblockClient(). The arguments specify callback functions
+ * and a timeout after which the client is unblocked.
+ *
+ * The callbacks are called in the following contexts:
+ *
+ * reply_callback: called after a successful RedisModule_UnblockClient()
+ * call in order to reply to the client and unblock it.
+ *
+ * reply_timeout: called when the timeout is reached in order to send an
+ * error to the client.
+ *
+ * free_privdata: called in order to free the private data that is passed
+ * by RedisModule_UnblockClient() call.
+ */
+RedisModuleBlockedClient *RM_BlockClient(RedisModuleCtx *ctx, RedisModuleCmdFunc reply_callback, RedisModuleCmdFunc timeout_callback, void (*free_privdata)(RedisModuleCtx*,void*), long long timeout_ms) {
+ return moduleBlockClient(ctx,reply_callback,timeout_callback,free_privdata,timeout_ms, NULL,0,NULL);
+}
+
+/* This call is similar to RedisModule_BlockClient(), however in this case we
+ * don't just block the client, but also ask Redis to unblock it automatically
+ * once certain keys become "ready", that is, contain more data.
+ *
+ * Basically this is similar to what a typical Redis command usually does,
+ * like BLPOP or ZPOPMAX: the client blocks if it cannot be served ASAP,
+ * and later when the key receives new data (a list push for instance), the
+ * client is unblocked and served.
+ *
+ * However in the case of this module API, when the client is unblocked?
+ *
+ * 1. If you block ok a key of a type that has blocking operations associated,
+ * like a list, a sorted set, a stream, and so forth, the client may be
+ * unblocked once the relevant key is targeted by an operation that normally
+ * unblocks the native blocking operations for that type. So if we block
+ * on a list key, an RPUSH command may unblock our client and so forth.
+ * 2. If you are implementing your native data type, or if you want to add new
+ * unblocking conditions in addition to "1", you can call the modules API
+ * RedisModule_SignalKeyAsReady().
+ *
+ * Anyway we can't be sure if the client should be unblocked just because the
+ * key is signaled as ready: for instance a successive operation may change the
+ * key, or a client in queue before this one can be served, modifying the key
+ * as well and making it empty again. So when a client is blocked with
+ * RedisModule_BlockClientOnKeys() the reply callback is not called after
+ * RM_UnblockCLient() is called, but every time a key is signaled as ready:
+ * if the reply callback can serve the client, it returns REDISMODULE_OK
+ * and the client is unblocked, otherwise it will return REDISMODULE_ERR
+ * and we'll try again later.
+ *
+ * The reply callback can access the key that was signaled as ready by
+ * calling the API RedisModule_GetBlockedClientReadyKey(), that returns
+ * just the string name of the key as a RedisModuleString object.
+ *
+ * Thanks to this system we can setup complex blocking scenarios, like
+ * unblocking a client only if a list contains at least 5 items or other
+ * more fancy logics.
+ *
+ * Note that another difference with RedisModule_BlockClient(), is that here
+ * we pass the private data directly when blocking the client: it will
+ * be accessible later in the reply callback. Normally when blocking with
+ * RedisModule_BlockClient() the private data to reply to the client is
+ * passed when calling RedisModule_UnblockClient() but here the unblocking
+ * is performed by Redis itself, so we need to have some private data before
+ * hand. The private data is used to store any information about the specific
+ * unblocking operation that you are implementing. Such information will be
+ * freed using the free_privdata callback provided by the user.
+ *
+ * However the reply callback will be able to access the argument vector of
+ * the command, so the private data is often not needed. */
+RedisModuleBlockedClient *RM_BlockClientOnKeys(RedisModuleCtx *ctx, RedisModuleCmdFunc reply_callback, RedisModuleCmdFunc timeout_callback, void (*free_privdata)(RedisModuleCtx*,void*), long long timeout_ms, RedisModuleString **keys, int numkeys, void *privdata) {
+ return moduleBlockClient(ctx,reply_callback,timeout_callback,free_privdata,timeout_ms, keys,numkeys,privdata);
+}
+
+/* This function is used in order to potentially unblock a client blocked
+ * on keys with RedisModule_BlockClientOnKeys(). When this function is called,
+ * all the clients blocked for this key will get their reply callback called,
+ * and if the callback returns REDISMODULE_OK the client will be unblocked. */
+void RM_SignalKeyAsReady(RedisModuleCtx *ctx, RedisModuleString *key) {
+ signalKeyAsReady(ctx->client->db, key);
+}
+
+/* Implements RM_UnblockClient() and moduleUnblockClient(). */
+int moduleUnblockClientByHandle(RedisModuleBlockedClient *bc, void *privdata) {
+ pthread_mutex_lock(&moduleUnblockedClientsMutex);
+ if (!bc->blocked_on_keys) bc->privdata = privdata;
+ bc->unblocked = 1;
+ listAddNodeTail(moduleUnblockedClients,bc);
+ if (write(server.module_blocked_pipe[1],"A",1) != 1) {
+ /* Ignore the error, this is best-effort. */
+ }
+ pthread_mutex_unlock(&moduleUnblockedClientsMutex);
+ return REDISMODULE_OK;
+}
+
+/* This API is used by the Redis core to unblock a client that was blocked
+ * by a module. */
+void moduleUnblockClient(client *c) {
+ RedisModuleBlockedClient *bc = c->bpop.module_blocked_handle;
+ moduleUnblockClientByHandle(bc,NULL);
+}
+
+/* Return true if the client 'c' was blocked by a module using
+ * RM_BlockClientOnKeys(). */
+int moduleClientIsBlockedOnKeys(client *c) {
+ RedisModuleBlockedClient *bc = c->bpop.module_blocked_handle;
+ return bc->blocked_on_keys;
+}
+
/* Unblock a client blocked by `RedisModule_BlockedClient`. This will trigger
* the reply callbacks to be called in order to reply to the client.
* The 'privdata' argument will be accessible by the reply callback, so
@@ -3707,15 +4278,25 @@ RedisModuleBlockedClient *RM_BlockClient(RedisModuleCtx *ctx, RedisModuleCmdFunc
* needs to be passed to the client, included but not limited some slow
* to compute reply or some reply obtained via networking.
*
- * Note: this function can be called from threads spawned by the module. */
+ * Note 1: this function can be called from threads spawned by the module.
+ *
+ * Note 2: when we unblock a client that is blocked for keys using
+ * the API RedisModule_BlockClientOnKeys(), the privdata argument here is
+ * not used, and the reply callback is called with the privdata pointer that
+ * was passed when blocking the client.
+ *
+ * Unblocking a client that was blocked for keys using this API will still
+ * require the client to get some reply, so the function will use the
+ * "timeout" handler in order to do so. */
int RM_UnblockClient(RedisModuleBlockedClient *bc, void *privdata) {
- pthread_mutex_lock(&moduleUnblockedClientsMutex);
- bc->privdata = privdata;
- listAddNodeTail(moduleUnblockedClients,bc);
- if (write(server.module_blocked_pipe[1],"A",1) != 1) {
- /* Ignore the error, this is best-effort. */
+ if (bc->blocked_on_keys) {
+ /* In theory the user should always pass the timeout handler as an
+ * argument, but better to be safe than sorry. */
+ if (bc->timeout_callback == NULL) return REDISMODULE_ERR;
+ if (bc->unblocked) return REDISMODULE_OK;
+ if (bc->client) moduleBlockedClientTimedOut(bc->client);
}
- pthread_mutex_unlock(&moduleUnblockedClientsMutex);
+ moduleUnblockClientByHandle(bc,privdata);
return REDISMODULE_OK;
}
@@ -3775,16 +4356,19 @@ void moduleHandleBlockedClients(void) {
* touch the shared list. */
/* Call the reply callback if the client is valid and we have
- * any callback. */
- if (c && bc->reply_callback) {
+ * any callback. However the callback is not called if the client
+ * was blocked on keys (RM_BlockClientOnKeys()), because we already
+ * called such callback in moduleTryServeClientBlockedOnKey() when
+ * the key was signaled as ready. */
+ if (c && !bc->blocked_on_keys && bc->reply_callback) {
RedisModuleCtx ctx = REDISMODULE_CTX_INIT;
ctx.flags |= REDISMODULE_CTX_BLOCKED_REPLY;
ctx.blocked_privdata = bc->privdata;
+ ctx.blocked_ready_key = NULL;
ctx.module = bc->module;
ctx.client = bc->client;
ctx.blocked_client = bc;
bc->reply_callback(&ctx,(void**)c->argv,c->argc);
- moduleHandlePropagationAfterCommandCallback(&ctx);
moduleFreeContext(&ctx);
}
@@ -3827,6 +4411,7 @@ void moduleHandleBlockedClients(void) {
/* Free 'bc' only after unblocking the client, since it is
* referenced in the client blocking context, and must be valid
* when calling unblockClient(). */
+ bc->module->blocked_clients--;
zfree(bc);
/* Lock again before to iterate the loop. */
@@ -3871,6 +4456,12 @@ void *RM_GetBlockedClientPrivateData(RedisModuleCtx *ctx) {
return ctx->blocked_privdata;
}
+/* Get the key that is ready when the reply callback is called in the context
+ * of a client blocked by RedisModule_BlockClientOnKeys(). */
+RedisModuleString *RM_GetBlockedClientReadyKey(RedisModuleCtx *ctx) {
+ return ctx->blocked_ready_key;
+}
+
/* Get the blocked client associated with a given context.
* This is useful in the reply and timeout callbacks of blocked clients,
* before sometimes the module has the blocked client handle references
@@ -3922,10 +4513,10 @@ RedisModuleCtx *RM_GetThreadSafeContext(RedisModuleBlockedClient *bc) {
* access it safely from another thread, so we create a fake client here
* in order to keep things like the currently selected database and similar
* things. */
- ctx->client = createClient(-1);
+ ctx->client = createClient(NULL);
if (bc) {
selectDb(ctx->client,bc->dbid);
- ctx->client->id = bc->client->id;
+ if (bc->client) ctx->client->id = bc->client->id;
}
return ctx;
}
@@ -4023,6 +4614,20 @@ int RM_SubscribeToKeyspaceEvents(RedisModuleCtx *ctx, int types, RedisModuleNoti
return REDISMODULE_OK;
}
+/* Get the configured bitmap of notify-keyspace-events (Could be used
+ * for additional filtering in RedisModuleNotificationFunc) */
+int RM_GetNotifyKeyspaceEvents() {
+ return server.notify_keyspace_events;
+}
+
+/* Expose notifyKeyspaceEvent to modules */
+int RM_NotifyKeyspaceEvent(RedisModuleCtx *ctx, int type, const char *event, RedisModuleString *key) {
+ if (!ctx || !ctx->client)
+ return REDISMODULE_ERR;
+ notifyKeyspaceEvent(type, (char *)event, key, ctx->client->db->id);
+ return REDISMODULE_OK;
+}
+
/* Dispatcher for keyspace notifications to module subscriber functions.
* This gets called only if at least one module requested to be notified on
* keyspace notifications */
@@ -4729,6 +5334,194 @@ int RM_DictCompare(RedisModuleDictIter *di, const char *op, RedisModuleString *k
return res ? REDISMODULE_OK : REDISMODULE_ERR;
}
+
+
+
+/* --------------------------------------------------------------------------
+ * Modules Info fields
+ * -------------------------------------------------------------------------- */
+
+int RM_InfoEndDictField(RedisModuleInfoCtx *ctx);
+
+/* Used to start a new section, before adding any fields. the section name will
+ * be prefixed by "<modulename>_" and must only include A-Z,a-z,0-9.
+ * NULL or empty string indicates the default section (only <modulename>) is used.
+ * When return value is REDISMODULE_ERR, the section should and will be skipped. */
+int RM_InfoAddSection(RedisModuleInfoCtx *ctx, char *name) {
+ sds full_name = sdsdup(ctx->module->name);
+ if (name != NULL && strlen(name) > 0)
+ full_name = sdscatfmt(full_name, "_%s", name);
+
+ /* Implicitly end dicts, instead of returning an error which is likely un checked. */
+ if (ctx->in_dict_field)
+ RM_InfoEndDictField(ctx);
+
+ /* proceed only if:
+ * 1) no section was requested (emit all)
+ * 2) the module name was requested (emit all)
+ * 3) this specific section was requested. */
+ if (ctx->requested_section) {
+ if (strcasecmp(ctx->requested_section, full_name) &&
+ strcasecmp(ctx->requested_section, ctx->module->name)) {
+ sdsfree(full_name);
+ ctx->in_section = 0;
+ return REDISMODULE_ERR;
+ }
+ }
+ if (ctx->sections++) ctx->info = sdscat(ctx->info,"\r\n");
+ ctx->info = sdscatfmt(ctx->info, "# %S\r\n", full_name);
+ ctx->in_section = 1;
+ sdsfree(full_name);
+ return REDISMODULE_OK;
+}
+
+/* Starts a dict field, similar to the ones in INFO KEYSPACE. Use normal
+ * RedisModule_InfoAddField* functions to add the items to this field, and
+ * terminate with RedisModule_InfoEndDictField. */
+int RM_InfoBeginDictField(RedisModuleInfoCtx *ctx, char *name) {
+ if (!ctx->in_section)
+ return REDISMODULE_ERR;
+ /* Implicitly end dicts, instead of returning an error which is likely un checked. */
+ if (ctx->in_dict_field)
+ RM_InfoEndDictField(ctx);
+ ctx->info = sdscatfmt(ctx->info,
+ "%s_%s:",
+ ctx->module->name,
+ name);
+ ctx->in_dict_field = 1;
+ return REDISMODULE_OK;
+}
+
+/* Ends a dict field, see RedisModule_InfoBeginDictField */
+int RM_InfoEndDictField(RedisModuleInfoCtx *ctx) {
+ if (!ctx->in_dict_field)
+ return REDISMODULE_ERR;
+ /* trim the last ',' if found. */
+ if (ctx->info[sdslen(ctx->info)-1]==',')
+ sdsIncrLen(ctx->info, -1);
+ ctx->info = sdscat(ctx->info, "\r\n");
+ ctx->in_dict_field = 0;
+ return REDISMODULE_OK;
+}
+
+/* Used by RedisModuleInfoFunc to add info fields.
+ * Each field will be automatically prefixed by "<modulename>_".
+ * Field names or values must not include \r\n of ":" */
+int RM_InfoAddFieldString(RedisModuleInfoCtx *ctx, char *field, RedisModuleString *value) {
+ if (!ctx->in_section)
+ return REDISMODULE_ERR;
+ if (ctx->in_dict_field) {
+ ctx->info = sdscatfmt(ctx->info,
+ "%s=%S,",
+ field,
+ (sds)value->ptr);
+ return REDISMODULE_OK;
+ }
+ ctx->info = sdscatfmt(ctx->info,
+ "%s_%s:%S\r\n",
+ ctx->module->name,
+ field,
+ (sds)value->ptr);
+ return REDISMODULE_OK;
+}
+
+int RM_InfoAddFieldCString(RedisModuleInfoCtx *ctx, char *field, char *value) {
+ if (!ctx->in_section)
+ return REDISMODULE_ERR;
+ if (ctx->in_dict_field) {
+ ctx->info = sdscatfmt(ctx->info,
+ "%s=%s,",
+ field,
+ value);
+ return REDISMODULE_OK;
+ }
+ ctx->info = sdscatfmt(ctx->info,
+ "%s_%s:%s\r\n",
+ ctx->module->name,
+ field,
+ value);
+ return REDISMODULE_OK;
+}
+
+int RM_InfoAddFieldDouble(RedisModuleInfoCtx *ctx, char *field, double value) {
+ if (!ctx->in_section)
+ return REDISMODULE_ERR;
+ if (ctx->in_dict_field) {
+ ctx->info = sdscatprintf(ctx->info,
+ "%s=%.17g,",
+ field,
+ value);
+ return REDISMODULE_OK;
+ }
+ ctx->info = sdscatprintf(ctx->info,
+ "%s_%s:%.17g\r\n",
+ ctx->module->name,
+ field,
+ value);
+ return REDISMODULE_OK;
+}
+
+int RM_InfoAddFieldLongLong(RedisModuleInfoCtx *ctx, char *field, long long value) {
+ if (!ctx->in_section)
+ return REDISMODULE_ERR;
+ if (ctx->in_dict_field) {
+ ctx->info = sdscatfmt(ctx->info,
+ "%s=%I,",
+ field,
+ value);
+ return REDISMODULE_OK;
+ }
+ ctx->info = sdscatfmt(ctx->info,
+ "%s_%s:%I\r\n",
+ ctx->module->name,
+ field,
+ value);
+ return REDISMODULE_OK;
+}
+
+int RM_InfoAddFieldULongLong(RedisModuleInfoCtx *ctx, char *field, unsigned long long value) {
+ if (!ctx->in_section)
+ return REDISMODULE_ERR;
+ if (ctx->in_dict_field) {
+ ctx->info = sdscatfmt(ctx->info,
+ "%s=%U,",
+ field,
+ value);
+ return REDISMODULE_OK;
+ }
+ ctx->info = sdscatfmt(ctx->info,
+ "%s_%s:%U\r\n",
+ ctx->module->name,
+ field,
+ value);
+ return REDISMODULE_OK;
+}
+
+int RM_RegisterInfoFunc(RedisModuleCtx *ctx, RedisModuleInfoFunc cb) {
+ ctx->module->info_cb = cb;
+ return REDISMODULE_OK;
+}
+
+sds modulesCollectInfo(sds info, sds section, int for_crash_report, int sections) {
+ dictIterator *di = dictGetIterator(modules);
+ dictEntry *de;
+
+ while ((de = dictNext(di)) != NULL) {
+ struct RedisModule *module = dictGetVal(de);
+ if (!module->info_cb)
+ continue;
+ RedisModuleInfoCtx info_ctx = {module, section, info, sections, 0};
+ module->info_cb(&info_ctx, for_crash_report);
+ /* Implicitly end dicts (no way to handle errors, and we must add the newline). */
+ if (info_ctx.in_dict_field)
+ RM_InfoEndDictField(&info_ctx);
+ info = info_ctx.info;
+ sections = info_ctx.sections;
+ }
+ dictReleaseIterator(di);
+ return info;
+}
+
/* --------------------------------------------------------------------------
* Modules utility APIs
* -------------------------------------------------------------------------- */
@@ -4963,11 +5756,13 @@ int RM_UnregisterCommandFilter(RedisModuleCtx *ctx, RedisModuleCommandFilter *fi
ln = listSearchKey(moduleCommandFilters,filter);
if (!ln) return REDISMODULE_ERR;
listDelNode(moduleCommandFilters,ln);
-
+
ln = listSearchKey(ctx->module->filters,filter);
if (!ln) return REDISMODULE_ERR; /* Shouldn't happen */
listDelNode(ctx->module->filters,ln);
+ zfree(filter);
+
return REDISMODULE_OK;
}
@@ -5072,6 +5867,459 @@ int RM_CommandFilterArgDelete(RedisModuleCommandFilterCtx *fctx, int pos)
}
/* --------------------------------------------------------------------------
+ * Module fork API
+ * -------------------------------------------------------------------------- */
+
+/* Create a background child process with the current frozen snaphost of the
+ * main process where you can do some processing in the background without
+ * affecting / freezing the traffic and no need for threads and GIL locking.
+ * Note that Redis allows for only one concurrent fork.
+ * When the child wants to exit, it should call RedisModule_ExitFromChild.
+ * If the parent wants to kill the child it should call RedisModule_KillForkChild
+ * The done handler callback will be executed on the parent process when the
+ * child existed (but not when killed)
+ * Return: -1 on failure, on success the parent process will get a positive PID
+ * of the child, and the child process will get 0.
+ */
+int RM_Fork(RedisModuleForkDoneHandler cb, void *user_data) {
+ pid_t childpid;
+ if (hasActiveChildProcess()) {
+ return -1;
+ }
+
+ openChildInfoPipe();
+ if ((childpid = redisFork()) == 0) {
+ /* Child */
+ redisSetProcTitle("redis-module-fork");
+ } else if (childpid == -1) {
+ closeChildInfoPipe();
+ serverLog(LL_WARNING,"Can't fork for module: %s", strerror(errno));
+ } else {
+ /* Parent */
+ server.module_child_pid = childpid;
+ moduleForkInfo.done_handler = cb;
+ moduleForkInfo.done_handler_user_data = user_data;
+ serverLog(LL_NOTICE, "Module fork started pid: %d ", childpid);
+ }
+ return childpid;
+}
+
+/* Call from the child process when you want to terminate it.
+ * retcode will be provided to the done handler executed on the parent process.
+ */
+int RM_ExitFromChild(int retcode) {
+ sendChildCOWInfo(CHILD_INFO_TYPE_MODULE, "Module fork");
+ exitFromChild(retcode);
+ return REDISMODULE_OK;
+}
+
+/* Kill the active module forked child, if there is one active and the
+ * pid matches, and returns C_OK. Otherwise if there is no active module
+ * child or the pid does not match, return C_ERR without doing anything. */
+int TerminateModuleForkChild(int child_pid, int wait) {
+ /* Module child should be active and pid should match. */
+ if (server.module_child_pid == -1 ||
+ server.module_child_pid != child_pid) return C_ERR;
+
+ int statloc;
+ serverLog(LL_NOTICE,"Killing running module fork child: %ld",
+ (long) server.module_child_pid);
+ if (kill(server.module_child_pid,SIGUSR1) != -1 && wait) {
+ while(wait4(server.module_child_pid,&statloc,0,NULL) !=
+ server.module_child_pid);
+ }
+ /* Reset the buffer accumulating changes while the child saves. */
+ server.module_child_pid = -1;
+ moduleForkInfo.done_handler = NULL;
+ moduleForkInfo.done_handler_user_data = NULL;
+ closeChildInfoPipe();
+ updateDictResizePolicy();
+ return C_OK;
+}
+
+/* Can be used to kill the forked child process from the parent process.
+ * child_pid whould be the return value of RedisModule_Fork. */
+int RM_KillForkChild(int child_pid) {
+ /* Kill module child, wait for child exit. */
+ if (TerminateModuleForkChild(child_pid,1) == C_OK)
+ return REDISMODULE_OK;
+ else
+ return REDISMODULE_ERR;
+}
+
+void ModuleForkDoneHandler(int exitcode, int bysignal) {
+ serverLog(LL_NOTICE,
+ "Module fork exited pid: %d, retcode: %d, bysignal: %d",
+ server.module_child_pid, exitcode, bysignal);
+ if (moduleForkInfo.done_handler) {
+ moduleForkInfo.done_handler(exitcode, bysignal,
+ moduleForkInfo.done_handler_user_data);
+ }
+ server.module_child_pid = -1;
+ moduleForkInfo.done_handler = NULL;
+ moduleForkInfo.done_handler_user_data = NULL;
+}
+
+/* --------------------------------------------------------------------------
+ * Server hooks implementation
+ * -------------------------------------------------------------------------- */
+
+/* Register to be notified, via a callback, when the specified server event
+ * happens. The callback is called with the event as argument, and an additional
+ * argument which is a void pointer and should be cased to a specific type
+ * that is event-specific (but many events will just use NULL since they do not
+ * have additional information to pass to the callback).
+ *
+ * If the callback is NULL and there was a previous subscription, the module
+ * will be unsubscribed. If there was a previous subscription and the callback
+ * is not null, the old callback will be replaced with the new one.
+ *
+ * The callback must be of this type:
+ *
+ * int (*RedisModuleEventCallback)(RedisModuleCtx *ctx,
+ * RedisModuleEvent eid,
+ * uint64_t subevent,
+ * void *data);
+ *
+ * The 'ctx' is a normal Redis module context that the callback can use in
+ * order to call other modules APIs. The 'eid' is the event itself, this
+ * is only useful in the case the module subscribed to multiple events: using
+ * the 'id' field of this structure it is possible to check if the event
+ * is one of the events we registered with this callback. The 'subevent' field
+ * depends on the event that fired.
+ *
+ * Finally the 'data' pointer may be populated, only for certain events, with
+ * more relevant data.
+ *
+ * Here is a list of events you can use as 'eid' and related sub events:
+ *
+ * RedisModuleEvent_ReplicationRoleChanged
+ *
+ * This event is called when the instance switches from master
+ * to replica or the other way around, however the event is
+ * also called when the replica remains a replica but starts to
+ * replicate with a different master.
+ *
+ * The following sub events are available:
+ *
+ * REDISMODULE_SUBEVENT_REPLROLECHANGED_NOW_MASTER
+ * REDISMODULE_SUBEVENT_REPLROLECHANGED_NOW_REPLICA
+ *
+ * The 'data' field can be casted by the callback to a
+ * RedisModuleReplicationInfo structure with the following fields:
+ *
+ * int master; // true if master, false if replica
+ * char *masterhost; // master instance hostname for NOW_REPLICA
+ * int masterport; // master instance port for NOW_REPLICA
+ * char *replid1; // Main replication ID
+ * char *replid2; // Secondary replication ID
+ * uint64_t repl1_offset; // Main replication offset
+ * uint64_t repl2_offset; // Offset of replid2 validity
+ *
+ * RedisModuleEvent_Persistence
+ *
+ * This event is called when RDB saving or AOF rewriting starts
+ * and ends. The following sub events are available:
+ *
+ * REDISMODULE_SUBEVENT_PERSISTENCE_RDB_START
+ * REDISMODULE_SUBEVENT_PERSISTENCE_AOF_START
+ * REDISMODULE_SUBEVENT_PERSISTENCE_SYNC_RDB_START
+ * REDISMODULE_SUBEVENT_PERSISTENCE_ENDED
+ * REDISMODULE_SUBEVENT_PERSISTENCE_FAILED
+ *
+ * The above events are triggered not just when the user calls the
+ * relevant commands like BGSAVE, but also when a saving operation
+ * or AOF rewriting occurs because of internal server triggers.
+ * The SYNC_RDB_START sub events are happening in the forground due to
+ * SAVE command, FLUSHALL, or server shutdown, and the other RDB and
+ * AOF sub events are executed in a background fork child, so any
+ * action the module takes can only affect the generated AOF or RDB,
+ * but will not be reflected in the parent process and affect connected
+ * clients and commands. Also note that the AOF_START sub event may end
+ * up saving RDB content in case of an AOF with rdb-preamble.
+ *
+ * RedisModuleEvent_FlushDB
+ *
+ * The FLUSHALL, FLUSHDB or an internal flush (for instance
+ * because of replication, after the replica synchronization)
+ * happened. The following sub events are available:
+ *
+ * REDISMODULE_SUBEVENT_FLUSHDB_START
+ * REDISMODULE_SUBEVENT_FLUSHDB_END
+ *
+ * The data pointer can be casted to a RedisModuleFlushInfo
+ * structure with the following fields:
+ *
+ * int32_t async; // True if the flush is done in a thread.
+ * See for instance FLUSHALL ASYNC.
+ * In this case the END callback is invoked
+ * immediately after the database is put
+ * in the free list of the thread.
+ * int32_t dbnum; // Flushed database number, -1 for all the DBs
+ * in the case of the FLUSHALL operation.
+ *
+ * The start event is called *before* the operation is initated, thus
+ * allowing the callback to call DBSIZE or other operation on the
+ * yet-to-free keyspace.
+ *
+ * RedisModuleEvent_Loading
+ *
+ * Called on loading operations: at startup when the server is
+ * started, but also after a first synchronization when the
+ * replica is loading the RDB file from the master.
+ * The following sub events are available:
+ *
+ * REDISMODULE_SUBEVENT_LOADING_RDB_START
+ * REDISMODULE_SUBEVENT_LOADING_AOF_START
+ * REDISMODULE_SUBEVENT_LOADING_REPL_START
+ * REDISMODULE_SUBEVENT_LOADING_ENDED
+ * REDISMODULE_SUBEVENT_LOADING_FAILED
+ *
+ * Note that AOF loading may start with an RDB data in case of
+ * rdb-preamble, in which case you'll only recieve an AOF_START event.
+ *
+ *
+ * RedisModuleEvent_ClientChange
+ *
+ * Called when a client connects or disconnects.
+ * The data pointer can be casted to a RedisModuleClientInfo
+ * structure, documented in RedisModule_GetClientInfoById().
+ * The following sub events are available:
+ *
+ * REDISMODULE_SUBEVENT_CLIENT_CHANGE_CONNECTED
+ * REDISMODULE_SUBEVENT_CLIENT_CHANGE_DISCONNECTED
+ *
+ * RedisModuleEvent_Shutdown
+ *
+ * The server is shutting down. No subevents are available.
+ *
+ * RedisModuleEvent_ReplicaChange
+ *
+ * This event is called when the instance (that can be both a
+ * master or a replica) get a new online replica, or lose a
+ * replica since it gets disconnected.
+ * The following sub events are availble:
+ *
+ * REDISMODULE_SUBEVENT_REPLICA_CHANGE_ONLINE
+ * REDISMODULE_SUBEVENT_REPLICA_CHANGE_OFFLINE
+ *
+ * No additional information is available so far: future versions
+ * of Redis will have an API in order to enumerate the replicas
+ * connected and their state.
+ *
+ * RedisModuleEvent_CronLoop
+ *
+ * This event is called every time Redis calls the serverCron()
+ * function in order to do certain bookkeeping. Modules that are
+ * required to do operations from time to time may use this callback.
+ * Normally Redis calls this function 10 times per second, but
+ * this changes depending on the "hz" configuration.
+ * No sub events are available.
+ *
+ * The data pointer can be casted to a RedisModuleCronLoop
+ * structure with the following fields:
+ *
+ * int32_t hz; // Approximate number of events per second.
+ *
+ * RedisModuleEvent_MasterLinkChange
+ *
+ * This is called for replicas in order to notify when the
+ * replication link becomes functional (up) with our master,
+ * or when it goes down. Note that the link is not considered
+ * up when we just connected to the master, but only if the
+ * replication is happening correctly.
+ * The following sub events are available:
+ *
+ * REDISMODULE_SUBEVENT_MASTER_LINK_UP
+ * REDISMODULE_SUBEVENT_MASTER_LINK_DOWN
+ *
+ * RedisModuleEvent_ModuleChange
+ *
+ * This event is called when a new module is loaded or one is unloaded.
+ * The following sub events are availble:
+ *
+ * REDISMODULE_SUBEVENT_MODULE_LOADED
+ * REDISMODULE_SUBEVENT_MODULE_UNLOADED
+ *
+ * The data pointer can be casted to a RedisModuleModuleChange
+ * structure with the following fields:
+ *
+ * const char* module_name; // Name of module loaded or unloaded.
+ * int32_t module_version; // Module version.
+ *
+ * RedisModuleEvent_LoadingProgress
+ *
+ * This event is called repeatedly called while an RDB or AOF file
+ * is being loaded.
+ * The following sub events are availble:
+ *
+ * REDISMODULE_SUBEVENT_LOADING_PROGRESS_RDB
+ * REDISMODULE_SUBEVENT_LOADING_PROGRESS_AOF
+ *
+ * The data pointer can be casted to a RedisModuleLoadingProgress
+ * structure with the following fields:
+ *
+ * int32_t hz; // Approximate number of events per second.
+ * int32_t progress; // Approximate progress between 0 and 1024,
+ * or -1 if unknown.
+ *
+ * The function returns REDISMODULE_OK if the module was successfully subscrived
+ * for the specified event. If the API is called from a wrong context then
+ * REDISMODULE_ERR is returned. */
+int RM_SubscribeToServerEvent(RedisModuleCtx *ctx, RedisModuleEvent event, RedisModuleEventCallback callback) {
+ RedisModuleEventListener *el;
+
+ /* Protect in case of calls from contexts without a module reference. */
+ if (ctx->module == NULL) return REDISMODULE_ERR;
+
+ /* Search an event matching this module and event ID. */
+ listIter li;
+ listNode *ln;
+ listRewind(RedisModule_EventListeners,&li);
+ while((ln = listNext(&li))) {
+ el = ln->value;
+ if (el->module == ctx->module && el->event.id == event.id)
+ break; /* Matching event found. */
+ }
+
+ /* Modify or remove the event listener if we already had one. */
+ if (ln) {
+ if (callback == NULL) {
+ listDelNode(RedisModule_EventListeners,ln);
+ zfree(el);
+ } else {
+ el->callback = callback; /* Update the callback with the new one. */
+ }
+ return REDISMODULE_OK;
+ }
+
+ /* No event found, we need to add a new one. */
+ el = zmalloc(sizeof(*el));
+ el->module = ctx->module;
+ el->event = event;
+ el->callback = callback;
+ listAddNodeTail(RedisModule_EventListeners,el);
+ return REDISMODULE_OK;
+}
+
+/* This is called by the Redis internals every time we want to fire an
+ * event that can be interceppted by some module. The pointer 'data' is useful
+ * in order to populate the event-specific structure when needed, in order
+ * to return the structure with more information to the callback.
+ *
+ * 'eid' and 'subid' are just the main event ID and the sub event associated
+ * with the event, depending on what exactly happened. */
+void moduleFireServerEvent(uint64_t eid, int subid, void *data) {
+ /* Fast path to return ASAP if there is nothing to do, avoiding to
+ * setup the iterator and so forth: we want this call to be extremely
+ * cheap if there are no registered modules. */
+ if (listLength(RedisModule_EventListeners) == 0) return;
+
+ listIter li;
+ listNode *ln;
+ listRewind(RedisModule_EventListeners,&li);
+ while((ln = listNext(&li))) {
+ RedisModuleEventListener *el = ln->value;
+ if (el->event.id == eid) {
+ RedisModuleCtx ctx = REDISMODULE_CTX_INIT;
+ ctx.module = el->module;
+
+ if (ModulesInHooks == 0) {
+ ctx.client = moduleFreeContextReusedClient;
+ } else {
+ ctx.client = createClient(NULL);
+ ctx.client->flags |= CLIENT_MODULE;
+ ctx.client->user = NULL; /* Root user. */
+ }
+
+ void *moduledata = NULL;
+ RedisModuleClientInfoV1 civ1;
+ RedisModuleReplicationInfoV1 riv1;
+ RedisModuleModuleChangeV1 mcv1;
+ /* Start at DB zero by default when calling the handler. It's
+ * up to the specific event setup to change it when it makes
+ * sense. For instance for FLUSHDB events we select the correct
+ * DB automatically. */
+ selectDb(ctx.client, 0);
+
+ /* Event specific context and data pointer setup. */
+ if (eid == REDISMODULE_EVENT_CLIENT_CHANGE) {
+ modulePopulateClientInfoStructure(&civ1,data,
+ el->event.dataver);
+ moduledata = &civ1;
+ } else if (eid == REDISMODULE_EVENT_REPLICATION_ROLE_CHANGED) {
+ modulePopulateReplicationInfoStructure(&riv1,el->event.dataver);
+ moduledata = &riv1;
+ } else if (eid == REDISMODULE_EVENT_FLUSHDB) {
+ moduledata = data;
+ RedisModuleFlushInfoV1 *fi = data;
+ if (fi->dbnum != -1)
+ selectDb(ctx.client, fi->dbnum);
+ } else if (eid == REDISMODULE_EVENT_MODULE_CHANGE) {
+ RedisModule *m = data;
+ if (m == el->module)
+ continue;
+ mcv1.version = REDISMODULE_MODULE_CHANGE_VERSION;
+ mcv1.module_name = m->name;
+ mcv1.module_version = m->ver;
+ moduledata = &mcv1;
+ } else if (eid == REDISMODULE_EVENT_LOADING_PROGRESS) {
+ moduledata = data;
+ } else if (eid == REDISMODULE_EVENT_CRON_LOOP) {
+ moduledata = data;
+ }
+
+ ModulesInHooks++;
+ el->module->in_hook++;
+ el->callback(&ctx,el->event,subid,moduledata);
+ el->module->in_hook--;
+ ModulesInHooks--;
+
+ if (ModulesInHooks != 0) freeClient(ctx.client);
+ moduleFreeContext(&ctx);
+ }
+ }
+}
+
+/* Remove all the listeners for this module: this is used before unloading
+ * a module. */
+void moduleUnsubscribeAllServerEvents(RedisModule *module) {
+ RedisModuleEventListener *el;
+ listIter li;
+ listNode *ln;
+ listRewind(RedisModule_EventListeners,&li);
+
+ while((ln = listNext(&li))) {
+ el = ln->value;
+ if (el->module == module) {
+ listDelNode(RedisModule_EventListeners,ln);
+ zfree(el);
+ }
+ }
+}
+
+void processModuleLoadingProgressEvent(int is_aof) {
+ long long now = ustime();
+ static long long next_event = 0;
+ if (now >= next_event) {
+ /* Fire the loading progress modules end event. */
+ int progress = -1;
+ if (server.loading_total_bytes)
+ progress = (server.loading_total_bytes<<10) / server.loading_total_bytes;
+ RedisModuleFlushInfoV1 fi = {REDISMODULE_LOADING_PROGRESS_VERSION,
+ server.hz,
+ progress};
+ moduleFireServerEvent(REDISMODULE_EVENT_LOADING_PROGRESS,
+ is_aof?
+ REDISMODULE_SUBEVENT_LOADING_PROGRESS_AOF:
+ REDISMODULE_SUBEVENT_LOADING_PROGRESS_RDB,
+ &fi);
+ /* decide when the next event should fire. */
+ next_event = now + 1000000 / server.hz;
+ }
+}
+
+/* --------------------------------------------------------------------------
* Modules API internals
* -------------------------------------------------------------------------- */
@@ -5113,7 +6361,7 @@ void moduleInitModulesSystem(void) {
/* Set up the keyspace notification susbscriber list and static client */
moduleKeyspaceSubscribers = listCreate();
- moduleFreeContextReusedClient = createClient(-1);
+ moduleFreeContextReusedClient = createClient(NULL);
moduleFreeContextReusedClient->flags |= CLIENT_MODULE;
moduleFreeContextReusedClient->user = NULL; /* root user. */
@@ -5135,6 +6383,9 @@ void moduleInitModulesSystem(void) {
/* Create the timers radix tree. */
Timers = raxNew();
+ /* Setup the event listeners data structures. */
+ RedisModule_EventListeners = listCreate();
+
/* Our thread-safe contexts GIL must start with already locked:
* it is just unlocked when it's safe. */
pthread_mutex_lock(&moduleGIL);
@@ -5170,6 +6421,8 @@ void moduleLoadFromQueue(void) {
void moduleFreeModuleStructure(struct RedisModule *module) {
listRelease(module->types);
listRelease(module->filters);
+ listRelease(module->usedby);
+ listRelease(module->using);
sdsfree(module->name);
zfree(module);
}
@@ -5231,8 +6484,14 @@ int moduleLoad(const char *path, void **module_argv, int module_argc) {
/* Redis module loaded! Register it. */
dictAdd(modules,ctx.module->name,ctx.module);
+ ctx.module->blocked_clients = 0;
ctx.module->handle = handle;
serverLog(LL_NOTICE,"Module '%s' loaded from %s",ctx.module->name,path);
+ /* Fire the loaded modules event. */
+ moduleFireServerEvent(REDISMODULE_EVENT_MODULE_CHANGE,
+ REDISMODULE_SUBEVENT_MODULE_LOADED,
+ ctx.module);
+
moduleFreeContext(&ctx);
return C_OK;
}
@@ -5256,6 +6515,26 @@ int moduleUnload(sds name) {
} else if (listLength(module->usedby)) {
errno = EPERM;
return REDISMODULE_ERR;
+ } else if (module->blocked_clients) {
+ errno = EAGAIN;
+ return REDISMODULE_ERR;
+ }
+
+ /* Give module a chance to clean up. */
+ int (*onunload)(void *);
+ onunload = (int (*)(void *))(unsigned long) dlsym(module->handle, "RedisModule_OnUnload");
+ if (onunload) {
+ RedisModuleCtx ctx = REDISMODULE_CTX_INIT;
+ ctx.module = module;
+ ctx.client = moduleFreeContextReusedClient;
+ int unload_status = onunload((void*)&ctx);
+ moduleFreeContext(&ctx);
+
+ if (unload_status == REDISMODULE_ERR) {
+ serverLog(LL_WARNING, "Module %s OnUnload failed. Unload canceled.", name);
+ errno = ECANCELED;
+ return REDISMODULE_ERR;
+ }
}
moduleUnregisterCommands(module);
@@ -5265,8 +6544,7 @@ int moduleUnload(sds name) {
/* Remove any notification subscribers this module might have */
moduleUnsubscribeNotifications(module);
-
- /* Unregister all the hooks. TODO: Yet no hooks support here. */
+ moduleUnsubscribeAllServerEvents(module);
/* Unload the dynamic library. */
if (dlclose(module->handle) == -1) {
@@ -5276,6 +6554,11 @@ int moduleUnload(sds name) {
module->name, error);
}
+ /* Fire the unloaded modules event. */
+ moduleFireServerEvent(REDISMODULE_EVENT_MODULE_CHANGE,
+ REDISMODULE_SUBEVENT_MODULE_UNLOADED,
+ module);
+
/* Remove from list of modules. */
serverLog(LL_NOTICE,"Module %s unloaded",module->name);
dictDelete(modules,module->name);
@@ -5304,6 +6587,62 @@ void addReplyLoadedModules(client *c) {
dictReleaseIterator(di);
}
+/* Helper for genModulesInfoString(): given a list of modules, return
+ * am SDS string in the form "[modulename|modulename2|...]" */
+sds genModulesInfoStringRenderModulesList(list *l) {
+ listIter li;
+ listNode *ln;
+ listRewind(l,&li);
+ sds output = sdsnew("[");
+ while((ln = listNext(&li))) {
+ RedisModule *module = ln->value;
+ output = sdscat(output,module->name);
+ }
+ output = sdstrim(output,"|");
+ output = sdscat(output,"]");
+ return output;
+}
+
+/* Helper for genModulesInfoString(): render module options as an SDS string. */
+sds genModulesInfoStringRenderModuleOptions(struct RedisModule *module) {
+ sds output = sdsnew("[");
+ if (module->options & REDISMODULE_OPTIONS_HANDLE_IO_ERRORS)
+ output = sdscat(output,"handle-io-errors|");
+ output = sdstrim(output,"|");
+ output = sdscat(output,"]");
+ return output;
+}
+
+
+/* Helper function for the INFO command: adds loaded modules as to info's
+ * output.
+ *
+ * After the call, the passed sds info string is no longer valid and all the
+ * references must be substituted with the new pointer returned by the call. */
+sds genModulesInfoString(sds info) {
+ dictIterator *di = dictGetIterator(modules);
+ dictEntry *de;
+
+ while ((de = dictNext(di)) != NULL) {
+ sds name = dictGetKey(de);
+ struct RedisModule *module = dictGetVal(de);
+
+ sds usedby = genModulesInfoStringRenderModulesList(module->usedby);
+ sds using = genModulesInfoStringRenderModulesList(module->using);
+ sds options = genModulesInfoStringRenderModuleOptions(module);
+ info = sdscatfmt(info,
+ "module:name=%S,ver=%i,api=%i,filters=%i,"
+ "usedby=%S,using=%S,options=%S\r\n",
+ name, module->ver, module->apiver,
+ (int)listLength(module->filters), usedby, using, options);
+ sdsfree(usedby);
+ sdsfree(using);
+ sdsfree(options);
+ }
+ dictReleaseIterator(di);
+ return info;
+}
+
/* Redis MODULE command.
*
* MODULE LOAD <path> [args...] */
@@ -5349,6 +6688,10 @@ NULL
errmsg = "the module exports APIs used by other modules. "
"Please unload them first and try again";
break;
+ case EAGAIN:
+ errmsg = "the module has blocked clients. "
+ "Please wait them unblocked and try again";
+ break;
default:
errmsg = "operation not possible.";
break;
@@ -5368,6 +6711,38 @@ size_t moduleCount(void) {
return dictSize(modules);
}
+/* Set the key LRU/LFU depending on server.maxmemory_policy.
+ * The lru_idle arg is idle time in seconds, and is only relevant if the
+ * eviction policy is LRU based.
+ * The lfu_freq arg is a logarithmic counter that provides an indication of
+ * the access frequencyonly (must be <= 255) and is only relevant if the
+ * eviction policy is LFU based.
+ * Either or both of them may be <0, in that case, nothing is set. */
+/* return value is an indication if the lru field was updated or not. */
+int RM_SetLRUOrLFU(RedisModuleKey *key, long long lfu_freq, long long lru_idle) {
+ if (!key->value)
+ return REDISMODULE_ERR;
+ if (objectSetLRUOrLFU(key->value, lfu_freq, lru_idle, lru_idle>=0 ? LRU_CLOCK() : 0))
+ return REDISMODULE_OK;
+ return REDISMODULE_ERR;
+}
+
+/* Gets the key LRU or LFU (depending on the current eviction policy).
+ * One will be set to the appropiate return value, and the other will be set to -1.
+ * see RedisModule_SetLRUOrLFU for units and ranges.
+ * return value is an indication of success. */
+int RM_GetLRUOrLFU(RedisModuleKey *key, long long *lfu_freq, long long *lru_idle) {
+ *lru_idle = *lfu_freq = -1;
+ if (!key->value)
+ return REDISMODULE_ERR;
+ if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) {
+ *lfu_freq = LFUDecrAndReturn(key->value);
+ } else {
+ *lru_idle = estimateObjectIdleTime(key->value)/1000;
+ }
+ return REDISMODULE_OK;
+}
+
/* Register all the APIs we export. Keep this function at the end of the
* file so that's easy to seek it to add new entries. */
void moduleRegisterCoreAPI(void) {
@@ -5386,8 +6761,12 @@ void moduleRegisterCoreAPI(void) {
REGISTER_API(ReplyWithError);
REGISTER_API(ReplyWithSimpleString);
REGISTER_API(ReplyWithArray);
+ REGISTER_API(ReplyWithNullArray);
+ REGISTER_API(ReplyWithEmptyArray);
REGISTER_API(ReplySetArrayLength);
REGISTER_API(ReplyWithString);
+ REGISTER_API(ReplyWithEmptyString);
+ REGISTER_API(ReplyWithVerbatimString);
REGISTER_API(ReplyWithStringBuffer);
REGISTER_API(ReplyWithCString);
REGISTER_API(ReplyWithNull);
@@ -5428,6 +6807,9 @@ void moduleRegisterCoreAPI(void) {
REGISTER_API(StringTruncate);
REGISTER_API(SetExpire);
REGISTER_API(GetExpire);
+ REGISTER_API(ResetDataset);
+ REGISTER_API(DbSize);
+ REGISTER_API(RandomKey);
REGISTER_API(ZsetAdd);
REGISTER_API(ZsetIncrby);
REGISTER_API(ZsetScore);
@@ -5452,6 +6834,9 @@ void moduleRegisterCoreAPI(void) {
REGISTER_API(ModuleTypeSetValue);
REGISTER_API(ModuleTypeGetType);
REGISTER_API(ModuleTypeGetValue);
+ REGISTER_API(IsIOError);
+ REGISTER_API(SetModuleOptions);
+ REGISTER_API(SignalModifiedKey);
REGISTER_API(SaveUnsigned);
REGISTER_API(LoadUnsigned);
REGISTER_API(SaveSigned);
@@ -5467,11 +6852,14 @@ void moduleRegisterCoreAPI(void) {
REGISTER_API(EmitAOF);
REGISTER_API(Log);
REGISTER_API(LogIOError);
+ REGISTER_API(_Assert);
+ REGISTER_API(LatencyAddSample);
REGISTER_API(StringAppendBuffer);
REGISTER_API(RetainString);
REGISTER_API(StringCompare);
REGISTER_API(GetContextFromIO);
REGISTER_API(GetKeyNameFromIO);
+ REGISTER_API(GetKeyNameFromModuleKey);
REGISTER_API(BlockClient);
REGISTER_API(UnblockClient);
REGISTER_API(IsBlockedReplyRequest);
@@ -5486,6 +6874,8 @@ void moduleRegisterCoreAPI(void) {
REGISTER_API(DigestAddStringBuffer);
REGISTER_API(DigestAddLongLong);
REGISTER_API(DigestEndSequence);
+ REGISTER_API(NotifyKeyspaceEvent);
+ REGISTER_API(GetNotifyKeyspaceEvents);
REGISTER_API(SubscribeToKeyspaceEvents);
REGISTER_API(RegisterClusterMessageReceiver);
REGISTER_API(SendClusterMessage);
@@ -5534,4 +6924,24 @@ void moduleRegisterCoreAPI(void) {
REGISTER_API(CommandFilterArgInsert);
REGISTER_API(CommandFilterArgReplace);
REGISTER_API(CommandFilterArgDelete);
+ REGISTER_API(Fork);
+ REGISTER_API(ExitFromChild);
+ REGISTER_API(KillForkChild);
+ REGISTER_API(RegisterInfoFunc);
+ REGISTER_API(InfoAddSection);
+ REGISTER_API(InfoBeginDictField);
+ REGISTER_API(InfoEndDictField);
+ REGISTER_API(InfoAddFieldString);
+ REGISTER_API(InfoAddFieldCString);
+ REGISTER_API(InfoAddFieldDouble);
+ REGISTER_API(InfoAddFieldLongLong);
+ REGISTER_API(InfoAddFieldULongLong);
+ REGISTER_API(GetClientInfoById);
+ REGISTER_API(PublishMessage);
+ REGISTER_API(SubscribeToServerEvent);
+ REGISTER_API(SetLRUOrLFU);
+ REGISTER_API(GetLRUOrLFU);
+ REGISTER_API(BlockClientOnKeys);
+ REGISTER_API(SignalKeyAsReady);
+ REGISTER_API(GetBlockedClientReadyKey);
}