summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSalvatore Sanfilippo <antirez@gmail.com>2019-12-19 09:24:52 +0100
committerGitHub <noreply@github.com>2019-12-19 09:24:52 +0100
commitd3a9dff6b95eb2ec16e432dd102202694dfc285a (patch)
tree2769e5fddc6c0a58ef9888738c521b33c8557eb9
parent9a7b6a9f51939e09576b3e26be2da6d6ac20f097 (diff)
parent6b056d29f31c01188c4758ade8900c847bbd025c (diff)
downloadredis-d3a9dff6b95eb2ec16e432dd102202694dfc285a.tar.gz
Merge pull request #6615 from soloestoy/wrap-also-propagate-as-multi
Wrap also propagate as multi
-rw-r--r--src/module.c10
-rw-r--r--src/multi.c8
-rw-r--r--src/scripting.c6
-rw-r--r--src/server.c19
-rw-r--r--src/server.h4
-rw-r--r--src/t_list.c17
6 files changed, 39 insertions, 25 deletions
diff --git a/src/module.c b/src/module.c
index 26c095b05..33b69f8c2 100644
--- a/src/module.c
+++ b/src/module.c
@@ -590,11 +590,8 @@ void moduleHandlePropagationAfterCommandCallback(RedisModuleCtx *ctx) {
/* Handle the replication of the final EXEC, since whatever a command
* emits is always wrapped around MULTI/EXEC. */
- robj *propargv[1];
- propargv[0] = createStringObject("EXEC",4);
- alsoPropagate(server.execCommand,c->db->id,propargv,1,
+ alsoPropagate(server.execCommand,c->db->id,&shared.exec,1,
PROPAGATE_AOF|PROPAGATE_REPL);
- decrRefCount(propargv[0]);
/* If this is not a module command context (but is instead a simple
* callback context), we have to handle directly the "also propagate"
@@ -3300,6 +3297,11 @@ RedisModuleCallReply *RM_Call(RedisModuleCtx *ctx, const char *cmdname, const ch
* a Lua script in the context of AOF and slaves. */
if (replicate) moduleReplicateMultiIfNeeded(ctx);
+ if (ctx->client->flags & CLIENT_MULTI ||
+ ctx->flags & REDISMODULE_CTX_MULTI_EMITTED) {
+ c->flags |= CLIENT_MULTI;
+ }
+
/* Run the command */
int call_flags = CMD_CALL_SLOWLOG | CMD_CALL_STATS;
if (replicate) {
diff --git a/src/multi.c b/src/multi.c
index f885fa19c..df11225bd 100644
--- a/src/multi.c
+++ b/src/multi.c
@@ -106,11 +106,13 @@ void discardCommand(client *c) {
/* Send a MULTI command to all the slaves and AOF file. Check the execCommand
* implementation for more information. */
void execCommandPropagateMulti(client *c) {
- robj *multistring = createStringObject("MULTI",5);
+ propagate(server.multiCommand,c->db->id,&shared.multi,1,
+ PROPAGATE_AOF|PROPAGATE_REPL);
+}
- propagate(server.multiCommand,c->db->id,&multistring,1,
+void execCommandPropagateExec(client *c) {
+ propagate(server.execCommand,c->db->id,&shared.exec,1,
PROPAGATE_AOF|PROPAGATE_REPL);
- decrRefCount(multistring);
}
void execCommand(client *c) {
diff --git a/src/scripting.c b/src/scripting.c
index 43e01f372..9282b7fd9 100644
--- a/src/scripting.c
+++ b/src/scripting.c
@@ -1615,11 +1615,7 @@ void evalGenericCommand(client *c, int evalsha) {
if (server.lua_replicate_commands) {
preventCommandPropagation(c);
if (server.lua_multi_emitted) {
- robj *propargv[1];
- propargv[0] = createStringObject("EXEC",4);
- alsoPropagate(server.execCommand,c->db->id,propargv,1,
- PROPAGATE_AOF|PROPAGATE_REPL);
- decrRefCount(propargv[0]);
+ execCommandPropagateExec(c);
}
}
diff --git a/src/server.c b/src/server.c
index debf2f68e..5845a5485 100644
--- a/src/server.c
+++ b/src/server.c
@@ -2244,6 +2244,8 @@ void createSharedObjects(void) {
shared.rpoplpush = createStringObject("RPOPLPUSH",9);
shared.zpopmin = createStringObject("ZPOPMIN",7);
shared.zpopmax = createStringObject("ZPOPMAX",7);
+ shared.multi = createStringObject("MULTI",5);
+ shared.exec = createStringObject("EXEC",4);
for (j = 0; j < OBJ_SHARED_INTEGERS; j++) {
shared.integers[j] =
makeObjectShared(createObject(OBJ_STRING,(void*)(long)j));
@@ -2372,6 +2374,7 @@ void initServerConfig(void) {
server.pexpireCommand = lookupCommandByCString("pexpire");
server.xclaimCommand = lookupCommandByCString("xclaim");
server.xgroupCommand = lookupCommandByCString("xgroup");
+ server.rpoplpushCommand = lookupCommandByCString("rpoplpush");
/* Debugging */
server.assert_failed = "<no assertion failed>";
@@ -3272,6 +3275,18 @@ void call(client *c, int flags) {
redisOp *rop;
if (flags & CMD_CALL_PROPAGATE) {
+ int multi_emitted = 0;
+ /* Wrap the commands in server.also_propagate array,
+ * but don't wrap it if we are already in MULIT context,
+ * in case the nested MULIT/EXEC.
+ *
+ * And if the array contains only one command, no need to
+ * wrap it, since the single command is atomic. */
+ if (server.also_propagate.numops > 1 && !(c->flags & CLIENT_MULTI)) {
+ execCommandPropagateMulti(c);
+ multi_emitted = 1;
+ }
+
for (j = 0; j < server.also_propagate.numops; j++) {
rop = &server.also_propagate.ops[j];
int target = rop->target;
@@ -3281,6 +3296,10 @@ void call(client *c, int flags) {
if (target)
propagate(rop->cmd,rop->dbid,rop->argv,rop->argc,target);
}
+
+ if (multi_emitted) {
+ execCommandPropagateExec(c);
+ }
}
redisOpArrayFree(&server.also_propagate);
}
diff --git a/src/server.h b/src/server.h
index fb967a82a..8e354c03d 100644
--- a/src/server.h
+++ b/src/server.h
@@ -848,6 +848,7 @@ struct sharedObjectsStruct {
*busykeyerr, *oomerr, *plus, *messagebulk, *pmessagebulk, *subscribebulk,
*unsubscribebulk, *psubscribebulk, *punsubscribebulk, *del, *unlink,
*rpop, *lpop, *lpush, *rpoplpush, *zpopmin, *zpopmax, *emptyscan,
+ *multi, *exec,
*select[PROTO_SHARED_SELECT_CMDS],
*integers[OBJ_SHARED_INTEGERS],
*mbulkhdr[OBJ_SHARED_BULKHDR_LEN], /* "*<value>\r\n" */
@@ -1083,7 +1084,7 @@ struct redisServer {
*lpopCommand, *rpopCommand, *zpopminCommand,
*zpopmaxCommand, *sremCommand, *execCommand,
*expireCommand, *pexpireCommand, *xclaimCommand,
- *xgroupCommand;
+ *xgroupCommand, *rpoplpushCommand;
/* Fields used only for stats */
time_t stat_starttime; /* Server start time */
long long stat_numcommands; /* Number of processed commands */
@@ -1680,6 +1681,7 @@ void touchWatchedKeysOnFlush(int dbid);
void discardTransaction(client *c);
void flagTransaction(client *c);
void execCommandPropagateMulti(client *c);
+void execCommandPropagateExec(client *c);
/* Redis object implementation */
void decrRefCount(robj *o);
diff --git a/src/t_list.c b/src/t_list.c
index 9bbd61de3..eaeaa8e48 100644
--- a/src/t_list.c
+++ b/src/t_list.c
@@ -653,20 +653,13 @@ int serveClientBlockedOnList(client *receiver, robj *key, robj *dstkey, redisDb
if (!(dstobj &&
checkType(receiver,dstobj,OBJ_LIST)))
{
- /* Propagate the RPOP operation. */
- argv[0] = shared.rpop;
- argv[1] = key;
- propagate(server.rpopCommand,
- db->id,argv,2,
- PROPAGATE_AOF|
- PROPAGATE_REPL);
rpoplpushHandlePush(receiver,dstkey,dstobj,
value);
- /* Propagate the LPUSH operation. */
- argv[0] = shared.lpush;
- argv[1] = dstkey;
- argv[2] = value;
- propagate(server.lpushCommand,
+ /* Propagate the RPOPLPUSH operation. */
+ argv[0] = shared.rpoplpush;
+ argv[1] = key;
+ argv[2] = dstkey;
+ propagate(server.rpoplpushCommand,
db->id,argv,3,
PROPAGATE_AOF|
PROPAGATE_REPL);