diff options
Diffstat (limited to 'src/t_list.c')
-rw-r--r-- | src/t_list.c | 63 |
1 files changed, 56 insertions, 7 deletions
diff --git a/src/t_list.c b/src/t_list.c index b85cd2c1c..6e84ba420 100644 --- a/src/t_list.c +++ b/src/t_list.c @@ -619,6 +619,31 @@ void lremCommand(redisClient *c) { if (removed) touchWatchedKey(c->db,c->argv[1]); } +void rewriteClientCommandVector(redisClient *c, int argc, ...) { + va_list ap; + int j; + robj **argv; /* The new argument vector */ + + argv = zmalloc(sizeof(robj*)*argc); + va_start(ap,argc); + for (j = 0; j < argc; j++) { + robj *a; + + a = va_arg(ap, robj*); + argv[j] = a; + incrRefCount(a); + } + /* We free the objects in the original vector at the end, so we are + * sure that if the same objects are reused in the new vector the + * refcount gets incremented before it gets decremented. */ + for (j = 0; j < c->argc; j++) decrRefCount(c->argv[j]); + zfree(c->argv); + /* Replace argv and argc with our new versions. */ + c->argv = argv; + c->argc = argc; + va_end(ap); +} + /* This is the semantic of this command: * RPOPLPUSH srclist dstlist: * IF LLEN(srclist) > 0 @@ -635,7 +660,9 @@ void lremCommand(redisClient *c) { * as well. This command was originally proposed by Ezra Zygmuntowicz. */ -void rpoplpushHandlePush(redisClient *c, robj *dstkey, robj *dstobj, robj *value) { +void rpoplpushHandlePush(redisClient *origclient, redisClient *c, robj *dstkey, robj *dstobj, robj *value) { + robj *aux; + if (!handleClientsWaitingListPush(c,dstkey,value)) { /* Create the list if the key does not exist */ if (!dstobj) { @@ -643,9 +670,25 @@ void rpoplpushHandlePush(redisClient *c, robj *dstkey, robj *dstobj, robj *value dbAdd(c->db,dstkey,dstobj); } else { touchWatchedKey(c->db,dstkey); - server.dirty++; } listTypePush(dstobj,value,REDIS_HEAD); + /* If we are pushing as a result of LPUSH against a key + * watched by BLPOPLPUSH, we need to rewrite the command vector. + * But if this is called directly by RPOPLPUSH (either directly + * or via a BRPOPLPUSH where the popped list exists) + * we should replicate the BRPOPLPUSH command itself. */ + if (c != origclient) { + aux = createStringObject("LPUSH",5); + rewriteClientCommandVector(origclient,3,aux,dstkey,value); + decrRefCount(aux); + } else { + /* Make sure to always use RPOPLPUSH in the replication / AOF, + * even if the original command was BRPOPLPUSH. */ + aux = createStringObject("RPOPLPUSH",9); + rewriteClientCommandVector(origclient,3,aux,c->argv[1],c->argv[2]); + decrRefCount(aux); + } + server.dirty++; } /* Always send the pushed value to the client. */ @@ -661,16 +704,22 @@ void rpoplpushCommand(redisClient *c) { addReply(c,shared.nullbulk); } else { robj *dobj = lookupKeyWrite(c->db,c->argv[2]); + robj *touchedkey = c->argv[1]; + if (dobj && checkType(c,dobj,REDIS_LIST)) return; value = listTypePop(sobj,REDIS_TAIL); - rpoplpushHandlePush(c,c->argv[2],dobj,value); + /* We saved touched key, and protect it, since rpoplpushHandlePush + * may change the client command argument vector. */ + incrRefCount(touchedkey); + rpoplpushHandlePush(c,c,c->argv[2],dobj,value); /* listTypePop returns an object with its refcount incremented */ decrRefCount(value); /* Delete the source list when it is empty */ - if (listTypeLength(sobj) == 0) dbDelete(c->db,c->argv[1]); - touchWatchedKey(c->db,c->argv[1]); + if (listTypeLength(sobj) == 0) dbDelete(c->db,touchedkey); + touchWatchedKey(c->db,touchedkey); + decrRefCount(touchedkey); server.dirty++; } } @@ -823,14 +872,14 @@ int handleClientsWaitingListPush(redisClient *c, robj *key, robj *ele) { addReplyMultiBulkLen(receiver,2); addReplyBulk(receiver,key); addReplyBulk(receiver,ele); - return 1; + return 1; /* Serve just the first client as in B[RL]POP semantics */ } else { /* BRPOPLPUSH, note that receiver->db is always equal to c->db. */ dstobj = lookupKeyWrite(receiver->db,dstkey); if (dstobj && checkType(receiver,dstobj,REDIS_LIST)) { decrRefCount(dstkey); } else { - rpoplpushHandlePush(receiver,dstkey,dstobj,ele); + rpoplpushHandlePush(c,receiver,dstkey,dstobj,ele); decrRefCount(dstkey); return 1; } |