summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorantirez <antirez@gmail.com>2019-10-03 11:03:46 +0200
committerantirez <antirez@gmail.com>2019-11-14 17:44:37 +0100
commit3b38164e8fe8627ae10536e3cfee5520983a1888 (patch)
treeac2ba6e5b4e07ab7fa139cbdcf85e679dfc56e20
parentef9fe9b0cbd11f1aac11cc9d635f957efd621206 (diff)
downloadredis-3b38164e8fe8627ae10536e3cfee5520983a1888.tar.gz
Modules: RM_Replicate() in thread safe contexts.
-rw-r--r--src/module.c30
1 files changed, 26 insertions, 4 deletions
diff --git a/src/module.c b/src/module.c
index dcb5c9c98..8126e2749 100644
--- a/src/module.c
+++ b/src/module.c
@@ -1372,6 +1372,20 @@ void moduleReplicateMultiIfNeeded(RedisModuleCtx *ctx) {
*
* Please refer to RedisModule_Call() for more information.
*
+ * ## Note about calling this function from a thread safe context:
+ *
+ * Normally when you call this function from the callback implementing a
+ * module command, or any other callback provided by the Redis Module API,
+ * Redis will accumulate all the calls to this function in the context of
+ * the callback, and will propagate all the commands wrapped in a MULTI/EXEC
+ * transaction. However when calling this function from a threaded safe context
+ * that can live an undefined amount of time, and can be locked/unlocked in
+ * at will, the behavior is different: MULTI/EXEC wrapper is not emitted
+ * and the command specified is inserted in the AOF and replication stream
+ * immediately.
+ *
+ * ## Return value
+ *
* The command returns REDISMODULE_ERR if the format specifiers are invalid
* or the command name does not belong to a known command. */
int RM_Replicate(RedisModuleCtx *ctx, const char *cmdname, const char *fmt, ...) {
@@ -1389,10 +1403,18 @@ int RM_Replicate(RedisModuleCtx *ctx, const char *cmdname, const char *fmt, ...)
va_end(ap);
if (argv == NULL) return REDISMODULE_ERR;
- /* Replicate! */
- moduleReplicateMultiIfNeeded(ctx);
- alsoPropagate(cmd,ctx->client->db->id,argv,argc,
- PROPAGATE_AOF|PROPAGATE_REPL);
+ /* Replicate! When we are in a threaded context, we want to just insert
+ * the replicated command ASAP, since it is not clear when the context
+ * will stop being used, so accumulating stuff does not make much sense,
+ * nor we could easily use the alsoPropagate() API from threads. */
+ if (ctx->flags & REDISMODULE_CTX_THREAD_SAFE) {
+ propagate(cmd,ctx->client->db->id,argv,argc,
+ PROPAGATE_AOF|PROPAGATE_REPL);
+ } else {
+ moduleReplicateMultiIfNeeded(ctx);
+ alsoPropagate(cmd,ctx->client->db->id,argv,argc,
+ PROPAGATE_AOF|PROPAGATE_REPL);
+ }
/* Release the argv. */
for (j = 0; j < argc; j++) decrRefCount(argv[j]);