summaryrefslogtreecommitdiff
path: root/src/module.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/module.c')
-rw-r--r--src/module.c426
1 files changed, 373 insertions, 53 deletions
diff --git a/src/module.c b/src/module.c
index 09cf6ac7a..3a161c05f 100644
--- a/src/module.c
+++ b/src/module.c
@@ -140,6 +140,9 @@ struct RedisModuleCtx {
void **postponed_arrays; /* To set with RM_ReplySetArrayLength(). */
int postponed_arrays_count; /* Number of entries in postponed_arrays. */
void *blocked_privdata; /* Privdata set when unblocking a client. */
+ RedisModuleString *blocked_ready_key; /* Key ready when the reply callback
+ gets called for clients blocked
+ on keys. */
/* Used if there is the REDISMODULE_CTX_KEYS_POS_REQUEST flag set. */
int *keys_pos;
@@ -153,7 +156,7 @@ struct RedisModuleCtx {
};
typedef struct RedisModuleCtx RedisModuleCtx;
-#define REDISMODULE_CTX_INIT {(void*)(unsigned long)&RM_GetApi, NULL, NULL, NULL, NULL, 0, 0, 0, NULL, 0, NULL, NULL, 0, NULL, {0}}
+#define REDISMODULE_CTX_INIT {(void*)(unsigned long)&RM_GetApi, NULL, NULL, NULL, NULL, 0, 0, 0, NULL, 0, NULL, NULL, NULL, 0, NULL, {0}}
#define REDISMODULE_CTX_MULTI_EMITTED (1<<0)
#define REDISMODULE_CTX_AUTO_MEMORY (1<<1)
#define REDISMODULE_CTX_KEYS_POS_REQUEST (1<<2)
@@ -245,6 +248,8 @@ typedef struct RedisModuleBlockedClient {
client *reply_client; /* Fake client used to accumulate replies
in thread safe contexts. */
int dbid; /* Database number selected by the original client. */
+ int blocked_on_keys; /* If blocked via RM_BlockClientOnKeys(). */
+ int unblocked; /* Already on the moduleUnblocked list. */
} RedisModuleBlockedClient;
static pthread_mutex_t moduleUnblockedClientsMutex = PTHREAD_MUTEX_INITIALIZER;
@@ -1620,6 +1625,27 @@ int modulePopulateClientInfoStructure(void *ci, client *client, int structver) {
return REDISMODULE_OK;
}
+/* This is an helper for moduleFireServerEvent() and other functions:
+ * It populates the replication info structure with the appropriate
+ * fields depending on the version provided. If the version is not valid
+ * then REDISMODULE_ERR is returned. Otherwise the function returns
+ * REDISMODULE_OK and the structure pointed by 'ri' gets populated. */
+int modulePopulateReplicationInfoStructure(void *ri, int structver) {
+ if (structver != 1) return REDISMODULE_ERR;
+
+ RedisModuleReplicationInfoV1 *ri1 = ri;
+ memset(ri1,0,sizeof(*ri1));
+ ri1->version = structver;
+ ri1->master = server.masterhost==NULL;
+ ri1->masterhost = server.masterhost? server.masterhost: "";
+ ri1->masterport = server.masterport;
+ ri1->replid1 = server.replid;
+ ri1->replid2 = server.replid2;
+ ri1->repl1_offset = server.master_repl_offset;
+ ri1->repl2_offset = server.second_replid_offset;
+ return REDISMODULE_OK;
+}
+
/* Return information about the client with the specified ID (that was
* previously obtained via the RedisModule_GetClientId() API). If the
* client exists, REDISMODULE_OK is returned, otherwise REDISMODULE_ERR
@@ -1672,6 +1698,15 @@ int RM_GetClientInfoById(void *ci, uint64_t id) {
return modulePopulateClientInfoStructure(ci,client,structver);
}
+/* Publish a message to subscribers (see PUBLISH command). */
+int RM_PublishMessage(RedisModuleCtx *ctx, RedisModuleString *channel, RedisModuleString *message) {
+ UNUSED(ctx);
+ int receivers = pubsubPublishMessage(channel, message);
+ if (server.cluster_enabled)
+ clusterPropagatePublish(channel, message);
+ return receivers;
+}
+
/* Return the currently selected DB. */
int RM_GetSelectedDb(RedisModuleCtx *ctx) {
return ctx->client->db->id;
@@ -1960,6 +1995,28 @@ int RM_SetExpire(RedisModuleKey *key, mstime_t expire) {
return REDISMODULE_OK;
}
+/* Performs similar operation to FLUSHALL, and optionally start a new AOF file (if enabled)
+ * If restart_aof is true, you must make sure the command that triggered this call is not
+ * propagated to the AOF file.
+ * When async is set to true, db contents will be freed by a background thread. */
+void RM_ResetDataset(int restart_aof, int async) {
+ if (restart_aof && server.aof_state != AOF_OFF) stopAppendOnly();
+ flushAllDataAndResetRDB(async? EMPTYDB_ASYNC: EMPTYDB_NO_FLAGS);
+ if (server.aof_enabled && restart_aof) restartAOFAfterSYNC();
+}
+
+/* Returns the number of keys in the current db. */
+unsigned long long RM_DbSize(RedisModuleCtx *ctx) {
+ return dictSize(ctx->client->db->dict);
+}
+
+/* Returns a name of a random key, or NULL if current db is empty. */
+RedisModuleString *RM_RandomKey(RedisModuleCtx *ctx) {
+ robj *key = dbRandomKey(ctx->client->db);
+ autoMemoryAdd(ctx,REDISMODULE_AM_STRING,key);
+ return key;
+}
+
/* --------------------------------------------------------------------------
* Key API for String type
* -------------------------------------------------------------------------- */
@@ -4014,23 +4071,26 @@ void unblockClientFromModule(client *c) {
resetClient(c);
}
-/* Block a client in the context of a blocking command, returning an handle
- * which will be used, later, in order to unblock the client with a call to
- * RedisModule_UnblockClient(). The arguments specify callback functions
- * and a timeout after which the client is unblocked.
+/* Block a client in the context of a module: this function implements both
+ * RM_BlockClient() and RM_BlockClientOnKeys() depending on the fact the
+ * keys are passed or not.
*
- * The callbacks are called in the following contexts:
+ * When not blocking for keys, the keys, numkeys, and privdata parameters are
+ * not needed. The privdata in that case must be NULL, since later is
+ * RM_UnblockClient() that will provide some private data that the reply
+ * callback will receive.
*
- * reply_callback: called after a successful RedisModule_UnblockClient()
- * call in order to reply to the client and unblock it.
- *
- * reply_timeout: called when the timeout is reached in order to send an
- * error to the client.
+ * Instead when blocking for keys, normally RM_UnblockClient() will not be
+ * called (because the client will unblock when the key is modified), so
+ * 'privdata' should be provided in that case, so that once the client is
+ * unlocked and the reply callback is called, it will receive its associated
+ * private data.
*
- * free_privdata: called in order to free the private data that is passed
- * by RedisModule_UnblockClient() call.
+ * Even when blocking on keys, RM_UnblockClient() can be called however, but
+ * in that case the privdata argument is disregarded, because we pass the
+ * reply callback the privdata that is set here while blocking.
*/
-RedisModuleBlockedClient *RM_BlockClient(RedisModuleCtx *ctx, RedisModuleCmdFunc reply_callback, RedisModuleCmdFunc timeout_callback, void (*free_privdata)(RedisModuleCtx*,void*), long long timeout_ms) {
+RedisModuleBlockedClient *moduleBlockClient(RedisModuleCtx *ctx, RedisModuleCmdFunc reply_callback, RedisModuleCmdFunc timeout_callback, void (*free_privdata)(RedisModuleCtx*,void*), long long timeout_ms, RedisModuleString **keys, int numkeys, void *privdata) {
client *c = ctx->client;
int islua = c->flags & CLIENT_LUA;
int ismulti = c->flags & CLIENT_MULTI;
@@ -4043,17 +4103,20 @@ RedisModuleBlockedClient *RM_BlockClient(RedisModuleCtx *ctx, RedisModuleCmdFunc
* commands from Lua or MULTI. We actually create an already aborted
* (client set to NULL) blocked client handle, and actually reply with
* an error. */
+ mstime_t timeout = timeout_ms ? (mstime()+timeout_ms) : 0;
bc->client = (islua || ismulti) ? NULL : c;
bc->module = ctx->module;
bc->reply_callback = reply_callback;
bc->timeout_callback = timeout_callback;
bc->disconnect_callback = NULL; /* Set by RM_SetDisconnectCallback() */
bc->free_privdata = free_privdata;
- bc->privdata = NULL;
+ bc->privdata = privdata;
bc->reply_client = createClient(NULL);
bc->reply_client->flags |= CLIENT_MODULE;
bc->dbid = c->db->id;
- c->bpop.timeout = timeout_ms ? (mstime()+timeout_ms) : 0;
+ bc->blocked_on_keys = keys != NULL;
+ bc->unblocked = 0;
+ c->bpop.timeout = timeout;
if (islua || ismulti) {
c->bpop.module_blocked_handle = NULL;
@@ -4061,11 +4124,150 @@ RedisModuleBlockedClient *RM_BlockClient(RedisModuleCtx *ctx, RedisModuleCmdFunc
"Blocking module command called from Lua script" :
"Blocking module command called from transaction");
} else {
- blockClient(c,BLOCKED_MODULE);
+ if (keys) {
+ blockForKeys(c,BLOCKED_MODULE,keys,numkeys,timeout,NULL,NULL);
+ } else {
+ blockClient(c,BLOCKED_MODULE);
+ }
}
return bc;
}
+/* This function is called from module.c in order to check if a module
+ * blocked for BLOCKED_MODULE and subtype 'on keys' (bc->blocked_on_keys true)
+ * can really be unblocked, since the module was able to serve the client.
+ * If the callback returns REDISMODULE_OK, then the client can be unblocked,
+ * otherwise the client remains blocked and we'll retry again when one of
+ * the keys it blocked for becomes "ready" again. */
+int moduleTryServeClientBlockedOnKey(client *c, robj *key) {
+ int served = 0;
+ RedisModuleBlockedClient *bc = c->bpop.module_blocked_handle;
+ /* Protect against re-processing: don't serve clients that are already
+ * in the unblocking list for any reason (including RM_UnblockClient()
+ * explicit call). */
+ if (bc->unblocked) return REDISMODULE_ERR;
+ RedisModuleCtx ctx = REDISMODULE_CTX_INIT;
+ ctx.flags |= REDISMODULE_CTX_BLOCKED_REPLY;
+ ctx.blocked_ready_key = key;
+ ctx.blocked_privdata = bc->privdata;
+ ctx.module = bc->module;
+ ctx.client = bc->client;
+ ctx.blocked_client = bc;
+ if (bc->reply_callback(&ctx,(void**)c->argv,c->argc) == REDISMODULE_OK)
+ served = 1;
+ moduleFreeContext(&ctx);
+ return served;
+}
+
+/* Block a client in the context of a blocking command, returning an handle
+ * which will be used, later, in order to unblock the client with a call to
+ * RedisModule_UnblockClient(). The arguments specify callback functions
+ * and a timeout after which the client is unblocked.
+ *
+ * The callbacks are called in the following contexts:
+ *
+ * reply_callback: called after a successful RedisModule_UnblockClient()
+ * call in order to reply to the client and unblock it.
+ *
+ * reply_timeout: called when the timeout is reached in order to send an
+ * error to the client.
+ *
+ * free_privdata: called in order to free the private data that is passed
+ * by RedisModule_UnblockClient() call.
+ */
+RedisModuleBlockedClient *RM_BlockClient(RedisModuleCtx *ctx, RedisModuleCmdFunc reply_callback, RedisModuleCmdFunc timeout_callback, void (*free_privdata)(RedisModuleCtx*,void*), long long timeout_ms) {
+ return moduleBlockClient(ctx,reply_callback,timeout_callback,free_privdata,timeout_ms, NULL,0,NULL);
+}
+
+/* This call is similar to RedisModule_BlockClient(), however in this case we
+ * don't just block the client, but also ask Redis to unblock it automatically
+ * once certain keys become "ready", that is, contain more data.
+ *
+ * Basically this is similar to what a typical Redis command usually does,
+ * like BLPOP or ZPOPMAX: the client blocks if it cannot be served ASAP,
+ * and later when the key receives new data (a list push for instance), the
+ * client is unblocked and served.
+ *
+ * However in the case of this module API, when the client is unblocked?
+ *
+ * 1. If you block ok a key of a type that has blocking operations associated,
+ * like a list, a sorted set, a stream, and so forth, the client may be
+ * unblocked once the relevant key is targeted by an operation that normally
+ * unblocks the native blocking operations for that type. So if we block
+ * on a list key, an RPUSH command may unblock our client and so forth.
+ * 2. If you are implementing your native data type, or if you want to add new
+ * unblocking conditions in addition to "1", you can call the modules API
+ * RedisModule_SignalKeyAsReady().
+ *
+ * Anyway we can't be sure if the client should be unblocked just because the
+ * key is signaled as ready: for instance a successive operation may change the
+ * key, or a client in queue before this one can be served, modifying the key
+ * as well and making it empty again. So when a client is blocked with
+ * RedisModule_BlockClientOnKeys() the reply callback is not called after
+ * RM_UnblockCLient() is called, but every time a key is signaled as ready:
+ * if the reply callback can serve the client, it returns REDISMODULE_OK
+ * and the client is unblocked, otherwise it will return REDISMODULE_ERR
+ * and we'll try again later.
+ *
+ * The reply callback can access the key that was signaled as ready by
+ * calling the API RedisModule_GetBlockedClientReadyKey(), that returns
+ * just the string name of the key as a RedisModuleString object.
+ *
+ * Thanks to this system we can setup complex blocking scenarios, like
+ * unblocking a client only if a list contains at least 5 items or other
+ * more fancy logics.
+ *
+ * Note that another difference with RedisModule_BlockClient(), is that here
+ * we pass the private data directly when blocking the client: it will
+ * be accessible later in the reply callback. Normally when blocking with
+ * RedisModule_BlockClient() the private data to reply to the client is
+ * passed when calling RedisModule_UnblockClient() but here the unblocking
+ * is performed by Redis itself, so we need to have some private data before
+ * hand. The private data is used to store any information about the specific
+ * unblocking operation that you are implementing. Such information will be
+ * freed using the free_privdata callback provided by the user.
+ *
+ * However the reply callback will be able to access the argument vector of
+ * the command, so the private data is often not needed. */
+RedisModuleBlockedClient *RM_BlockClientOnKeys(RedisModuleCtx *ctx, RedisModuleCmdFunc reply_callback, RedisModuleCmdFunc timeout_callback, void (*free_privdata)(RedisModuleCtx*,void*), long long timeout_ms, RedisModuleString **keys, int numkeys, void *privdata) {
+ return moduleBlockClient(ctx,reply_callback,timeout_callback,free_privdata,timeout_ms, keys,numkeys,privdata);
+}
+
+/* This function is used in order to potentially unblock a client blocked
+ * on keys with RedisModule_BlockClientOnKeys(). When this function is called,
+ * all the clients blocked for this key will get their reply callback called,
+ * and if the callback returns REDISMODULE_OK the client will be unblocked. */
+void RM_SignalKeyAsReady(RedisModuleCtx *ctx, RedisModuleString *key) {
+ signalKeyAsReady(ctx->client->db, key);
+}
+
+/* Implements RM_UnblockClient() and moduleUnblockClient(). */
+int moduleUnblockClientByHandle(RedisModuleBlockedClient *bc, void *privdata) {
+ pthread_mutex_lock(&moduleUnblockedClientsMutex);
+ if (!bc->blocked_on_keys) bc->privdata = privdata;
+ bc->unblocked = 1;
+ listAddNodeTail(moduleUnblockedClients,bc);
+ if (write(server.module_blocked_pipe[1],"A",1) != 1) {
+ /* Ignore the error, this is best-effort. */
+ }
+ pthread_mutex_unlock(&moduleUnblockedClientsMutex);
+ return REDISMODULE_OK;
+}
+
+/* This API is used by the Redis core to unblock a client that was blocked
+ * by a module. */
+void moduleUnblockClient(client *c) {
+ RedisModuleBlockedClient *bc = c->bpop.module_blocked_handle;
+ moduleUnblockClientByHandle(bc,NULL);
+}
+
+/* Return true if the client 'c' was blocked by a module using
+ * RM_BlockClientOnKeys(). */
+int moduleClientIsBlockedOnKeys(client *c) {
+ RedisModuleBlockedClient *bc = c->bpop.module_blocked_handle;
+ return bc->blocked_on_keys;
+}
+
/* Unblock a client blocked by `RedisModule_BlockedClient`. This will trigger
* the reply callbacks to be called in order to reply to the client.
* The 'privdata' argument will be accessible by the reply callback, so
@@ -4076,15 +4278,25 @@ RedisModuleBlockedClient *RM_BlockClient(RedisModuleCtx *ctx, RedisModuleCmdFunc
* needs to be passed to the client, included but not limited some slow
* to compute reply or some reply obtained via networking.
*
- * Note: this function can be called from threads spawned by the module. */
+ * Note 1: this function can be called from threads spawned by the module.
+ *
+ * Note 2: when we unblock a client that is blocked for keys using
+ * the API RedisModule_BlockClientOnKeys(), the privdata argument here is
+ * not used, and the reply callback is called with the privdata pointer that
+ * was passed when blocking the client.
+ *
+ * Unblocking a client that was blocked for keys using this API will still
+ * require the client to get some reply, so the function will use the
+ * "timeout" handler in order to do so. */
int RM_UnblockClient(RedisModuleBlockedClient *bc, void *privdata) {
- pthread_mutex_lock(&moduleUnblockedClientsMutex);
- bc->privdata = privdata;
- listAddNodeTail(moduleUnblockedClients,bc);
- if (write(server.module_blocked_pipe[1],"A",1) != 1) {
- /* Ignore the error, this is best-effort. */
- }
- pthread_mutex_unlock(&moduleUnblockedClientsMutex);
+ if (bc->blocked_on_keys) {
+ /* In theory the user should always pass the timeout handler as an
+ * argument, but better to be safe than sorry. */
+ if (bc->timeout_callback == NULL) return REDISMODULE_ERR;
+ if (bc->unblocked) return REDISMODULE_OK;
+ if (bc->client) moduleBlockedClientTimedOut(bc->client);
+ }
+ moduleUnblockClientByHandle(bc,privdata);
return REDISMODULE_OK;
}
@@ -4144,16 +4356,19 @@ void moduleHandleBlockedClients(void) {
* touch the shared list. */
/* Call the reply callback if the client is valid and we have
- * any callback. */
- if (c && bc->reply_callback) {
+ * any callback. However the callback is not called if the client
+ * was blocked on keys (RM_BlockClientOnKeys()), because we already
+ * called such callback in moduleTryServeClientBlockedOnKey() when
+ * the key was signaled as ready. */
+ if (c && !bc->blocked_on_keys && bc->reply_callback) {
RedisModuleCtx ctx = REDISMODULE_CTX_INIT;
ctx.flags |= REDISMODULE_CTX_BLOCKED_REPLY;
ctx.blocked_privdata = bc->privdata;
+ ctx.blocked_ready_key = NULL;
ctx.module = bc->module;
ctx.client = bc->client;
ctx.blocked_client = bc;
bc->reply_callback(&ctx,(void**)c->argv,c->argc);
- moduleHandlePropagationAfterCommandCallback(&ctx);
moduleFreeContext(&ctx);
}
@@ -4241,6 +4456,12 @@ void *RM_GetBlockedClientPrivateData(RedisModuleCtx *ctx) {
return ctx->blocked_privdata;
}
+/* Get the key that is ready when the reply callback is called in the context
+ * of a client blocked by RedisModule_BlockClientOnKeys(). */
+RedisModuleString *RM_GetBlockedClientReadyKey(RedisModuleCtx *ctx) {
+ return ctx->blocked_ready_key;
+}
+
/* Get the blocked client associated with a given context.
* This is useful in the reply and timeout callbacks of blocked clients,
* before sometimes the module has the blocked client handle references
@@ -4295,7 +4516,7 @@ RedisModuleCtx *RM_GetThreadSafeContext(RedisModuleBlockedClient *bc) {
ctx->client = createClient(NULL);
if (bc) {
selectDb(ctx->client,bc->dbid);
- ctx->client->id = bc->client->id;
+ if (bc->client) ctx->client->id = bc->client->id;
}
return ctx;
}
@@ -5781,8 +6002,8 @@ void ModuleForkDoneHandler(int exitcode, int bysignal) {
*
* The following sub events are available:
*
- * REDISMODULE_EVENT_REPLROLECHANGED_NOW_MASTER
- * REDISMODULE_EVENT_REPLROLECHANGED_NOW_REPLICA
+ * REDISMODULE_SUBEVENT_REPLROLECHANGED_NOW_MASTER
+ * REDISMODULE_SUBEVENT_REPLROLECHANGED_NOW_REPLICA
*
* The 'data' field can be casted by the callback to a
* RedisModuleReplicationInfo structure with the following fields:
@@ -5792,24 +6013,30 @@ void ModuleForkDoneHandler(int exitcode, int bysignal) {
* int masterport; // master instance port for NOW_REPLICA
* char *replid1; // Main replication ID
* char *replid2; // Secondary replication ID
+ * uint64_t repl1_offset; // Main replication offset
* uint64_t repl2_offset; // Offset of replid2 validity
- * uint64_t main_repl_offset; // Replication offset
*
* RedisModuleEvent_Persistence
*
* This event is called when RDB saving or AOF rewriting starts
* and ends. The following sub events are available:
*
- * REDISMODULE_EVENT_LOADING_RDB_START // BGSAVE start
- * REDISMODULE_EVENT_LOADING_RDB_END // BGSAVE end
- * REDISMODULE_EVENT_LOADING_SYNC_RDB_START // SAVE start
- * REDISMODULE_EVENT_LOADING_SYNC_RDB_START // SAVE end
- * REDISMODULE_EVENT_LOADING_AOF_START // AOF rewrite start
- * REDISMODULE_EVENT_LOADING_AOF_END // AOF rewrite end
+ * REDISMODULE_SUBEVENT_PERSISTENCE_RDB_START
+ * REDISMODULE_SUBEVENT_PERSISTENCE_AOF_START
+ * REDISMODULE_SUBEVENT_PERSISTENCE_SYNC_RDB_START
+ * REDISMODULE_SUBEVENT_PERSISTENCE_ENDED
+ * REDISMODULE_SUBEVENT_PERSISTENCE_FAILED
*
* The above events are triggered not just when the user calls the
* relevant commands like BGSAVE, but also when a saving operation
* or AOF rewriting occurs because of internal server triggers.
+ * The SYNC_RDB_START sub events are happening in the forground due to
+ * SAVE command, FLUSHALL, or server shutdown, and the other RDB and
+ * AOF sub events are executed in a background fork child, so any
+ * action the module takes can only affect the generated AOF or RDB,
+ * but will not be reflected in the parent process and affect connected
+ * clients and commands. Also note that the AOF_START sub event may end
+ * up saving RDB content in case of an AOF with rdb-preamble.
*
* RedisModuleEvent_FlushDB
*
@@ -5817,8 +6044,8 @@ void ModuleForkDoneHandler(int exitcode, int bysignal) {
* because of replication, after the replica synchronization)
* happened. The following sub events are available:
*
- * REDISMODULE_EVENT_FLUSHDB_START
- * REDISMODULE_EVENT_FLUSHDB_END
+ * REDISMODULE_SUBEVENT_FLUSHDB_START
+ * REDISMODULE_SUBEVENT_FLUSHDB_END
*
* The data pointer can be casted to a RedisModuleFlushInfo
* structure with the following fields:
@@ -5842,12 +6069,15 @@ void ModuleForkDoneHandler(int exitcode, int bysignal) {
* replica is loading the RDB file from the master.
* The following sub events are available:
*
- * REDISMODULE_EVENT_LOADING_RDB_START
- * REDISMODULE_EVENT_LOADING_RDB_END
- * REDISMODULE_EVENT_LOADING_MASTER_RDB_START
- * REDISMODULE_EVENT_LOADING_MASTER_RDB_END
- * REDISMODULE_EVENT_LOADING_AOF_START
- * REDISMODULE_EVENT_LOADING_AOF_END
+ * REDISMODULE_SUBEVENT_LOADING_RDB_START
+ * REDISMODULE_SUBEVENT_LOADING_AOF_START
+ * REDISMODULE_SUBEVENT_LOADING_REPL_START
+ * REDISMODULE_SUBEVENT_LOADING_ENDED
+ * REDISMODULE_SUBEVENT_LOADING_FAILED
+ *
+ * Note that AOF loading may start with an RDB data in case of
+ * rdb-preamble, in which case you'll only recieve an AOF_START event.
+ *
*
* RedisModuleEvent_ClientChange
*
@@ -5856,8 +6086,8 @@ void ModuleForkDoneHandler(int exitcode, int bysignal) {
* structure, documented in RedisModule_GetClientInfoById().
* The following sub events are available:
*
- * REDISMODULE_EVENT_CLIENT_CHANGE_CONNECTED
- * REDISMODULE_EVENT_CLIENT_CHANGE_DISCONNECTED
+ * REDISMODULE_SUBEVENT_CLIENT_CHANGE_CONNECTED
+ * REDISMODULE_SUBEVENT_CLIENT_CHANGE_DISCONNECTED
*
* RedisModuleEvent_Shutdown
*
@@ -5870,8 +6100,8 @@ void ModuleForkDoneHandler(int exitcode, int bysignal) {
* replica since it gets disconnected.
* The following sub events are availble:
*
- * REDISMODULE_EVENT_REPLICA_CHANGE_ONLINE
- * REDISMODULE_EVENT_REPLICA_CHANGE_OFFLINE
+ * REDISMODULE_SUBEVENT_REPLICA_CHANGE_ONLINE
+ * REDISMODULE_SUBEVENT_REPLICA_CHANGE_OFFLINE
*
* No additional information is available so far: future versions
* of Redis will have an API in order to enumerate the replicas
@@ -5886,6 +6116,11 @@ void ModuleForkDoneHandler(int exitcode, int bysignal) {
* this changes depending on the "hz" configuration.
* No sub events are available.
*
+ * The data pointer can be casted to a RedisModuleCronLoop
+ * structure with the following fields:
+ *
+ * int32_t hz; // Approximate number of events per second.
+ *
* RedisModuleEvent_MasterLinkChange
*
* This is called for replicas in order to notify when the
@@ -5895,8 +6130,38 @@ void ModuleForkDoneHandler(int exitcode, int bysignal) {
* replication is happening correctly.
* The following sub events are available:
*
- * REDISMODULE_EVENT_MASTER_LINK_UP
- * REDISMODULE_EVENT_MASTER_LINK_DOWN
+ * REDISMODULE_SUBEVENT_MASTER_LINK_UP
+ * REDISMODULE_SUBEVENT_MASTER_LINK_DOWN
+ *
+ * RedisModuleEvent_ModuleChange
+ *
+ * This event is called when a new module is loaded or one is unloaded.
+ * The following sub events are availble:
+ *
+ * REDISMODULE_SUBEVENT_MODULE_LOADED
+ * REDISMODULE_SUBEVENT_MODULE_UNLOADED
+ *
+ * The data pointer can be casted to a RedisModuleModuleChange
+ * structure with the following fields:
+ *
+ * const char* module_name; // Name of module loaded or unloaded.
+ * int32_t module_version; // Module version.
+ *
+ * RedisModuleEvent_LoadingProgress
+ *
+ * This event is called repeatedly called while an RDB or AOF file
+ * is being loaded.
+ * The following sub events are availble:
+ *
+ * REDISMODULE_SUBEVENT_LOADING_PROGRESS_RDB
+ * REDISMODULE_SUBEVENT_LOADING_PROGRESS_AOF
+ *
+ * The data pointer can be casted to a RedisModuleLoadingProgress
+ * structure with the following fields:
+ *
+ * int32_t hz; // Approximate number of events per second.
+ * int32_t progress; // Approximate progress between 0 and 1024,
+ * or -1 if unknown.
*
* The function returns REDISMODULE_OK if the module was successfully subscrived
* for the specified event. If the API is called from a wrong context then
@@ -5955,7 +6220,7 @@ void moduleFireServerEvent(uint64_t eid, int subid, void *data) {
listRewind(RedisModule_EventListeners,&li);
while((ln = listNext(&li))) {
RedisModuleEventListener *el = ln->value;
- if (el->event.id == eid && !el->module->in_hook) {
+ if (el->event.id == eid) {
RedisModuleCtx ctx = REDISMODULE_CTX_INIT;
ctx.module = el->module;
@@ -5969,6 +6234,8 @@ void moduleFireServerEvent(uint64_t eid, int subid, void *data) {
void *moduledata = NULL;
RedisModuleClientInfoV1 civ1;
+ RedisModuleReplicationInfoV1 riv1;
+ RedisModuleModuleChangeV1 mcv1;
/* Start at DB zero by default when calling the handler. It's
* up to the specific event setup to change it when it makes
* sense. For instance for FLUSHDB events we select the correct
@@ -5980,11 +6247,26 @@ void moduleFireServerEvent(uint64_t eid, int subid, void *data) {
modulePopulateClientInfoStructure(&civ1,data,
el->event.dataver);
moduledata = &civ1;
+ } else if (eid == REDISMODULE_EVENT_REPLICATION_ROLE_CHANGED) {
+ modulePopulateReplicationInfoStructure(&riv1,el->event.dataver);
+ moduledata = &riv1;
} else if (eid == REDISMODULE_EVENT_FLUSHDB) {
moduledata = data;
RedisModuleFlushInfoV1 *fi = data;
if (fi->dbnum != -1)
selectDb(ctx.client, fi->dbnum);
+ } else if (eid == REDISMODULE_EVENT_MODULE_CHANGE) {
+ RedisModule *m = data;
+ if (m == el->module)
+ continue;
+ mcv1.version = REDISMODULE_MODULE_CHANGE_VERSION;
+ mcv1.module_name = m->name;
+ mcv1.module_version = m->ver;
+ moduledata = &mcv1;
+ } else if (eid == REDISMODULE_EVENT_LOADING_PROGRESS) {
+ moduledata = data;
+ } else if (eid == REDISMODULE_EVENT_CRON_LOOP) {
+ moduledata = data;
}
ModulesInHooks++;
@@ -6016,6 +6298,27 @@ void moduleUnsubscribeAllServerEvents(RedisModule *module) {
}
}
+void processModuleLoadingProgressEvent(int is_aof) {
+ long long now = ustime();
+ static long long next_event = 0;
+ if (now >= next_event) {
+ /* Fire the loading progress modules end event. */
+ int progress = -1;
+ if (server.loading_total_bytes)
+ progress = (server.loading_total_bytes<<10) / server.loading_total_bytes;
+ RedisModuleFlushInfoV1 fi = {REDISMODULE_LOADING_PROGRESS_VERSION,
+ server.hz,
+ progress};
+ moduleFireServerEvent(REDISMODULE_EVENT_LOADING_PROGRESS,
+ is_aof?
+ REDISMODULE_SUBEVENT_LOADING_PROGRESS_AOF:
+ REDISMODULE_SUBEVENT_LOADING_PROGRESS_RDB,
+ &fi);
+ /* decide when the next event should fire. */
+ next_event = now + 1000000 / server.hz;
+ }
+}
+
/* --------------------------------------------------------------------------
* Modules API internals
* -------------------------------------------------------------------------- */
@@ -6184,6 +6487,11 @@ int moduleLoad(const char *path, void **module_argv, int module_argc) {
ctx.module->blocked_clients = 0;
ctx.module->handle = handle;
serverLog(LL_NOTICE,"Module '%s' loaded from %s",ctx.module->name,path);
+ /* Fire the loaded modules event. */
+ moduleFireServerEvent(REDISMODULE_EVENT_MODULE_CHANGE,
+ REDISMODULE_SUBEVENT_MODULE_LOADED,
+ ctx.module);
+
moduleFreeContext(&ctx);
return C_OK;
}
@@ -6246,6 +6554,11 @@ int moduleUnload(sds name) {
module->name, error);
}
+ /* Fire the unloaded modules event. */
+ moduleFireServerEvent(REDISMODULE_EVENT_MODULE_CHANGE,
+ REDISMODULE_SUBEVENT_MODULE_UNLOADED,
+ module);
+
/* Remove from list of modules. */
serverLog(LL_NOTICE,"Module %s unloaded",module->name);
dictDelete(modules,module->name);
@@ -6494,6 +6807,9 @@ void moduleRegisterCoreAPI(void) {
REGISTER_API(StringTruncate);
REGISTER_API(SetExpire);
REGISTER_API(GetExpire);
+ REGISTER_API(ResetDataset);
+ REGISTER_API(DbSize);
+ REGISTER_API(RandomKey);
REGISTER_API(ZsetAdd);
REGISTER_API(ZsetIncrby);
REGISTER_API(ZsetScore);
@@ -6621,7 +6937,11 @@ void moduleRegisterCoreAPI(void) {
REGISTER_API(InfoAddFieldLongLong);
REGISTER_API(InfoAddFieldULongLong);
REGISTER_API(GetClientInfoById);
+ REGISTER_API(PublishMessage);
REGISTER_API(SubscribeToServerEvent);
REGISTER_API(SetLRUOrLFU);
REGISTER_API(GetLRUOrLFU);
+ REGISTER_API(BlockClientOnKeys);
+ REGISTER_API(SignalKeyAsReady);
+ REGISTER_API(GetBlockedClientReadyKey);
}