summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorantirez <antirez@gmail.com>2019-10-03 10:56:37 +0200
committerantirez <antirez@gmail.com>2019-11-14 17:44:29 +0100
commitef9fe9b0cbd11f1aac11cc9d635f957efd621206 (patch)
tree260fdd83ec178b22a645544e2521523803644af9
parent8066d2a197095bf727f7bb5feb37801ae24ce279 (diff)
downloadredis-ef9fe9b0cbd11f1aac11cc9d635f957efd621206.tar.gz
Modules: implement RM_Replicate() from async callbacks.
-rw-r--r--src/module.c33
-rw-r--r--src/server.h2
2 files changed, 33 insertions, 2 deletions
diff --git a/src/module.c b/src/module.c
index 8a0be9b79..dcb5c9c98 100644
--- a/src/module.c
+++ b/src/module.c
@@ -133,10 +133,14 @@ struct RedisModuleCtx {
int keys_count;
struct RedisModulePoolAllocBlock *pa_head;
+ redisOpArray saved_oparray; /* When propagating commands in a callback
+ we reallocate the "also propagate" op
+ array. Here we save the old one to
+ restore it later. */
};
typedef struct RedisModuleCtx RedisModuleCtx;
-#define REDISMODULE_CTX_INIT {(void*)(unsigned long)&RM_GetApi, NULL, NULL, NULL, NULL, 0, 0, 0, NULL, 0, NULL, NULL, 0, NULL}
+#define REDISMODULE_CTX_INIT {(void*)(unsigned long)&RM_GetApi, NULL, NULL, NULL, NULL, 0, 0, 0, NULL, 0, NULL, NULL, 0, NULL, {0}}
#define REDISMODULE_CTX_MULTI_EMITTED (1<<0)
#define REDISMODULE_CTX_AUTO_MEMORY (1<<1)
#define REDISMODULE_CTX_KEYS_POS_REQUEST (1<<2)
@@ -516,6 +520,24 @@ void moduleHandlePropagationAfterCommandCallback(RedisModuleCtx *ctx) {
alsoPropagate(server.execCommand,c->db->id,propargv,1,
PROPAGATE_AOF|PROPAGATE_REPL);
decrRefCount(propargv[0]);
+
+ /* If this is not a module command context (but is instead a simple
+ * callback context), we have to handle directly the "also propagate"
+ * array and emit it. In a module command call this will be handled
+ * directly by call(). */
+ if (!(ctx->flags & REDISMODULE_CTX_MODULE_COMMAND_CALL) &&
+ server.also_propagate.numops)
+ {
+ for (int j = 0; j < server.also_propagate.numops; j++) {
+ redisOp *rop = &server.also_propagate.ops[j];
+ int target = rop->target;
+ if (target)
+ propagate(rop->cmd,rop->dbid,rop->argv,rop->argc,target);
+ }
+ redisOpArrayFree(&server.also_propagate);
+ }
+ /* Restore the previous oparray in case of nexted use of the API. */
+ server.also_propagate = ctx->saved_oparray;
}
/* Free the context after the user function was called. */
@@ -1319,9 +1341,16 @@ void moduleReplicateMultiIfNeeded(RedisModuleCtx *ctx) {
/* If we already emitted MULTI return ASAP. */
if (ctx->flags & REDISMODULE_CTX_MULTI_EMITTED) return;
/* If this is a thread safe context, we do not want to wrap commands
- * executed into MUTLI/EXEC, they are executed as single commands
+ * executed into MULTI/EXEC, they are executed as single commands
* from an external client in essence. */
if (ctx->flags & REDISMODULE_CTX_THREAD_SAFE) return;
+ /* If this is a callback context, and not a module command execution
+ * context, we have to setup the op array for the "also propagate" API
+ * so that RM_Replicate() will work. */
+ if (!(ctx->flags & REDISMODULE_CTX_MODULE_COMMAND_CALL)) {
+ ctx->saved_oparray = server.also_propagate;
+ redisOpArrayInit(&server.also_propagate);
+ }
execCommandPropagateMulti(ctx->client);
ctx->flags |= REDISMODULE_CTX_MULTI_EMITTED;
}
diff --git a/src/server.h b/src/server.h
index 42831d93c..8c0ec3809 100644
--- a/src/server.h
+++ b/src/server.h
@@ -1731,6 +1731,8 @@ struct redisCommand *lookupCommandOrOriginal(sds name);
void call(client *c, int flags);
void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc, int flags);
void alsoPropagate(struct redisCommand *cmd, int dbid, robj **argv, int argc, int target);
+void redisOpArrayInit(redisOpArray *oa);
+void redisOpArrayFree(redisOpArray *oa);
void forceCommandPropagation(client *c, int flags);
void preventCommandPropagation(client *c);
void preventCommandAOF(client *c);