summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMeir Shpilraien (Spielrein) <meir@redis.com>2022-11-24 19:00:04 +0200
committerGitHub <noreply@github.com>2022-11-24 19:00:04 +0200
commitabc345ad2837cb36ade137982859b6a8666b2735 (patch)
tree7a4636c7175cc030eea0fce2b8e740670d7f6121
parentae1de549006c1f15bade4969ba25932e3509f17a (diff)
downloadredis-abc345ad2837cb36ade137982859b6a8666b2735.tar.gz
Module API to allow writes after key space notification hooks (#11199)
### Summary of API additions * `RedisModule_AddPostNotificationJob` - new API to call inside a key space notification (and on more locations in the future) and allow to add a post job as describe above. * New module option, `REDISMODULE_OPTIONS_ALLOW_NESTED_KEYSPACE_NOTIFICATIONS`, allows to disable Redis protection of nested key-space notifications. * `RedisModule_GetModuleOptionsAll` - gets the mask of all supported module options so a module will be able to check if a given option is supported by the current running Redis instance. ### Background The following PR is a proposal of handling write operations inside module key space notifications. After a lot of discussions we came to a conclusion that module should not perform any write operations on key space notification. Some examples of issues that such write operation can cause are describe on the following links: * Bad replication oreder - https://github.com/redis/redis/pull/10969 * Used after free - https://github.com/redis/redis/pull/10969#issuecomment-1223771006 * Used after free - https://github.com/redis/redis/pull/9406#issuecomment-1221684054 There are probably more issues that are yet to be discovered. The underline problem with writing inside key space notification is that the notification runs synchronously, this means that the notification code will be executed in the middle on Redis logic (commands logic, eviction, expire). Redis **do not assume** that the data might change while running the logic and such changes can crash Redis or cause unexpected behaviour. The solution is to state that modules **should not** perform any write command inside key space notification (we can chose whether or not we want to force it). To still cover the use-case where module wants to perform a write operation as a reaction to key space notifications, we introduce a new API , `RedisModule_AddPostNotificationJob`, that allows to register a callback that will be called by Redis when the following conditions hold: * It is safe to perform any write operation. * The job will be called atomically along side the operation that triggers it (in our case, key space notification). Module can use this new API to safely perform any write operation and still achieve atomicity between the notification and the write. Although currently the API is supported on key space notifications, the API is written in a generic way so that in the future we will be able to use it on other places (server events for example). ### Technical Details Whenever a module uses `RedisModule_AddPostNotificationJob` the callback is added to a list of callbacks (called `modulePostExecUnitJobs`) that need to be invoke after the current execution unit ends (whether its a command, eviction, or active expire). In order to trigger those callback atomically with the notification effect, we call those callbacks on `postExecutionUnitOperations` (which was `propagatePendingCommands` before this PR). The new function fires the post jobs and then calls `propagatePendingCommands`. If the callback perform more operations that triggers more key space notifications. Those keys space notifications might register more callbacks. Those callbacks will be added to the end of `modulePostExecUnitJobs` list and will be invoke atomically after the current callback ends. This raises a concerns of entering an infinite loops, we consider infinite loops as a logical bug that need to be fixed in the module, an attempt to protect against infinite loops by halting the execution could result in violation of the feature correctness and so **Redis will make no attempt to protect the module from infinite loops** In addition, currently key space notifications are not nested. Some modules might want to allow nesting key-space notifications. To allow that and keep backward compatibility, we introduce a new module option called `REDISMODULE_OPTIONS_ALLOW_NESTED_KEYSPACE_NOTIFICATIONS`. Setting this option will disable the Redis key-space notifications nesting protection and will pass this responsibility to the module. ### Redis infrastructure This PR promotes the existing `propagatePendingCommands` to an "Execution Unit" concept, which is called after each atomic unit of execution, Co-authored-by: Oran Agra <oran@redislabs.com> Co-authored-by: Yossi Gottlieb <yossigo@gmail.com> Co-authored-by: Madelyn Olson <34459052+madolson@users.noreply.github.com>
-rwxr-xr-xruntest-moduleapi1
-rw-r--r--src/blocked.c2
-rw-r--r--src/cluster.c3
-rw-r--r--src/evict.c2
-rw-r--r--src/expire.c2
-rw-r--r--src/module.c107
-rw-r--r--src/redismodule.h14
-rw-r--r--src/server.c32
-rw-r--r--src/server.h3
-rw-r--r--tests/modules/Makefile3
-rw-r--r--tests/modules/keyspace_events.c51
-rw-r--r--tests/modules/postnotifications.c236
-rw-r--r--tests/unit/moduleapi/keyspace_events.tcl4
-rw-r--r--tests/unit/moduleapi/postnotifications.tcl205
14 files changed, 651 insertions, 14 deletions
diff --git a/runtest-moduleapi b/runtest-moduleapi
index e58c58099..4b4f72d10 100755
--- a/runtest-moduleapi
+++ b/runtest-moduleapi
@@ -51,4 +51,5 @@ $TCLSH tests/test_helper.tcl \
--single unit/moduleapi/timer \
--single unit/moduleapi/publish \
--single unit/moduleapi/usercall \
+--single unit/moduleapi/postnotifications \
"${@}"
diff --git a/src/blocked.c b/src/blocked.c
index 497ffe4ce..069d3ba95 100644
--- a/src/blocked.c
+++ b/src/blocked.c
@@ -635,7 +635,7 @@ void handleClientsBlockedOnKeys(void) {
if (!o) {
/* Edge case: If lookupKeyReadWithFlags decides to expire the key we have to
* take care of the propagation here, because afterCommand wasn't called */
- propagatePendingCommands();
+ postExecutionUnitOperations();
} else {
if (o->type == OBJ_LIST)
serveClientsBlockedOnListKey(o,rl);
diff --git a/src/cluster.c b/src/cluster.c
index 436ed014c..d9acb1bdc 100644
--- a/src/cluster.c
+++ b/src/cluster.c
@@ -7361,8 +7361,9 @@ unsigned int delKeysInSlot(unsigned int hashslot) {
propagateDeletion(&server.db[0], key, server.lazyfree_lazy_server_del);
signalModifiedKey(NULL, &server.db[0], key);
moduleNotifyKeyspaceEvent(NOTIFY_GENERIC, "del", key, server.db[0].id);
+ postExecutionUnitOperations();
decrRefCount(key);
- propagatePendingCommands();
+ postExecutionUnitOperations();
j++;
server.dirty++;
}
diff --git a/src/evict.c b/src/evict.c
index 637a8b6c7..f97285f2a 100644
--- a/src/evict.c
+++ b/src/evict.c
@@ -691,7 +691,7 @@ int performEvictions(void) {
notifyKeyspaceEvent(NOTIFY_EVICTED, "evicted",
keyobj, db->id);
propagateDeletion(db,keyobj,server.lazyfree_lazy_eviction);
- propagatePendingCommands();
+ postExecutionUnitOperations();
decrRefCount(keyobj);
keys_freed++;
diff --git a/src/expire.c b/src/expire.c
index 8220f80cf..a106b0839 100644
--- a/src/expire.c
+++ b/src/expire.c
@@ -267,7 +267,7 @@ void activeExpireCycle(int type) {
if (activeExpireCycleTryExpire(db,e,now)) {
expired++;
/* Propagate the DEL command */
- propagatePendingCommands();
+ postExecutionUnitOperations();
}
if (ttl > 0) {
/* We want the average TTL of keys yet
diff --git a/src/module.c b/src/module.c
index e938b98d7..e336ebd19 100644
--- a/src/module.c
+++ b/src/module.c
@@ -294,6 +294,9 @@ static pthread_mutex_t moduleGIL = PTHREAD_MUTEX_INITIALIZER;
/* Function pointer type for keyspace event notification subscriptions from modules. */
typedef int (*RedisModuleNotificationFunc) (RedisModuleCtx *ctx, int type, const char *event, RedisModuleString *key);
+/* Function pointer type for post jobs */
+typedef void (*RedisModulePostNotificationJobFunc) (RedisModuleCtx *ctx, void *pd);
+
/* Keyspace notification subscriber information.
* See RM_SubscribeToKeyspaceEvents() for more information. */
typedef struct RedisModuleKeyspaceSubscriber {
@@ -308,9 +311,21 @@ typedef struct RedisModuleKeyspaceSubscriber {
int active;
} RedisModuleKeyspaceSubscriber;
+typedef struct RedisModulePostExecUnitJob {
+ /* The module subscribed to the event */
+ RedisModule *module;
+ RedisModulePostNotificationJobFunc callback;
+ void *pd;
+ void (*free_pd)(void*);
+ int dbid;
+} RedisModulePostExecUnitJob;
+
/* The module keyspace notification subscribers list */
static list *moduleKeyspaceSubscribers;
+/* The module post keyspace jobs list */
+static list *modulePostExecUnitJobs;
+
/* Data structures related to the exported dictionary data structure. */
typedef struct RedisModuleDict {
rax *rax; /* The radix tree. */
@@ -729,8 +744,9 @@ void moduleFreeContext(RedisModuleCtx *ctx) {
/* Modules take care of their own propagation, when we are
* outside of call() context (timers, events, etc.). */
if (--server.module_ctx_nesting == 0) {
- if (!server.core_propagates)
- propagatePendingCommands();
+ if (!server.core_propagates) {
+ postExecutionUnitOperations();
+ }
if (server.busy_module_yield_flags) {
blockingOperationEnds();
server.busy_module_yield_flags = BUSY_MODULE_YIELD_NONE;
@@ -2207,7 +2223,13 @@ void RM_Yield(RedisModuleCtx *ctx, int flags, const char *busy_reply) {
* REDISMODULE_OPTIONS_HANDLE_REPL_ASYNC_LOAD:
* Setting this flag indicates module awareness of diskless async replication (repl-diskless-load=swapdb)
* and that redis could be serving reads during replication instead of blocking with LOADING status.
- */
+ *
+ * REDISMODULE_OPTIONS_ALLOW_NESTED_KEYSPACE_NOTIFICATIONS:
+ * Declare that the module wants to get nested key-space notifications.
+ * By default, Redis will not fire key-space notifications that happened inside
+ * a key-space notification callback. This flag allows to change this behavior
+ * and fire nested key-space notifications. Notice: if enabled, the module
+ * should protected itself from infinite recursion. */
void RM_SetModuleOptions(RedisModuleCtx *ctx, int options) {
ctx->module->options = options;
}
@@ -7905,7 +7927,7 @@ void moduleGILBeforeUnlock() {
* (because it's u clear when thread safe contexts are
* released we have to propagate here). */
server.module_ctx_nesting--;
- propagatePendingCommands();
+ postExecutionUnitOperations();
if (server.busy_module_yield_flags) {
blockingOperationEnds();
@@ -8000,6 +8022,12 @@ void moduleReleaseGIL(void) {
* so notification callbacks must to be fast, or they would slow Redis down.
* If you need to take long actions, use threads to offload them.
*
+ * Moreover, the fact that the notification is executed synchronously means
+ * that the notification code will be executed in the middle on Redis logic
+ * (commands logic, eviction, expire). Changing the key space while the logic
+ * runs is dangerous and discouraged. In order to react to key space events with
+ * write actions, please refer to `RM_AddPostExecutionUnitJob`.
+ *
* See https://redis.io/topics/notifications for more information.
*/
int RM_SubscribeToKeyspaceEvents(RedisModuleCtx *ctx, int types, RedisModuleNotificationFunc callback) {
@@ -8013,6 +8041,53 @@ int RM_SubscribeToKeyspaceEvents(RedisModuleCtx *ctx, int types, RedisModuleNoti
return REDISMODULE_OK;
}
+void firePostExecutionUnitJobs() {
+ /* Avoid propagation of commands. */
+ server.in_nested_call++;
+ while (listLength(modulePostExecUnitJobs) > 0) {
+ listNode *ln = listFirst(modulePostExecUnitJobs);
+ RedisModulePostExecUnitJob *job = listNodeValue(ln);
+ listDelNode(modulePostExecUnitJobs, ln);
+
+ RedisModuleCtx ctx;
+ moduleCreateContext(&ctx, job->module, REDISMODULE_CTX_TEMP_CLIENT);
+ selectDb(ctx.client, job->dbid);
+
+ job->callback(&ctx, job->pd);
+ if (job->free_pd) job->free_pd(job->pd);
+
+ moduleFreeContext(&ctx);
+ zfree(job);
+ }
+ server.in_nested_call--;
+}
+
+/* When running inside a key space notification callback, it is dangerous and highly discouraged to perform any write
+ * operation (See `RM_SubscribeToKeyspaceEvents`). In order to still perform write actions in this scenario,
+ * Redis provides `RM_AddPostNotificationJob` API. The API allows to register a job callback which Redis will call
+ * when the following condition are promised to be fulfilled:
+ * 1. It is safe to perform any write operation.
+ * 2. The job will be called atomically along side the key space notification.
+ *
+ * Notice, one job might trigger key space notifications that will trigger more jobs.
+ * This raises a concerns of entering an infinite loops, we consider infinite loops
+ * as a logical bug that need to be fixed in the module, an attempt to protect against
+ * infinite loops by halting the execution could result in violation of the feature correctness
+ * and so Redis will make no attempt to protect the module from infinite loops.
+ *
+ * 'free_pd' can be NULL and in such case will not be used. */
+int RM_AddPostNotificationJob(RedisModuleCtx *ctx, RedisModulePostNotificationJobFunc callback, void *privdata, void (*free_privdata)(void*)) {
+ RedisModulePostExecUnitJob *job = zmalloc(sizeof(*job));
+ job->module = ctx->module;
+ job->callback = callback;
+ job->pd = privdata;
+ job->free_pd = free_privdata;
+ job->dbid = ctx->client->db->id;
+
+ listAddNodeTail(modulePostExecUnitJobs, job);
+ return REDISMODULE_OK;
+}
+
/* Get the configured bitmap of notify-keyspace-events (Could be used
* for additional filtering in RedisModuleNotificationFunc) */
int RM_GetNotifyKeyspaceEvents() {
@@ -8045,7 +8120,8 @@ void moduleNotifyKeyspaceEvent(int type, const char *event, robj *key, int dbid)
RedisModuleKeyspaceSubscriber *sub = ln->value;
/* Only notify subscribers on events matching the registration,
* and avoid subscribers triggering themselves */
- if ((sub->event_mask & type) && sub->active == 0) {
+ if ((sub->event_mask & type) &&
+ (sub->active == 0 || (sub->module->options & REDISMODULE_OPTIONS_ALLOW_NESTED_KEYSPACE_NOTIFICATIONS))) {
RedisModuleCtx ctx;
moduleCreateContext(&ctx, sub->module, REDISMODULE_CTX_TEMP_CLIENT);
selectDb(ctx.client, dbid);
@@ -11116,6 +11192,8 @@ void moduleInitModulesSystem(void) {
/* Set up the keyspace notification subscriber list and static client */
moduleKeyspaceSubscribers = listCreate();
+ modulePostExecUnitJobs = listCreate();
+
/* Set up filter list */
moduleCommandFilters = listCreate();
@@ -12235,6 +12313,23 @@ int RM_GetLFU(RedisModuleKey *key, long long *lfu_freq) {
* -------------------------------------------------------------------------- */
/**
+ * Returns the full module options flags mask, using the return value
+ * the module can check if a certain set of module options are supported
+ * by the redis server version in use.
+ * Example:
+ *
+ * int supportedFlags = RM_GetModuleOptionsAll();
+ * if (supportedFlags & REDISMODULE_OPTIONS_ALLOW_NESTED_KEYSPACE_NOTIFICATIONS) {
+ * // REDISMODULE_OPTIONS_ALLOW_NESTED_KEYSPACE_NOTIFICATIONS is supported
+ * } else{
+ * // REDISMODULE_OPTIONS_ALLOW_NESTED_KEYSPACE_NOTIFICATIONS is not supported
+ * }
+ */
+int RM_GetModuleOptionsAll() {
+ return _REDISMODULE_OPTIONS_FLAGS_NEXT - 1;
+}
+
+/**
* Returns the full ContextFlags mask, using the return value
* the module can check if a certain set of flags are supported
* by the redis server version in use.
@@ -12825,6 +12920,7 @@ void moduleRegisterCoreAPI(void) {
REGISTER_API(NotifyKeyspaceEvent);
REGISTER_API(GetNotifyKeyspaceEvents);
REGISTER_API(SubscribeToKeyspaceEvents);
+ REGISTER_API(AddPostNotificationJob);
REGISTER_API(RegisterClusterMessageReceiver);
REGISTER_API(SendClusterMessage);
REGISTER_API(GetClusterNodeInfo);
@@ -12932,6 +13028,7 @@ void moduleRegisterCoreAPI(void) {
REGISTER_API(AuthenticateClientWithACLUser);
REGISTER_API(AuthenticateClientWithUser);
REGISTER_API(GetContextFlagsAll);
+ REGISTER_API(GetModuleOptionsAll);
REGISTER_API(GetKeyspaceNotificationFlagsAll);
REGISTER_API(IsSubEventSupported);
REGISTER_API(GetServerVersion);
diff --git a/src/redismodule.h b/src/redismodule.h
index 0515af61e..7f04f7ba4 100644
--- a/src/redismodule.h
+++ b/src/redismodule.h
@@ -261,6 +261,15 @@ typedef uint64_t RedisModuleTimerID;
/* Declare that the module can handle diskless async replication with RedisModule_SetModuleOptions. */
#define REDISMODULE_OPTIONS_HANDLE_REPL_ASYNC_LOAD (1<<2)
+/* Declare that the module want to get nested key space notifications.
+ * If enabled, the module is responsible to break endless loop. */
+#define REDISMODULE_OPTIONS_ALLOW_NESTED_KEYSPACE_NOTIFICATIONS (1<<3)
+
+/* Next option flag, must be updated when adding new module flags above!
+ * This flag should not be used directly by the module.
+ * Use RedisModule_GetModuleOptionsAll instead. */
+#define _REDISMODULE_OPTIONS_FLAGS_NEXT (1<<4)
+
/* Definitions for RedisModule_SetCommandInfo. */
typedef enum {
@@ -834,6 +843,7 @@ typedef struct RedisModuleKeyOptCtx RedisModuleKeyOptCtx;
typedef int (*RedisModuleCmdFunc)(RedisModuleCtx *ctx, RedisModuleString **argv, int argc);
typedef void (*RedisModuleDisconnectFunc)(RedisModuleCtx *ctx, RedisModuleBlockedClient *bc);
typedef int (*RedisModuleNotificationFunc)(RedisModuleCtx *ctx, int type, const char *event, RedisModuleString *key);
+typedef void (*RedisModulePostNotificationJobFunc) (RedisModuleCtx *ctx, void *pd);
typedef void *(*RedisModuleTypeLoadFunc)(RedisModuleIO *rdb, int encver);
typedef void (*RedisModuleTypeSaveFunc)(RedisModuleIO *rdb, void *value);
typedef int (*RedisModuleTypeAuxLoadFunc)(RedisModuleIO *rdb, int encver, int when);
@@ -1146,6 +1156,7 @@ REDISMODULE_API void (*RedisModule_ScanCursorDestroy)(RedisModuleScanCursor *cur
REDISMODULE_API int (*RedisModule_Scan)(RedisModuleCtx *ctx, RedisModuleScanCursor *cursor, RedisModuleScanCB fn, void *privdata) REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_ScanKey)(RedisModuleKey *key, RedisModuleScanCursor *cursor, RedisModuleScanKeyCB fn, void *privdata) REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_GetContextFlagsAll)() REDISMODULE_ATTR;
+REDISMODULE_API int (*RedisModule_GetModuleOptionsAll)() REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_GetKeyspaceNotificationFlagsAll)() REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_IsSubEventSupported)(RedisModuleEvent event, uint64_t subevent) REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_GetServerVersion)() REDISMODULE_ATTR;
@@ -1167,6 +1178,7 @@ REDISMODULE_API void (*RedisModule_ThreadSafeContextLock)(RedisModuleCtx *ctx) R
REDISMODULE_API int (*RedisModule_ThreadSafeContextTryLock)(RedisModuleCtx *ctx) REDISMODULE_ATTR;
REDISMODULE_API void (*RedisModule_ThreadSafeContextUnlock)(RedisModuleCtx *ctx) REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_SubscribeToKeyspaceEvents)(RedisModuleCtx *ctx, int types, RedisModuleNotificationFunc cb) REDISMODULE_ATTR;
+REDISMODULE_API int (*RedisModule_AddPostNotificationJob)(RedisModuleCtx *ctx, RedisModulePostNotificationJobFunc callback, void *pd, void (*free_pd)(void*)) REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_NotifyKeyspaceEvent)(RedisModuleCtx *ctx, int type, const char *event, RedisModuleString *key) REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_GetNotifyKeyspaceEvents)() REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_BlockedClientDisconnected)(RedisModuleCtx *ctx) REDISMODULE_ATTR;
@@ -1490,6 +1502,7 @@ static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int
REDISMODULE_GET_API(Scan);
REDISMODULE_GET_API(ScanKey);
REDISMODULE_GET_API(GetContextFlagsAll);
+ REDISMODULE_GET_API(GetModuleOptionsAll);
REDISMODULE_GET_API(GetKeyspaceNotificationFlagsAll);
REDISMODULE_GET_API(IsSubEventSupported);
REDISMODULE_GET_API(GetServerVersion);
@@ -1512,6 +1525,7 @@ static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int
REDISMODULE_GET_API(BlockedClientMeasureTimeEnd);
REDISMODULE_GET_API(SetDisconnectCallback);
REDISMODULE_GET_API(SubscribeToKeyspaceEvents);
+ REDISMODULE_GET_API(AddPostNotificationJob);
REDISMODULE_GET_API(NotifyKeyspaceEvent);
REDISMODULE_GET_API(GetNotifyKeyspaceEvents);
REDISMODULE_GET_API(BlockedClientDisconnected);
diff --git a/src/server.c b/src/server.c
index 149b675eb..bc85a3778 100644
--- a/src/server.c
+++ b/src/server.c
@@ -3226,7 +3226,7 @@ void updateCommandLatencyHistogram(struct hdr_histogram **latency_histogram, int
/* Handle the alsoPropagate() API to handle commands that want to propagate
* multiple separated commands. Note that alsoPropagate() is not affected
* by CLIENT_PREVENT_PROP flag. */
-void propagatePendingCommands() {
+static void propagatePendingCommands() {
if (server.also_propagate.numops == 0)
return;
@@ -3262,6 +3262,31 @@ void propagatePendingCommands() {
redisOpArrayFree(&server.also_propagate);
}
+/* Performs operations that should be performed after an execution unit ends.
+ * Execution unit is a code that should be done atomically.
+ * Execution units can be nested and are not necessarily starts with Redis command.
+ *
+ * For example the following is a logical unit:
+ * active expire ->
+ * trigger del notification of some module ->
+ * accessing a key ->
+ * trigger key miss notification of some other module
+ *
+ * What we want to achieve is that the entire execution unit will be done atomically,
+ * currently with respect to replication and post jobs, but in the future there might
+ * be other considerations. So we basically want the `postUnitOperations` to trigger
+ * after the entire chain finished.
+ *
+ * Current, in order to avoid massive code changes that could be risky to cherry-pick,
+ * we count on the mechanism we already have such as `server.core_propagation`,
+ * `server.module_ctx_nesting`, and `server.in_nested_call`. We understand that we probably
+ * do not need all of those variable and we will make an attempt to re-arrange it on unstable
+ * branch. */
+void postExecutionUnitOperations() {
+ firePostExecutionUnitJobs();
+ propagatePendingCommands();
+}
+
/* Increment the command failure counters (either rejected_calls or failed_calls).
* The decision which counter to increment is done using the flags argument, options are:
* * ERROR_COMMAND_REJECTED - update rejected_calls
@@ -3576,8 +3601,9 @@ void afterCommand(client *c) {
/* If we are at the top-most call() we can propagate what we accumulated.
* Should be done before trackingHandlePendingKeyInvalidations so that we
* reply to client before invalidating cache (makes more sense) */
- if (server.core_propagates)
- propagatePendingCommands();
+ if (server.core_propagates) {
+ postExecutionUnitOperations();
+ }
/* Flush pending invalidation messages only when we are not in nested call.
* So the messages are not interleaved with transaction response. */
trackingHandlePendingKeyInvalidations();
diff --git a/src/server.h b/src/server.h
index 0faf80c0f..3c2a6a045 100644
--- a/src/server.h
+++ b/src/server.h
@@ -2424,6 +2424,7 @@ void moduleAcquireGIL(void);
int moduleTryAcquireGIL(void);
void moduleReleaseGIL(void);
void moduleNotifyKeyspaceEvent(int type, const char *event, robj *key, int dbid);
+void firePostExecutionUnitJobs();
void moduleCallCommandFilters(client *c);
void ModuleForkDoneHandler(int exitcode, int bysignal);
int TerminateModuleForkChild(int child_pid, int wait);
@@ -2946,7 +2947,7 @@ void startCommandExecution();
int incrCommandStatsOnError(struct redisCommand *cmd, int flags);
void call(client *c, int flags);
void alsoPropagate(int dbid, robj **argv, int argc, int target);
-void propagatePendingCommands();
+void postExecutionUnitOperations();
void redisOpArrayFree(redisOpArray *oa);
void forceCommandPropagation(client *c, int flags);
void preventCommandPropagation(client *c);
diff --git a/tests/modules/Makefile b/tests/modules/Makefile
index 56069406b..5c73ed0e3 100644
--- a/tests/modules/Makefile
+++ b/tests/modules/Makefile
@@ -59,7 +59,8 @@ TEST_MODULES = \
moduleconfigs.so \
moduleconfigstwo.so \
publish.so \
- usercall.so
+ usercall.so \
+ postnotifications.so
.PHONY: all
diff --git a/tests/modules/keyspace_events.c b/tests/modules/keyspace_events.c
index bb9987883..46eb688a5 100644
--- a/tests/modules/keyspace_events.c
+++ b/tests/modules/keyspace_events.c
@@ -131,6 +131,49 @@ static int KeySpace_NotificationModuleKeyMiss(RedisModuleCtx *ctx, int type, con
return REDISMODULE_OK;
}
+static int KeySpace_NotificationModuleString(RedisModuleCtx *ctx, int type, const char *event, RedisModuleString *key) {
+ REDISMODULE_NOT_USED(type);
+ REDISMODULE_NOT_USED(event);
+ RedisModuleKey *redis_key = RedisModule_OpenKey(ctx, key, REDISMODULE_READ);
+
+ size_t len = 0;
+ /* RedisModule_StringDMA could change the data format and cause the old robj to be freed.
+ * This code verifies that such format change will not cause any crashes.*/
+ char *data = RedisModule_StringDMA(redis_key, &len, REDISMODULE_READ);
+ int res = strncmp(data, "dummy", 5);
+ REDISMODULE_NOT_USED(res);
+
+ RedisModule_CloseKey(redis_key);
+
+ return REDISMODULE_OK;
+}
+
+static void KeySpace_PostNotificationStringFreePD(void *pd) {
+ RedisModule_FreeString(NULL, pd);
+}
+
+static void KeySpace_PostNotificationString(RedisModuleCtx *ctx, void *pd) {
+ REDISMODULE_NOT_USED(ctx);
+ RedisModuleCallReply* rep = RedisModule_Call(ctx, "incr", "!s", pd);
+ RedisModule_FreeCallReply(rep);
+}
+
+static int KeySpace_NotificationModuleStringPostNotificationJob(RedisModuleCtx *ctx, int type, const char *event, RedisModuleString *key) {
+ REDISMODULE_NOT_USED(ctx);
+ REDISMODULE_NOT_USED(type);
+ REDISMODULE_NOT_USED(event);
+
+ const char *key_str = RedisModule_StringPtrLen(key, NULL);
+
+ if (strncmp(key_str, "string1_", 8) != 0) {
+ return REDISMODULE_OK;
+ }
+
+ RedisModuleString *new_key = RedisModule_CreateStringPrintf(NULL, "string_changed{%s}", key_str);
+ RedisModule_AddPostNotificationJob(ctx, KeySpace_PostNotificationString, new_key, KeySpace_PostNotificationStringFreePD);
+ return REDISMODULE_OK;
+}
+
static int KeySpace_NotificationModule(RedisModuleCtx *ctx, int type, const char *event, RedisModuleString *key) {
REDISMODULE_NOT_USED(ctx);
REDISMODULE_NOT_USED(type);
@@ -312,6 +355,14 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
return REDISMODULE_ERR;
}
+ if(RedisModule_SubscribeToKeyspaceEvents(ctx, REDISMODULE_NOTIFY_STRING, KeySpace_NotificationModuleString) != REDISMODULE_OK){
+ return REDISMODULE_ERR;
+ }
+
+ if(RedisModule_SubscribeToKeyspaceEvents(ctx, REDISMODULE_NOTIFY_STRING, KeySpace_NotificationModuleStringPostNotificationJob) != REDISMODULE_OK){
+ return REDISMODULE_ERR;
+ }
+
if (RedisModule_CreateCommand(ctx,"keyspace.notify", cmdNotify,"",0,0,0) == REDISMODULE_ERR){
return REDISMODULE_ERR;
}
diff --git a/tests/modules/postnotifications.c b/tests/modules/postnotifications.c
new file mode 100644
index 000000000..ca3a15b43
--- /dev/null
+++ b/tests/modules/postnotifications.c
@@ -0,0 +1,236 @@
+/* This module is used to test the server post keyspace jobs API.
+ *
+ * -----------------------------------------------------------------------------
+ *
+ * Copyright (c) 2020, Meir Shpilraien <meir at redislabs dot com>
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * * Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ * * Neither the name of Redis nor the names of its contributors may be used
+ * to endorse or promote products derived from this software without
+ * specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+/* This module allow to verify 'RedisModule_AddPostNotificationJob' by registering to 3
+ * key space event:
+ * * STRINGS - the module register to all strings notifications and set post notification job
+ * that increase a counter indicating how many times the string key was changed.
+ * In addition, it increase another counter that counts the total changes that
+ * was made on all strings keys.
+ * * EXPIRED - the module register to expired event and set post notification job that that
+ * counts the total number of expired events.
+ * * EVICTED - the module register to evicted event and set post notification job that that
+ * counts the total number of evicted events.
+ *
+ * In addition, the module register a new command, 'postnotification.async_set', that performs a set
+ * command from a background thread. This allows to check the 'RedisModule_AddPostNotificationJob' on
+ * notifications that was triggered on a background thread. */
+
+#define _BSD_SOURCE
+#define _DEFAULT_SOURCE /* For usleep */
+
+#include "redismodule.h"
+#include <stdio.h>
+#include <string.h>
+#include <unistd.h>
+#include <pthread.h>
+
+static void KeySpace_PostNotificationStringFreePD(void *pd) {
+ RedisModule_FreeString(NULL, pd);
+}
+
+static void KeySpace_PostNotificationReadKey(RedisModuleCtx *ctx, void *pd) {
+ RedisModuleCallReply* rep = RedisModule_Call(ctx, "get", "!s", pd);
+ RedisModule_FreeCallReply(rep);
+}
+
+static void KeySpace_PostNotificationString(RedisModuleCtx *ctx, void *pd) {
+ REDISMODULE_NOT_USED(ctx);
+ RedisModuleCallReply* rep = RedisModule_Call(ctx, "incr", "!s", pd);
+ RedisModule_FreeCallReply(rep);
+}
+
+static int KeySpace_NotificationExpired(RedisModuleCtx *ctx, int type, const char *event, RedisModuleString *key){
+ REDISMODULE_NOT_USED(type);
+ REDISMODULE_NOT_USED(event);
+ REDISMODULE_NOT_USED(key);
+
+ RedisModuleString *new_key = RedisModule_CreateString(NULL, "expired", 7);
+ RedisModule_AddPostNotificationJob(ctx, KeySpace_PostNotificationString, new_key, KeySpace_PostNotificationStringFreePD);
+ return REDISMODULE_OK;
+}
+
+static int KeySpace_NotificationEvicted(RedisModuleCtx *ctx, int type, const char *event, RedisModuleString *key){
+ REDISMODULE_NOT_USED(type);
+ REDISMODULE_NOT_USED(event);
+ REDISMODULE_NOT_USED(key);
+
+ const char *key_str = RedisModule_StringPtrLen(key, NULL);
+
+ if (strncmp(key_str, "evicted", 7) == 0) {
+ return REDISMODULE_OK; /* do not count the evicted key */
+ }
+
+ RedisModuleString *new_key = RedisModule_CreateString(NULL, "evicted", 7);
+ RedisModule_AddPostNotificationJob(ctx, KeySpace_PostNotificationString, new_key, KeySpace_PostNotificationStringFreePD);
+ return REDISMODULE_OK;
+}
+
+static int KeySpace_NotificationString(RedisModuleCtx *ctx, int type, const char *event, RedisModuleString *key){
+ REDISMODULE_NOT_USED(ctx);
+ REDISMODULE_NOT_USED(type);
+ REDISMODULE_NOT_USED(event);
+
+ const char *key_str = RedisModule_StringPtrLen(key, NULL);
+
+ if (strncmp(key_str, "string_", 7) != 0) {
+ return REDISMODULE_OK;
+ }
+
+ if (strcmp(key_str, "string_total") == 0) {
+ return REDISMODULE_OK;
+ }
+
+ RedisModuleString *new_key;
+ if (strncmp(key_str, "string_changed{", 15) == 0) {
+ new_key = RedisModule_CreateString(NULL, "string_total", 12);
+ } else {
+ new_key = RedisModule_CreateStringPrintf(NULL, "string_changed{%s}", key_str);
+ }
+
+ RedisModule_AddPostNotificationJob(ctx, KeySpace_PostNotificationString, new_key, KeySpace_PostNotificationStringFreePD);
+ return REDISMODULE_OK;
+}
+
+static int KeySpace_LazyExpireInsidePostNotificationJob(RedisModuleCtx *ctx, int type, const char *event, RedisModuleString *key){
+ REDISMODULE_NOT_USED(ctx);
+ REDISMODULE_NOT_USED(type);
+ REDISMODULE_NOT_USED(event);
+
+ const char *key_str = RedisModule_StringPtrLen(key, NULL);
+
+ if (strncmp(key_str, "read_", 5) != 0) {
+ return REDISMODULE_OK;
+ }
+
+ RedisModuleString *new_key = RedisModule_CreateString(NULL, key_str + 5, strlen(key_str) - 5);;
+ RedisModule_AddPostNotificationJob(ctx, KeySpace_PostNotificationReadKey, new_key, KeySpace_PostNotificationStringFreePD);
+ return REDISMODULE_OK;
+}
+
+static int KeySpace_NestedNotification(RedisModuleCtx *ctx, int type, const char *event, RedisModuleString *key){
+ REDISMODULE_NOT_USED(ctx);
+ REDISMODULE_NOT_USED(type);
+ REDISMODULE_NOT_USED(event);
+
+ const char *key_str = RedisModule_StringPtrLen(key, NULL);
+
+ if (strncmp(key_str, "write_sync_", 11) != 0) {
+ return REDISMODULE_OK;
+ }
+
+ /* This test was only meant to check REDISMODULE_OPTIONS_ALLOW_NESTED_KEYSPACE_NOTIFICATIONS.
+ * In general it is wrong and discourage to perform any writes inside a notification callback. */
+ RedisModuleString *new_key = RedisModule_CreateString(NULL, key_str + 11, strlen(key_str) - 11);;
+ RedisModuleCallReply* rep = RedisModule_Call(ctx, "set", "!sc", new_key, "1");
+ RedisModule_FreeCallReply(rep);
+ RedisModule_FreeString(NULL, new_key);
+ return REDISMODULE_OK;
+}
+
+static void *KeySpace_PostNotificationsAsyncSetInner(void *arg) {
+ RedisModuleBlockedClient *bc = arg;
+ RedisModuleCtx *ctx = RedisModule_GetThreadSafeContext(bc);
+ RedisModule_ThreadSafeContextLock(ctx);
+ RedisModuleCallReply* rep = RedisModule_Call(ctx, "set", "!cc", "string_x", "1");
+ RedisModule_ThreadSafeContextUnlock(ctx);
+ RedisModule_ReplyWithCallReply(ctx, rep);
+ RedisModule_FreeCallReply(rep);
+
+ RedisModule_UnblockClient(bc, NULL);
+ RedisModule_FreeThreadSafeContext(ctx);
+ return NULL;
+}
+
+static int KeySpace_PostNotificationsAsyncSet(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
+ REDISMODULE_NOT_USED(argv);
+ if (argc != 1)
+ return RedisModule_WrongArity(ctx);
+
+ pthread_t tid;
+ RedisModuleBlockedClient *bc = RedisModule_BlockClient(ctx,NULL,NULL,NULL,0);
+
+ if (pthread_create(&tid,NULL,KeySpace_PostNotificationsAsyncSetInner,bc) != 0) {
+ RedisModule_AbortBlock(bc);
+ return RedisModule_ReplyWithError(ctx,"-ERR Can't start thread");
+ }
+ return REDISMODULE_OK;
+}
+
+/* This function must be present on each Redis module. It is used in order to
+ * register the commands into the Redis server. */
+int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
+ REDISMODULE_NOT_USED(argv);
+ REDISMODULE_NOT_USED(argc);
+
+ if (RedisModule_Init(ctx,"postnotifications",1,REDISMODULE_APIVER_1) == REDISMODULE_ERR){
+ return REDISMODULE_ERR;
+ }
+
+ if (!(RedisModule_GetModuleOptionsAll() & REDISMODULE_OPTIONS_ALLOW_NESTED_KEYSPACE_NOTIFICATIONS)) {
+ return REDISMODULE_ERR;
+ }
+
+ RedisModule_SetModuleOptions(ctx, REDISMODULE_OPTIONS_ALLOW_NESTED_KEYSPACE_NOTIFICATIONS);
+
+ if(RedisModule_SubscribeToKeyspaceEvents(ctx, REDISMODULE_NOTIFY_STRING, KeySpace_NotificationString) != REDISMODULE_OK){
+ return REDISMODULE_ERR;
+ }
+
+ if(RedisModule_SubscribeToKeyspaceEvents(ctx, REDISMODULE_NOTIFY_STRING, KeySpace_LazyExpireInsidePostNotificationJob) != REDISMODULE_OK){
+ return REDISMODULE_ERR;
+ }
+
+ if(RedisModule_SubscribeToKeyspaceEvents(ctx, REDISMODULE_NOTIFY_STRING, KeySpace_NestedNotification) != REDISMODULE_OK){
+ return REDISMODULE_ERR;
+ }
+
+ if(RedisModule_SubscribeToKeyspaceEvents(ctx, REDISMODULE_NOTIFY_EXPIRED, KeySpace_NotificationExpired) != REDISMODULE_OK){
+ return REDISMODULE_ERR;
+ }
+
+ if(RedisModule_SubscribeToKeyspaceEvents(ctx, REDISMODULE_NOTIFY_EVICTED, KeySpace_NotificationEvicted) != REDISMODULE_OK){
+ return REDISMODULE_ERR;
+ }
+
+ if (RedisModule_CreateCommand(ctx, "postnotification.async_set", KeySpace_PostNotificationsAsyncSet,
+ "write", 0, 0, 0) == REDISMODULE_ERR){
+ return REDISMODULE_ERR;
+ }
+
+ return REDISMODULE_OK;
+}
+
+int RedisModule_OnUnload(RedisModuleCtx *ctx) {
+ REDISMODULE_NOT_USED(ctx);
+ return REDISMODULE_OK;
+}
diff --git a/tests/unit/moduleapi/keyspace_events.tcl b/tests/unit/moduleapi/keyspace_events.tcl
index ceec6fdf3..19c712052 100644
--- a/tests/unit/moduleapi/keyspace_events.tcl
+++ b/tests/unit/moduleapi/keyspace_events.tcl
@@ -97,5 +97,9 @@ tags "modules" {
test "Unload the module - testkeyspace" {
assert_equal {OK} [r module unload testkeyspace]
}
+
+ test "Verify RM_StringDMA with expiration are not causing invalid memory access" {
+ assert_equal {OK} [r set x 1 EX 1]
+ }
}
}
diff --git a/tests/unit/moduleapi/postnotifications.tcl b/tests/unit/moduleapi/postnotifications.tcl
new file mode 100644
index 000000000..11b003a15
--- /dev/null
+++ b/tests/unit/moduleapi/postnotifications.tcl
@@ -0,0 +1,205 @@
+set testmodule [file normalize tests/modules/postnotifications.so]
+
+tags "modules" {
+ start_server [list overrides [list loadmodule "$testmodule"]] {
+
+ test {Test write on post notification callback} {
+ set repl [attach_to_replication_stream]
+
+ r set string_x 1
+ assert_equal {1} [r get string_changed{string_x}]
+ assert_equal {1} [r get string_total]
+
+ r set string_x 2
+ assert_equal {2} [r get string_changed{string_x}]
+ assert_equal {2} [r get string_total]
+
+ assert_replication_stream $repl {
+ {multi}
+ {select *}
+ {set string_x 1}
+ {incr string_changed{string_x}}
+ {incr string_total}
+ {exec}
+ {multi}
+ {set string_x 2}
+ {incr string_changed{string_x}}
+ {incr string_total}
+ {exec}
+ }
+ close_replication_stream $repl
+ }
+
+ test {Test write on post notification callback from module thread} {
+ r flushall
+ set repl [attach_to_replication_stream]
+
+ assert_equal {OK} [r postnotification.async_set]
+ assert_equal {1} [r get string_changed{string_x}]
+ assert_equal {1} [r get string_total]
+
+ assert_replication_stream $repl {
+ {multi}
+ {select *}
+ {set string_x 1}
+ {incr string_changed{string_x}}
+ {incr string_total}
+ {exec}
+ }
+ close_replication_stream $repl
+ }
+
+ test {Test active expire} {
+ r flushall
+ set repl [attach_to_replication_stream]
+
+ r set x 1
+ r pexpire x 10
+
+ wait_for_condition 100 50 {
+ [r keys expired] == {expired}
+ } else {
+ puts [r keys *]
+ fail "Failed waiting for x to expired"
+ }
+
+ assert_replication_stream $repl {
+ {select *}
+ {set x 1}
+ {pexpireat x *}
+ {multi}
+ {del x}
+ {incr expired}
+ {exec}
+ }
+ close_replication_stream $repl
+ }
+
+ test {Test lazy expire} {
+ r flushall
+ r DEBUG SET-ACTIVE-EXPIRE 0
+ set repl [attach_to_replication_stream]
+
+ r set x 1
+ r pexpire x 1
+ after 10
+ assert_equal {} [r get x]
+
+ assert_replication_stream $repl {
+ {select *}
+ {set x 1}
+ {pexpireat x *}
+ {multi}
+ {del x}
+ {incr expired}
+ {exec}
+ }
+ close_replication_stream $repl
+ r DEBUG SET-ACTIVE-EXPIRE 1
+ } {OK} {needs:debug}
+
+ test {Test lazy expire inside post job notification} {
+ r flushall
+ r DEBUG SET-ACTIVE-EXPIRE 0
+ set repl [attach_to_replication_stream]
+
+ r set x 1
+ r pexpire x 1
+ after 10
+ assert_equal {OK} [r set read_x 1]
+
+ assert_replication_stream $repl {
+ {select *}
+ {set x 1}
+ {pexpireat x *}
+ {multi}
+ {set read_x 1}
+ {del x}
+ {incr expired}
+ {exec}
+ }
+ close_replication_stream $repl
+ r DEBUG SET-ACTIVE-EXPIRE 1
+ } {OK} {needs:debug}
+
+ test {Test nested keyspace notification} {
+ r flushall
+ set repl [attach_to_replication_stream]
+
+ assert_equal {OK} [r set write_sync_write_sync_x 1]
+
+ assert_replication_stream $repl {
+ {multi}
+ {select *}
+ {set x 1}
+ {set write_sync_x 1}
+ {set write_sync_write_sync_x 1}
+ {exec}
+ }
+ close_replication_stream $repl
+ }
+
+ test {Test eviction} {
+ r flushall
+ set repl [attach_to_replication_stream]
+ r set x 1
+ r config set maxmemory-policy allkeys-random
+ r config set maxmemory 1
+
+ assert_error {OOM *} {r set y 1}
+
+ assert_replication_stream $repl {
+ {select *}
+ {set x 1}
+ {multi}
+ {del x}
+ {incr evicted}
+ {exec}
+ }
+ close_replication_stream $repl
+ } {} {needs:config-maxmemory}
+ }
+}
+
+set testmodule2 [file normalize tests/modules/keyspace_events.so]
+
+tags "modules" {
+ start_server [list overrides [list loadmodule "$testmodule"]] {
+ r module load $testmodule2
+ test {Test write on post notification callback} {
+ set repl [attach_to_replication_stream]
+
+ r set string_x 1
+ assert_equal {1} [r get string_changed{string_x}]
+ assert_equal {1} [r get string_total]
+
+ r set string_x 2
+ assert_equal {2} [r get string_changed{string_x}]
+ assert_equal {2} [r get string_total]
+
+ r set string1_x 1
+ assert_equal {1} [r get string_changed{string1_x}]
+ assert_equal {3} [r get string_total]
+
+ assert_replication_stream $repl {
+ {multi}
+ {select *}
+ {set string_x 1}
+ {incr string_changed{string_x}}
+ {incr string_total}
+ {exec}
+ {multi}
+ {set string_x 2}
+ {incr string_changed{string_x}}
+ {incr string_total}
+ {exec}
+ {multi}
+ {set string1_x 1}
+ {incr string_changed{string1_x}}
+ {incr string_total}
+ {exec}
+ }
+ close_replication_stream $repl
+ }
+ }
+} \ No newline at end of file