diff options
Diffstat (limited to 'src/module.c')
-rw-r--r-- | src/module.c | 156 |
1 files changed, 155 insertions, 1 deletions
diff --git a/src/module.c b/src/module.c index 545f1a74a..96e65f0be 100644 --- a/src/module.c +++ b/src/module.c @@ -216,6 +216,28 @@ static list *moduleUnblockedClients; * allow thread safe contexts to execute commands at a safe moment. */ static pthread_mutex_t moduleGIL = PTHREAD_MUTEX_INITIALIZER; + +/* Function pointer type for keyspace event notification subscriptions from modules. */ +typedef int (*RedisModuleNotificationFunc) (RedisModuleCtx *ctx, int type, const char *event, RedisModuleString *key); + +/* Keyspace notification subscriber information. See RM_SubscribeToKeyspaceEvents */ +typedef struct RedisModuleKeyspaceSubscriber { + /* The module subscribed to the event */ + RedisModule *module; + /* Notification callback in the module*/ + RedisModuleNotificationFunc notify_callback; + /* A bit mask of the events the module is interested in */ + int event_mask; + /* Active flag set on entry, to avoid reentrant subscribers calling themselves */ + int active; +} RedisModuleKeyspaceSubscriber; + +/* The module keyspace notification subscribers list */ +static list *moduleKeyspaceSubscribers; + +/* Static client recycled for all notification clients, to avoid allocating per round. */ +static client *moduleKeyspaceSubscribersClient; + /* -------------------------------------------------------------------------- * Prototypes * -------------------------------------------------------------------------- */ @@ -3669,6 +3691,126 @@ void moduleReleaseGIL(void) { pthread_mutex_unlock(&moduleGIL); } + +/* -------------------------------------------------------------------------- + * Module Keyspace Notifications API + * -------------------------------------------------------------------------- */ + +/* Subscribe to keyspace notifications. This is a low-level version of the + * keyspace-notifications API. A module cand register callbacks to be notified + * when keyspce events occur. + * + * Notification events are filtered by their type (string events, set events, + * etc), and the subsriber callback receives only events that match a specific + * mask of event types. + * + * When subscribing to notifications with RedisModule_SubscribeToKeyspaceEvents + * the module must provide an event type-mask, denoting the events the subscriber + * is interested in. This can be an ORed mask of any of the following flags: + * + * - REDISMODULE_NOTIFY_GENERIC: Generic commands like DEL, EXPIRE, RENAME + * - REDISMODULE_NOTIFY_STRING: String events + * - REDISMODULE_NOTIFY_LIST: List events + * - REDISMODULE_NOTIFY_SET: Set events + * - REDISMODULE_NOTIFY_HASH: Hash events + * - REDISMODULE_NOTIFY_ZSET: Sorted Set events + * - REDISMODULE_NOTIFY_EXPIRED: Expiration events + * - REDISMODULE_NOTIFY_EVICTED: Eviction events + * - REDISMODULE_NOTIFY_STREAM: Stream events + * - REDISMODULE_NOTIFY_ALL: All events + * + * We do not distinguish between key events and keyspace events, and it is up + * to the module to filter the actions taken based on the key. + * + * The subscriber signature is: + * + * int (*RedisModuleNotificationFunc) (RedisModuleCtx *ctx, int type, + * const char *event, + * RedisModuleString *key); + * + * `type` is the event type bit, that must match the mask given at registration + * time. The event string is the actual command being executed, and key is the + * relevant Redis key. + * + * Notification callback gets executed with a redis context that can not be + * used to send anything to the client, and has the db number where the event + * occured as its selected db number. + * + * Notice that it is not necessary to enable norifications in redis.conf for + * module notifications to work. + * + * Warning: the notification callbacks are performed in a synchronous manner, + * so notification callbacks must to be fast, or they would slow Redis down. + * If you need to take long actions, use threads to offload them. + * + * See https://redis.io/topics/notifications for more information. + */ +int RM_SubscribeToKeyspaceEvents(RedisModuleCtx *ctx, int types, RedisModuleNotificationFunc callback) { + RedisModuleKeyspaceSubscriber *sub = zmalloc(sizeof(*sub)); + sub->module = ctx->module; + sub->event_mask = types; + sub->notify_callback = callback; + sub->active = 0; + + listAddNodeTail(moduleKeyspaceSubscribers, sub); + return REDISMODULE_OK; + +} + +/* Dispatcher for keyspace notifications to module subscriber functions. + * This gets called only if at least one module requested to be notified on + * keyspace notifications */ +void moduleNotifyKeyspaceEvent(int type, const char *event, robj *key, int dbid) { + + /* Don't do anything if there aren't any subscribers */ + if (listLength(moduleKeyspaceSubscribers) == 0) return; + + listIter li; + listNode *ln; + + listRewind(moduleKeyspaceSubscribers,&li); + + /* Remove irrelevant flags from the type mask */ + type &= ~(NOTIFY_KEYEVENT | NOTIFY_KEYSPACE); + + + while((ln = listNext(&li))) { + RedisModuleKeyspaceSubscriber *sub = ln->value; + /* Only notify subscribers on events matching they registration, + * and avoid subscribers triggering themselves */ + if ((sub->event_mask & type) && sub->active == 0) { + RedisModuleCtx ctx = REDISMODULE_CTX_INIT; + ctx.module = sub->module; + ctx.client = moduleKeyspaceSubscribersClient; + selectDb(ctx.client, dbid); + + /* mark the handler as activer to avoid reentrant loops. + * If the subscriber performs an action triggering itself, + * it will not be notified about it. */ + sub->active = 1; + sub->notify_callback(&ctx, type, event, key); + sub->active = 0; + moduleFreeContext(&ctx); + } + } + +} + +/* Unsubscribe any notification subscirbers this module has upon unloading */ +void moduleUnsubscribeNotifications(RedisModule *module) { + listIter li; + listNode *ln; + listRewind(moduleKeyspaceSubscribers,&li); + while((ln = listNext(&li))) { + RedisModuleKeyspaceSubscriber *sub = ln->value; + if (sub->module == module) { + listDelNode(moduleKeyspaceSubscribers, ln); + zfree(sub); + } + } +} + + /* -------------------------------------------------------------------------- * Modules API internals * -------------------------------------------------------------------------- */ @@ -3706,9 +3848,15 @@ void moduleRegisterCoreAPI(void); void moduleInitModulesSystem(void) { moduleUnblockedClients = listCreate(); - + server.loadmodule_queue = listCreate(); modules = dictCreate(&modulesDictType,NULL); + + /* Set up the keyspace notification susbscriber list and static client */ + moduleKeyspaceSubscribers = listCreate(); + moduleKeyspaceSubscribersClient = createClient(-1); + moduleKeyspaceSubscribersClient->flags |= CLIENT_MODULE; + moduleRegisterCoreAPI(); if (pipe(server.module_blocked_pipe) == -1) { serverLog(LL_WARNING, @@ -3759,6 +3907,7 @@ void moduleFreeModuleStructure(struct RedisModule *module) { zfree(module); } + void moduleUnregisterCommands(struct RedisModule *module) { /* Unregister all the commands registered by this module. */ dictIterator *di = dictGetSafeIterator(server.commands); @@ -3819,6 +3968,7 @@ int moduleLoad(const char *path, void **module_argv, int module_argc) { return C_OK; } + /* Unload the module registered with the specified name. On success * C_OK is returned, otherwise C_ERR is returned and errno is set * to the following values depending on the type of error: @@ -3840,6 +3990,9 @@ int moduleUnload(sds name) { moduleUnregisterCommands(module); + /* Remvoe any noification subscribers this module might have */ + moduleUnsubscribeNotifications(module); + /* Unregister all the hooks. TODO: Yet no hooks support here. */ /* Unload the dynamic library. */ @@ -4037,4 +4190,5 @@ void moduleRegisterCoreAPI(void) { REGISTER_API(DigestAddStringBuffer); REGISTER_API(DigestAddLongLong); REGISTER_API(DigestEndSequence); + REGISTER_API(SubscribeToKeyspaceEvents); } |