diff options
Diffstat (limited to 'src/multi.c')
-rw-r--r-- | src/multi.c | 89 |
1 files changed, 52 insertions, 37 deletions
diff --git a/src/multi.c b/src/multi.c index c82876456..112ce0605 100644 --- a/src/multi.c +++ b/src/multi.c @@ -27,18 +27,18 @@ * POSSIBILITY OF SUCH DAMAGE. */ -#include "redis.h" +#include "server.h" /* ================================ MULTI/EXEC ============================== */ /* Client state initialization for MULTI/EXEC */ -void initClientMultiState(redisClient *c) { +void initClientMultiState(client *c) { c->mstate.commands = NULL; c->mstate.count = 0; } /* Release all the resources associated with MULTI/EXEC state */ -void freeClientMultiState(redisClient *c) { +void freeClientMultiState(client *c) { int j; for (j = 0; j < c->mstate.count; j++) { @@ -53,7 +53,7 @@ void freeClientMultiState(redisClient *c) { } /* Add a new command into the MULTI commands queue */ -void queueMultiCommand(redisClient *c) { +void queueMultiCommand(client *c) { multiCmd *mc; int j; @@ -69,31 +69,31 @@ void queueMultiCommand(redisClient *c) { c->mstate.count++; } -void discardTransaction(redisClient *c) { +void discardTransaction(client *c) { freeClientMultiState(c); initClientMultiState(c); - c->flags &= ~(REDIS_MULTI|REDIS_DIRTY_CAS|REDIS_DIRTY_EXEC); + c->flags &= ~(CLIENT_MULTI|CLIENT_DIRTY_CAS|CLIENT_DIRTY_EXEC); unwatchAllKeys(c); } /* Flag the transacation as DIRTY_EXEC so that EXEC will fail. * Should be called every time there is an error while queueing a command. */ -void flagTransaction(redisClient *c) { - if (c->flags & REDIS_MULTI) - c->flags |= REDIS_DIRTY_EXEC; +void flagTransaction(client *c) { + if (c->flags & CLIENT_MULTI) + c->flags |= CLIENT_DIRTY_EXEC; } -void multiCommand(redisClient *c) { - if (c->flags & REDIS_MULTI) { +void multiCommand(client *c) { + if (c->flags & CLIENT_MULTI) { addReplyError(c,"MULTI calls can not be nested"); return; } - c->flags |= REDIS_MULTI; + c->flags |= CLIENT_MULTI; addReply(c,shared.ok); } -void discardCommand(redisClient *c) { - if (!(c->flags & REDIS_MULTI)) { +void discardCommand(client *c) { + if (!(c->flags & CLIENT_MULTI)) { addReplyError(c,"DISCARD without MULTI"); return; } @@ -103,22 +103,23 @@ void discardCommand(redisClient *c) { /* Send a MULTI command to all the slaves and AOF file. Check the execCommand * implementation for more information. */ -void execCommandPropagateMulti(redisClient *c) { +void execCommandPropagateMulti(client *c) { robj *multistring = createStringObject("MULTI",5); propagate(server.multiCommand,c->db->id,&multistring,1, - REDIS_PROPAGATE_AOF|REDIS_PROPAGATE_REPL); + PROPAGATE_AOF|PROPAGATE_REPL); decrRefCount(multistring); } -void execCommand(redisClient *c) { +void execCommand(client *c) { int j; robj **orig_argv; int orig_argc; struct redisCommand *orig_cmd; int must_propagate = 0; /* Need to propagate MULTI/EXEC to AOF / slaves? */ + int was_master = server.masterhost == NULL; - if (!(c->flags & REDIS_MULTI)) { + if (!(c->flags & CLIENT_MULTI)) { addReplyError(c,"EXEC without MULTI"); return; } @@ -129,8 +130,8 @@ void execCommand(redisClient *c) { * A failed EXEC in the first case returns a multi bulk nil object * (technically it is not an error but a special behavior), while * in the second an EXECABORT error is returned. */ - if (c->flags & (REDIS_DIRTY_CAS|REDIS_DIRTY_EXEC)) { - addReply(c, c->flags & REDIS_DIRTY_EXEC ? shared.execaborterr : + if (c->flags & (CLIENT_DIRTY_CAS|CLIENT_DIRTY_EXEC)) { + addReply(c, c->flags & CLIENT_DIRTY_EXEC ? shared.execaborterr : shared.nullmultibulk); discardTransaction(c); goto handle_monitor; @@ -147,16 +148,17 @@ void execCommand(redisClient *c) { c->argv = c->mstate.commands[j].argv; c->cmd = c->mstate.commands[j].cmd; - /* Propagate a MULTI request once we encounter the first write op. + /* Propagate a MULTI request once we encounter the first command which + * is not readonly nor an administrative one. * This way we'll deliver the MULTI/..../EXEC block as a whole and * both the AOF and the replication link will have the same consistency * and atomicity guarantees. */ - if (!must_propagate && !(c->cmd->flags & REDIS_CMD_READONLY)) { + if (!must_propagate && !(c->cmd->flags & (CMD_READONLY|CMD_ADMIN))) { execCommandPropagateMulti(c); must_propagate = 1; } - call(c,REDIS_CALL_FULL); + call(c,CMD_CALL_FULL); /* Commands may alter argc/argv, restore mstate. */ c->mstate.commands[j].argc = c->argc; @@ -167,15 +169,28 @@ void execCommand(redisClient *c) { c->argc = orig_argc; c->cmd = orig_cmd; discardTransaction(c); + /* Make sure the EXEC command will be propagated as well if MULTI * was already propagated. */ - if (must_propagate) server.dirty++; + if (must_propagate) { + int is_master = server.masterhost == NULL; + server.dirty++; + /* If inside the MULTI/EXEC block this instance was suddenly + * switched from master to slave (using the SLAVEOF command), the + * initial MULTI was propagated into the replication backlog, but the + * rest was not. We need to make sure to at least terminate the + * backlog with the final EXEC. */ + if (server.repl_backlog && was_master && !is_master) { + char *execcmd = "*1\r\n$4\r\nEXEC\r\n"; + feedReplicationBacklog(execcmd,strlen(execcmd)); + } + } handle_monitor: /* Send EXEC to clients waiting data from MONITOR. We do it here * since the natural order of commands execution is actually: * MUTLI, EXEC, ... commands inside transaction ... - * Instead EXEC is flagged as REDIS_CMD_SKIP_MONITOR in the command + * Instead EXEC is flagged as CMD_SKIP_MONITOR in the command * table, and we do it here with correct ordering. */ if (listLength(server.monitors) && !server.loading) replicationFeedMonitors(c,server.monitors,c->db->id,c->argv,c->argc); @@ -199,7 +214,7 @@ typedef struct watchedKey { } watchedKey; /* Watch for the specified key */ -void watchForKey(redisClient *c, robj *key) { +void watchForKey(client *c, robj *key) { list *clients = NULL; listIter li; listNode *ln; @@ -230,7 +245,7 @@ void watchForKey(redisClient *c, robj *key) { /* Unwatch all the keys watched by this client. To clean the EXEC dirty * flag is up to the caller. */ -void unwatchAllKeys(redisClient *c) { +void unwatchAllKeys(client *c) { listIter li; listNode *ln; @@ -244,7 +259,7 @@ void unwatchAllKeys(redisClient *c) { * from the list */ wk = listNodeValue(ln); clients = dictFetchValue(wk->db->watched_keys, wk->key); - redisAssertWithInfo(c,NULL,clients != NULL); + serverAssertWithInfo(c,NULL,clients != NULL); listDelNode(clients,listSearchKey(clients,c)); /* Kill the entry at all if this was the only client */ if (listLength(clients) == 0) @@ -267,13 +282,13 @@ void touchWatchedKey(redisDb *db, robj *key) { clients = dictFetchValue(db->watched_keys, key); if (!clients) return; - /* Mark all the clients watching this key as REDIS_DIRTY_CAS */ + /* Mark all the clients watching this key as CLIENT_DIRTY_CAS */ /* Check if we are already watching for this key */ listRewind(clients,&li); while((ln = listNext(&li))) { - redisClient *c = listNodeValue(ln); + client *c = listNodeValue(ln); - c->flags |= REDIS_DIRTY_CAS; + c->flags |= CLIENT_DIRTY_CAS; } } @@ -288,7 +303,7 @@ void touchWatchedKeysOnFlush(int dbid) { /* For every client, check all the waited keys */ listRewind(server.clients,&li1); while((ln = listNext(&li1))) { - redisClient *c = listNodeValue(ln); + client *c = listNodeValue(ln); listRewind(c->watched_keys,&li2); while((ln = listNext(&li2))) { watchedKey *wk = listNodeValue(ln); @@ -298,16 +313,16 @@ void touchWatchedKeysOnFlush(int dbid) { * removed. */ if (dbid == -1 || wk->db->id == dbid) { if (dictFind(wk->db->dict, wk->key->ptr) != NULL) - c->flags |= REDIS_DIRTY_CAS; + c->flags |= CLIENT_DIRTY_CAS; } } } } -void watchCommand(redisClient *c) { +void watchCommand(client *c) { int j; - if (c->flags & REDIS_MULTI) { + if (c->flags & CLIENT_MULTI) { addReplyError(c,"WATCH inside MULTI is not allowed"); return; } @@ -316,8 +331,8 @@ void watchCommand(redisClient *c) { addReply(c,shared.ok); } -void unwatchCommand(redisClient *c) { +void unwatchCommand(client *c) { unwatchAllKeys(c); - c->flags &= (~REDIS_DIRTY_CAS); + c->flags &= (~CLIENT_DIRTY_CAS); addReply(c,shared.ok); } |