diff options
Diffstat (limited to 'src/module.c')
-rw-r--r-- | src/module.c | 426 |
1 files changed, 373 insertions, 53 deletions
diff --git a/src/module.c b/src/module.c index 09cf6ac7a..3a161c05f 100644 --- a/src/module.c +++ b/src/module.c @@ -140,6 +140,9 @@ struct RedisModuleCtx { void **postponed_arrays; /* To set with RM_ReplySetArrayLength(). */ int postponed_arrays_count; /* Number of entries in postponed_arrays. */ void *blocked_privdata; /* Privdata set when unblocking a client. */ + RedisModuleString *blocked_ready_key; /* Key ready when the reply callback + gets called for clients blocked + on keys. */ /* Used if there is the REDISMODULE_CTX_KEYS_POS_REQUEST flag set. */ int *keys_pos; @@ -153,7 +156,7 @@ struct RedisModuleCtx { }; typedef struct RedisModuleCtx RedisModuleCtx; -#define REDISMODULE_CTX_INIT {(void*)(unsigned long)&RM_GetApi, NULL, NULL, NULL, NULL, 0, 0, 0, NULL, 0, NULL, NULL, 0, NULL, {0}} +#define REDISMODULE_CTX_INIT {(void*)(unsigned long)&RM_GetApi, NULL, NULL, NULL, NULL, 0, 0, 0, NULL, 0, NULL, NULL, NULL, 0, NULL, {0}} #define REDISMODULE_CTX_MULTI_EMITTED (1<<0) #define REDISMODULE_CTX_AUTO_MEMORY (1<<1) #define REDISMODULE_CTX_KEYS_POS_REQUEST (1<<2) @@ -245,6 +248,8 @@ typedef struct RedisModuleBlockedClient { client *reply_client; /* Fake client used to accumulate replies in thread safe contexts. */ int dbid; /* Database number selected by the original client. */ + int blocked_on_keys; /* If blocked via RM_BlockClientOnKeys(). */ + int unblocked; /* Already on the moduleUnblocked list. */ } RedisModuleBlockedClient; static pthread_mutex_t moduleUnblockedClientsMutex = PTHREAD_MUTEX_INITIALIZER; @@ -1620,6 +1625,27 @@ int modulePopulateClientInfoStructure(void *ci, client *client, int structver) { return REDISMODULE_OK; } +/* This is an helper for moduleFireServerEvent() and other functions: + * It populates the replication info structure with the appropriate + * fields depending on the version provided. If the version is not valid + * then REDISMODULE_ERR is returned. Otherwise the function returns + * REDISMODULE_OK and the structure pointed by 'ri' gets populated. */ +int modulePopulateReplicationInfoStructure(void *ri, int structver) { + if (structver != 1) return REDISMODULE_ERR; + + RedisModuleReplicationInfoV1 *ri1 = ri; + memset(ri1,0,sizeof(*ri1)); + ri1->version = structver; + ri1->master = server.masterhost==NULL; + ri1->masterhost = server.masterhost? server.masterhost: ""; + ri1->masterport = server.masterport; + ri1->replid1 = server.replid; + ri1->replid2 = server.replid2; + ri1->repl1_offset = server.master_repl_offset; + ri1->repl2_offset = server.second_replid_offset; + return REDISMODULE_OK; +} + /* Return information about the client with the specified ID (that was * previously obtained via the RedisModule_GetClientId() API). If the * client exists, REDISMODULE_OK is returned, otherwise REDISMODULE_ERR @@ -1672,6 +1698,15 @@ int RM_GetClientInfoById(void *ci, uint64_t id) { return modulePopulateClientInfoStructure(ci,client,structver); } +/* Publish a message to subscribers (see PUBLISH command). */ +int RM_PublishMessage(RedisModuleCtx *ctx, RedisModuleString *channel, RedisModuleString *message) { + UNUSED(ctx); + int receivers = pubsubPublishMessage(channel, message); + if (server.cluster_enabled) + clusterPropagatePublish(channel, message); + return receivers; +} + /* Return the currently selected DB. */ int RM_GetSelectedDb(RedisModuleCtx *ctx) { return ctx->client->db->id; @@ -1960,6 +1995,28 @@ int RM_SetExpire(RedisModuleKey *key, mstime_t expire) { return REDISMODULE_OK; } +/* Performs similar operation to FLUSHALL, and optionally start a new AOF file (if enabled) + * If restart_aof is true, you must make sure the command that triggered this call is not + * propagated to the AOF file. + * When async is set to true, db contents will be freed by a background thread. */ +void RM_ResetDataset(int restart_aof, int async) { + if (restart_aof && server.aof_state != AOF_OFF) stopAppendOnly(); + flushAllDataAndResetRDB(async? EMPTYDB_ASYNC: EMPTYDB_NO_FLAGS); + if (server.aof_enabled && restart_aof) restartAOFAfterSYNC(); +} + +/* Returns the number of keys in the current db. */ +unsigned long long RM_DbSize(RedisModuleCtx *ctx) { + return dictSize(ctx->client->db->dict); +} + +/* Returns a name of a random key, or NULL if current db is empty. */ +RedisModuleString *RM_RandomKey(RedisModuleCtx *ctx) { + robj *key = dbRandomKey(ctx->client->db); + autoMemoryAdd(ctx,REDISMODULE_AM_STRING,key); + return key; +} + /* -------------------------------------------------------------------------- * Key API for String type * -------------------------------------------------------------------------- */ @@ -4014,23 +4071,26 @@ void unblockClientFromModule(client *c) { resetClient(c); } -/* Block a client in the context of a blocking command, returning an handle - * which will be used, later, in order to unblock the client with a call to - * RedisModule_UnblockClient(). The arguments specify callback functions - * and a timeout after which the client is unblocked. +/* Block a client in the context of a module: this function implements both + * RM_BlockClient() and RM_BlockClientOnKeys() depending on the fact the + * keys are passed or not. * - * The callbacks are called in the following contexts: + * When not blocking for keys, the keys, numkeys, and privdata parameters are + * not needed. The privdata in that case must be NULL, since later is + * RM_UnblockClient() that will provide some private data that the reply + * callback will receive. * - * reply_callback: called after a successful RedisModule_UnblockClient() - * call in order to reply to the client and unblock it. - * - * reply_timeout: called when the timeout is reached in order to send an - * error to the client. + * Instead when blocking for keys, normally RM_UnblockClient() will not be + * called (because the client will unblock when the key is modified), so + * 'privdata' should be provided in that case, so that once the client is + * unlocked and the reply callback is called, it will receive its associated + * private data. * - * free_privdata: called in order to free the private data that is passed - * by RedisModule_UnblockClient() call. + * Even when blocking on keys, RM_UnblockClient() can be called however, but + * in that case the privdata argument is disregarded, because we pass the + * reply callback the privdata that is set here while blocking. */ -RedisModuleBlockedClient *RM_BlockClient(RedisModuleCtx *ctx, RedisModuleCmdFunc reply_callback, RedisModuleCmdFunc timeout_callback, void (*free_privdata)(RedisModuleCtx*,void*), long long timeout_ms) { +RedisModuleBlockedClient *moduleBlockClient(RedisModuleCtx *ctx, RedisModuleCmdFunc reply_callback, RedisModuleCmdFunc timeout_callback, void (*free_privdata)(RedisModuleCtx*,void*), long long timeout_ms, RedisModuleString **keys, int numkeys, void *privdata) { client *c = ctx->client; int islua = c->flags & CLIENT_LUA; int ismulti = c->flags & CLIENT_MULTI; @@ -4043,17 +4103,20 @@ RedisModuleBlockedClient *RM_BlockClient(RedisModuleCtx *ctx, RedisModuleCmdFunc * commands from Lua or MULTI. We actually create an already aborted * (client set to NULL) blocked client handle, and actually reply with * an error. */ + mstime_t timeout = timeout_ms ? (mstime()+timeout_ms) : 0; bc->client = (islua || ismulti) ? NULL : c; bc->module = ctx->module; bc->reply_callback = reply_callback; bc->timeout_callback = timeout_callback; bc->disconnect_callback = NULL; /* Set by RM_SetDisconnectCallback() */ bc->free_privdata = free_privdata; - bc->privdata = NULL; + bc->privdata = privdata; bc->reply_client = createClient(NULL); bc->reply_client->flags |= CLIENT_MODULE; bc->dbid = c->db->id; - c->bpop.timeout = timeout_ms ? (mstime()+timeout_ms) : 0; + bc->blocked_on_keys = keys != NULL; + bc->unblocked = 0; + c->bpop.timeout = timeout; if (islua || ismulti) { c->bpop.module_blocked_handle = NULL; @@ -4061,11 +4124,150 @@ RedisModuleBlockedClient *RM_BlockClient(RedisModuleCtx *ctx, RedisModuleCmdFunc "Blocking module command called from Lua script" : "Blocking module command called from transaction"); } else { - blockClient(c,BLOCKED_MODULE); + if (keys) { + blockForKeys(c,BLOCKED_MODULE,keys,numkeys,timeout,NULL,NULL); + } else { + blockClient(c,BLOCKED_MODULE); + } } return bc; } +/* This function is called from module.c in order to check if a module + * blocked for BLOCKED_MODULE and subtype 'on keys' (bc->blocked_on_keys true) + * can really be unblocked, since the module was able to serve the client. + * If the callback returns REDISMODULE_OK, then the client can be unblocked, + * otherwise the client remains blocked and we'll retry again when one of + * the keys it blocked for becomes "ready" again. */ +int moduleTryServeClientBlockedOnKey(client *c, robj *key) { + int served = 0; + RedisModuleBlockedClient *bc = c->bpop.module_blocked_handle; + /* Protect against re-processing: don't serve clients that are already + * in the unblocking list for any reason (including RM_UnblockClient() + * explicit call). */ + if (bc->unblocked) return REDISMODULE_ERR; + RedisModuleCtx ctx = REDISMODULE_CTX_INIT; + ctx.flags |= REDISMODULE_CTX_BLOCKED_REPLY; + ctx.blocked_ready_key = key; + ctx.blocked_privdata = bc->privdata; + ctx.module = bc->module; + ctx.client = bc->client; + ctx.blocked_client = bc; + if (bc->reply_callback(&ctx,(void**)c->argv,c->argc) == REDISMODULE_OK) + served = 1; + moduleFreeContext(&ctx); + return served; +} + +/* Block a client in the context of a blocking command, returning an handle + * which will be used, later, in order to unblock the client with a call to + * RedisModule_UnblockClient(). The arguments specify callback functions + * and a timeout after which the client is unblocked. + * + * The callbacks are called in the following contexts: + * + * reply_callback: called after a successful RedisModule_UnblockClient() + * call in order to reply to the client and unblock it. + * + * reply_timeout: called when the timeout is reached in order to send an + * error to the client. + * + * free_privdata: called in order to free the private data that is passed + * by RedisModule_UnblockClient() call. + */ +RedisModuleBlockedClient *RM_BlockClient(RedisModuleCtx *ctx, RedisModuleCmdFunc reply_callback, RedisModuleCmdFunc timeout_callback, void (*free_privdata)(RedisModuleCtx*,void*), long long timeout_ms) { + return moduleBlockClient(ctx,reply_callback,timeout_callback,free_privdata,timeout_ms, NULL,0,NULL); +} + +/* This call is similar to RedisModule_BlockClient(), however in this case we + * don't just block the client, but also ask Redis to unblock it automatically + * once certain keys become "ready", that is, contain more data. + * + * Basically this is similar to what a typical Redis command usually does, + * like BLPOP or ZPOPMAX: the client blocks if it cannot be served ASAP, + * and later when the key receives new data (a list push for instance), the + * client is unblocked and served. + * + * However in the case of this module API, when the client is unblocked? + * + * 1. If you block ok a key of a type that has blocking operations associated, + * like a list, a sorted set, a stream, and so forth, the client may be + * unblocked once the relevant key is targeted by an operation that normally + * unblocks the native blocking operations for that type. So if we block + * on a list key, an RPUSH command may unblock our client and so forth. + * 2. If you are implementing your native data type, or if you want to add new + * unblocking conditions in addition to "1", you can call the modules API + * RedisModule_SignalKeyAsReady(). + * + * Anyway we can't be sure if the client should be unblocked just because the + * key is signaled as ready: for instance a successive operation may change the + * key, or a client in queue before this one can be served, modifying the key + * as well and making it empty again. So when a client is blocked with + * RedisModule_BlockClientOnKeys() the reply callback is not called after + * RM_UnblockCLient() is called, but every time a key is signaled as ready: + * if the reply callback can serve the client, it returns REDISMODULE_OK + * and the client is unblocked, otherwise it will return REDISMODULE_ERR + * and we'll try again later. + * + * The reply callback can access the key that was signaled as ready by + * calling the API RedisModule_GetBlockedClientReadyKey(), that returns + * just the string name of the key as a RedisModuleString object. + * + * Thanks to this system we can setup complex blocking scenarios, like + * unblocking a client only if a list contains at least 5 items or other + * more fancy logics. + * + * Note that another difference with RedisModule_BlockClient(), is that here + * we pass the private data directly when blocking the client: it will + * be accessible later in the reply callback. Normally when blocking with + * RedisModule_BlockClient() the private data to reply to the client is + * passed when calling RedisModule_UnblockClient() but here the unblocking + * is performed by Redis itself, so we need to have some private data before + * hand. The private data is used to store any information about the specific + * unblocking operation that you are implementing. Such information will be + * freed using the free_privdata callback provided by the user. + * + * However the reply callback will be able to access the argument vector of + * the command, so the private data is often not needed. */ +RedisModuleBlockedClient *RM_BlockClientOnKeys(RedisModuleCtx *ctx, RedisModuleCmdFunc reply_callback, RedisModuleCmdFunc timeout_callback, void (*free_privdata)(RedisModuleCtx*,void*), long long timeout_ms, RedisModuleString **keys, int numkeys, void *privdata) { + return moduleBlockClient(ctx,reply_callback,timeout_callback,free_privdata,timeout_ms, keys,numkeys,privdata); +} + +/* This function is used in order to potentially unblock a client blocked + * on keys with RedisModule_BlockClientOnKeys(). When this function is called, + * all the clients blocked for this key will get their reply callback called, + * and if the callback returns REDISMODULE_OK the client will be unblocked. */ +void RM_SignalKeyAsReady(RedisModuleCtx *ctx, RedisModuleString *key) { + signalKeyAsReady(ctx->client->db, key); +} + +/* Implements RM_UnblockClient() and moduleUnblockClient(). */ +int moduleUnblockClientByHandle(RedisModuleBlockedClient *bc, void *privdata) { + pthread_mutex_lock(&moduleUnblockedClientsMutex); + if (!bc->blocked_on_keys) bc->privdata = privdata; + bc->unblocked = 1; + listAddNodeTail(moduleUnblockedClients,bc); + if (write(server.module_blocked_pipe[1],"A",1) != 1) { + /* Ignore the error, this is best-effort. */ + } + pthread_mutex_unlock(&moduleUnblockedClientsMutex); + return REDISMODULE_OK; +} + +/* This API is used by the Redis core to unblock a client that was blocked + * by a module. */ +void moduleUnblockClient(client *c) { + RedisModuleBlockedClient *bc = c->bpop.module_blocked_handle; + moduleUnblockClientByHandle(bc,NULL); +} + +/* Return true if the client 'c' was blocked by a module using + * RM_BlockClientOnKeys(). */ +int moduleClientIsBlockedOnKeys(client *c) { + RedisModuleBlockedClient *bc = c->bpop.module_blocked_handle; + return bc->blocked_on_keys; +} + /* Unblock a client blocked by `RedisModule_BlockedClient`. This will trigger * the reply callbacks to be called in order to reply to the client. * The 'privdata' argument will be accessible by the reply callback, so @@ -4076,15 +4278,25 @@ RedisModuleBlockedClient *RM_BlockClient(RedisModuleCtx *ctx, RedisModuleCmdFunc * needs to be passed to the client, included but not limited some slow * to compute reply or some reply obtained via networking. * - * Note: this function can be called from threads spawned by the module. */ + * Note 1: this function can be called from threads spawned by the module. + * + * Note 2: when we unblock a client that is blocked for keys using + * the API RedisModule_BlockClientOnKeys(), the privdata argument here is + * not used, and the reply callback is called with the privdata pointer that + * was passed when blocking the client. + * + * Unblocking a client that was blocked for keys using this API will still + * require the client to get some reply, so the function will use the + * "timeout" handler in order to do so. */ int RM_UnblockClient(RedisModuleBlockedClient *bc, void *privdata) { - pthread_mutex_lock(&moduleUnblockedClientsMutex); - bc->privdata = privdata; - listAddNodeTail(moduleUnblockedClients,bc); - if (write(server.module_blocked_pipe[1],"A",1) != 1) { - /* Ignore the error, this is best-effort. */ - } - pthread_mutex_unlock(&moduleUnblockedClientsMutex); + if (bc->blocked_on_keys) { + /* In theory the user should always pass the timeout handler as an + * argument, but better to be safe than sorry. */ + if (bc->timeout_callback == NULL) return REDISMODULE_ERR; + if (bc->unblocked) return REDISMODULE_OK; + if (bc->client) moduleBlockedClientTimedOut(bc->client); + } + moduleUnblockClientByHandle(bc,privdata); return REDISMODULE_OK; } @@ -4144,16 +4356,19 @@ void moduleHandleBlockedClients(void) { * touch the shared list. */ /* Call the reply callback if the client is valid and we have - * any callback. */ - if (c && bc->reply_callback) { + * any callback. However the callback is not called if the client + * was blocked on keys (RM_BlockClientOnKeys()), because we already + * called such callback in moduleTryServeClientBlockedOnKey() when + * the key was signaled as ready. */ + if (c && !bc->blocked_on_keys && bc->reply_callback) { RedisModuleCtx ctx = REDISMODULE_CTX_INIT; ctx.flags |= REDISMODULE_CTX_BLOCKED_REPLY; ctx.blocked_privdata = bc->privdata; + ctx.blocked_ready_key = NULL; ctx.module = bc->module; ctx.client = bc->client; ctx.blocked_client = bc; bc->reply_callback(&ctx,(void**)c->argv,c->argc); - moduleHandlePropagationAfterCommandCallback(&ctx); moduleFreeContext(&ctx); } @@ -4241,6 +4456,12 @@ void *RM_GetBlockedClientPrivateData(RedisModuleCtx *ctx) { return ctx->blocked_privdata; } +/* Get the key that is ready when the reply callback is called in the context + * of a client blocked by RedisModule_BlockClientOnKeys(). */ +RedisModuleString *RM_GetBlockedClientReadyKey(RedisModuleCtx *ctx) { + return ctx->blocked_ready_key; +} + /* Get the blocked client associated with a given context. * This is useful in the reply and timeout callbacks of blocked clients, * before sometimes the module has the blocked client handle references @@ -4295,7 +4516,7 @@ RedisModuleCtx *RM_GetThreadSafeContext(RedisModuleBlockedClient *bc) { ctx->client = createClient(NULL); if (bc) { selectDb(ctx->client,bc->dbid); - ctx->client->id = bc->client->id; + if (bc->client) ctx->client->id = bc->client->id; } return ctx; } @@ -5781,8 +6002,8 @@ void ModuleForkDoneHandler(int exitcode, int bysignal) { * * The following sub events are available: * - * REDISMODULE_EVENT_REPLROLECHANGED_NOW_MASTER - * REDISMODULE_EVENT_REPLROLECHANGED_NOW_REPLICA + * REDISMODULE_SUBEVENT_REPLROLECHANGED_NOW_MASTER + * REDISMODULE_SUBEVENT_REPLROLECHANGED_NOW_REPLICA * * The 'data' field can be casted by the callback to a * RedisModuleReplicationInfo structure with the following fields: @@ -5792,24 +6013,30 @@ void ModuleForkDoneHandler(int exitcode, int bysignal) { * int masterport; // master instance port for NOW_REPLICA * char *replid1; // Main replication ID * char *replid2; // Secondary replication ID + * uint64_t repl1_offset; // Main replication offset * uint64_t repl2_offset; // Offset of replid2 validity - * uint64_t main_repl_offset; // Replication offset * * RedisModuleEvent_Persistence * * This event is called when RDB saving or AOF rewriting starts * and ends. The following sub events are available: * - * REDISMODULE_EVENT_LOADING_RDB_START // BGSAVE start - * REDISMODULE_EVENT_LOADING_RDB_END // BGSAVE end - * REDISMODULE_EVENT_LOADING_SYNC_RDB_START // SAVE start - * REDISMODULE_EVENT_LOADING_SYNC_RDB_START // SAVE end - * REDISMODULE_EVENT_LOADING_AOF_START // AOF rewrite start - * REDISMODULE_EVENT_LOADING_AOF_END // AOF rewrite end + * REDISMODULE_SUBEVENT_PERSISTENCE_RDB_START + * REDISMODULE_SUBEVENT_PERSISTENCE_AOF_START + * REDISMODULE_SUBEVENT_PERSISTENCE_SYNC_RDB_START + * REDISMODULE_SUBEVENT_PERSISTENCE_ENDED + * REDISMODULE_SUBEVENT_PERSISTENCE_FAILED * * The above events are triggered not just when the user calls the * relevant commands like BGSAVE, but also when a saving operation * or AOF rewriting occurs because of internal server triggers. + * The SYNC_RDB_START sub events are happening in the forground due to + * SAVE command, FLUSHALL, or server shutdown, and the other RDB and + * AOF sub events are executed in a background fork child, so any + * action the module takes can only affect the generated AOF or RDB, + * but will not be reflected in the parent process and affect connected + * clients and commands. Also note that the AOF_START sub event may end + * up saving RDB content in case of an AOF with rdb-preamble. * * RedisModuleEvent_FlushDB * @@ -5817,8 +6044,8 @@ void ModuleForkDoneHandler(int exitcode, int bysignal) { * because of replication, after the replica synchronization) * happened. The following sub events are available: * - * REDISMODULE_EVENT_FLUSHDB_START - * REDISMODULE_EVENT_FLUSHDB_END + * REDISMODULE_SUBEVENT_FLUSHDB_START + * REDISMODULE_SUBEVENT_FLUSHDB_END * * The data pointer can be casted to a RedisModuleFlushInfo * structure with the following fields: @@ -5842,12 +6069,15 @@ void ModuleForkDoneHandler(int exitcode, int bysignal) { * replica is loading the RDB file from the master. * The following sub events are available: * - * REDISMODULE_EVENT_LOADING_RDB_START - * REDISMODULE_EVENT_LOADING_RDB_END - * REDISMODULE_EVENT_LOADING_MASTER_RDB_START - * REDISMODULE_EVENT_LOADING_MASTER_RDB_END - * REDISMODULE_EVENT_LOADING_AOF_START - * REDISMODULE_EVENT_LOADING_AOF_END + * REDISMODULE_SUBEVENT_LOADING_RDB_START + * REDISMODULE_SUBEVENT_LOADING_AOF_START + * REDISMODULE_SUBEVENT_LOADING_REPL_START + * REDISMODULE_SUBEVENT_LOADING_ENDED + * REDISMODULE_SUBEVENT_LOADING_FAILED + * + * Note that AOF loading may start with an RDB data in case of + * rdb-preamble, in which case you'll only recieve an AOF_START event. + * * * RedisModuleEvent_ClientChange * @@ -5856,8 +6086,8 @@ void ModuleForkDoneHandler(int exitcode, int bysignal) { * structure, documented in RedisModule_GetClientInfoById(). * The following sub events are available: * - * REDISMODULE_EVENT_CLIENT_CHANGE_CONNECTED - * REDISMODULE_EVENT_CLIENT_CHANGE_DISCONNECTED + * REDISMODULE_SUBEVENT_CLIENT_CHANGE_CONNECTED + * REDISMODULE_SUBEVENT_CLIENT_CHANGE_DISCONNECTED * * RedisModuleEvent_Shutdown * @@ -5870,8 +6100,8 @@ void ModuleForkDoneHandler(int exitcode, int bysignal) { * replica since it gets disconnected. * The following sub events are availble: * - * REDISMODULE_EVENT_REPLICA_CHANGE_ONLINE - * REDISMODULE_EVENT_REPLICA_CHANGE_OFFLINE + * REDISMODULE_SUBEVENT_REPLICA_CHANGE_ONLINE + * REDISMODULE_SUBEVENT_REPLICA_CHANGE_OFFLINE * * No additional information is available so far: future versions * of Redis will have an API in order to enumerate the replicas @@ -5886,6 +6116,11 @@ void ModuleForkDoneHandler(int exitcode, int bysignal) { * this changes depending on the "hz" configuration. * No sub events are available. * + * The data pointer can be casted to a RedisModuleCronLoop + * structure with the following fields: + * + * int32_t hz; // Approximate number of events per second. + * * RedisModuleEvent_MasterLinkChange * * This is called for replicas in order to notify when the @@ -5895,8 +6130,38 @@ void ModuleForkDoneHandler(int exitcode, int bysignal) { * replication is happening correctly. * The following sub events are available: * - * REDISMODULE_EVENT_MASTER_LINK_UP - * REDISMODULE_EVENT_MASTER_LINK_DOWN + * REDISMODULE_SUBEVENT_MASTER_LINK_UP + * REDISMODULE_SUBEVENT_MASTER_LINK_DOWN + * + * RedisModuleEvent_ModuleChange + * + * This event is called when a new module is loaded or one is unloaded. + * The following sub events are availble: + * + * REDISMODULE_SUBEVENT_MODULE_LOADED + * REDISMODULE_SUBEVENT_MODULE_UNLOADED + * + * The data pointer can be casted to a RedisModuleModuleChange + * structure with the following fields: + * + * const char* module_name; // Name of module loaded or unloaded. + * int32_t module_version; // Module version. + * + * RedisModuleEvent_LoadingProgress + * + * This event is called repeatedly called while an RDB or AOF file + * is being loaded. + * The following sub events are availble: + * + * REDISMODULE_SUBEVENT_LOADING_PROGRESS_RDB + * REDISMODULE_SUBEVENT_LOADING_PROGRESS_AOF + * + * The data pointer can be casted to a RedisModuleLoadingProgress + * structure with the following fields: + * + * int32_t hz; // Approximate number of events per second. + * int32_t progress; // Approximate progress between 0 and 1024, + * or -1 if unknown. * * The function returns REDISMODULE_OK if the module was successfully subscrived * for the specified event. If the API is called from a wrong context then @@ -5955,7 +6220,7 @@ void moduleFireServerEvent(uint64_t eid, int subid, void *data) { listRewind(RedisModule_EventListeners,&li); while((ln = listNext(&li))) { RedisModuleEventListener *el = ln->value; - if (el->event.id == eid && !el->module->in_hook) { + if (el->event.id == eid) { RedisModuleCtx ctx = REDISMODULE_CTX_INIT; ctx.module = el->module; @@ -5969,6 +6234,8 @@ void moduleFireServerEvent(uint64_t eid, int subid, void *data) { void *moduledata = NULL; RedisModuleClientInfoV1 civ1; + RedisModuleReplicationInfoV1 riv1; + RedisModuleModuleChangeV1 mcv1; /* Start at DB zero by default when calling the handler. It's * up to the specific event setup to change it when it makes * sense. For instance for FLUSHDB events we select the correct @@ -5980,11 +6247,26 @@ void moduleFireServerEvent(uint64_t eid, int subid, void *data) { modulePopulateClientInfoStructure(&civ1,data, el->event.dataver); moduledata = &civ1; + } else if (eid == REDISMODULE_EVENT_REPLICATION_ROLE_CHANGED) { + modulePopulateReplicationInfoStructure(&riv1,el->event.dataver); + moduledata = &riv1; } else if (eid == REDISMODULE_EVENT_FLUSHDB) { moduledata = data; RedisModuleFlushInfoV1 *fi = data; if (fi->dbnum != -1) selectDb(ctx.client, fi->dbnum); + } else if (eid == REDISMODULE_EVENT_MODULE_CHANGE) { + RedisModule *m = data; + if (m == el->module) + continue; + mcv1.version = REDISMODULE_MODULE_CHANGE_VERSION; + mcv1.module_name = m->name; + mcv1.module_version = m->ver; + moduledata = &mcv1; + } else if (eid == REDISMODULE_EVENT_LOADING_PROGRESS) { + moduledata = data; + } else if (eid == REDISMODULE_EVENT_CRON_LOOP) { + moduledata = data; } ModulesInHooks++; @@ -6016,6 +6298,27 @@ void moduleUnsubscribeAllServerEvents(RedisModule *module) { } } +void processModuleLoadingProgressEvent(int is_aof) { + long long now = ustime(); + static long long next_event = 0; + if (now >= next_event) { + /* Fire the loading progress modules end event. */ + int progress = -1; + if (server.loading_total_bytes) + progress = (server.loading_total_bytes<<10) / server.loading_total_bytes; + RedisModuleFlushInfoV1 fi = {REDISMODULE_LOADING_PROGRESS_VERSION, + server.hz, + progress}; + moduleFireServerEvent(REDISMODULE_EVENT_LOADING_PROGRESS, + is_aof? + REDISMODULE_SUBEVENT_LOADING_PROGRESS_AOF: + REDISMODULE_SUBEVENT_LOADING_PROGRESS_RDB, + &fi); + /* decide when the next event should fire. */ + next_event = now + 1000000 / server.hz; + } +} + /* -------------------------------------------------------------------------- * Modules API internals * -------------------------------------------------------------------------- */ @@ -6184,6 +6487,11 @@ int moduleLoad(const char *path, void **module_argv, int module_argc) { ctx.module->blocked_clients = 0; ctx.module->handle = handle; serverLog(LL_NOTICE,"Module '%s' loaded from %s",ctx.module->name,path); + /* Fire the loaded modules event. */ + moduleFireServerEvent(REDISMODULE_EVENT_MODULE_CHANGE, + REDISMODULE_SUBEVENT_MODULE_LOADED, + ctx.module); + moduleFreeContext(&ctx); return C_OK; } @@ -6246,6 +6554,11 @@ int moduleUnload(sds name) { module->name, error); } + /* Fire the unloaded modules event. */ + moduleFireServerEvent(REDISMODULE_EVENT_MODULE_CHANGE, + REDISMODULE_SUBEVENT_MODULE_UNLOADED, + module); + /* Remove from list of modules. */ serverLog(LL_NOTICE,"Module %s unloaded",module->name); dictDelete(modules,module->name); @@ -6494,6 +6807,9 @@ void moduleRegisterCoreAPI(void) { REGISTER_API(StringTruncate); REGISTER_API(SetExpire); REGISTER_API(GetExpire); + REGISTER_API(ResetDataset); + REGISTER_API(DbSize); + REGISTER_API(RandomKey); REGISTER_API(ZsetAdd); REGISTER_API(ZsetIncrby); REGISTER_API(ZsetScore); @@ -6621,7 +6937,11 @@ void moduleRegisterCoreAPI(void) { REGISTER_API(InfoAddFieldLongLong); REGISTER_API(InfoAddFieldULongLong); REGISTER_API(GetClientInfoById); + REGISTER_API(PublishMessage); REGISTER_API(SubscribeToServerEvent); REGISTER_API(SetLRUOrLFU); REGISTER_API(GetLRUOrLFU); + REGISTER_API(BlockClientOnKeys); + REGISTER_API(SignalKeyAsReady); + REGISTER_API(GetBlockedClientReadyKey); } |