summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorantirez <antirez@gmail.com>2011-06-09 19:04:48 +0200
committerantirez <antirez@gmail.com>2011-06-09 19:04:48 +0200
commitbe87e76906e6670100b65e61d2980408660050b6 (patch)
tree59e9947d0c3cab202097628c8b83a4691dd9042e
parent09d9879963c0138dc846a6f172616ec70fdf1539 (diff)
parentdfc74051ce4243209e26bea8dbdda2ee00a9cf06 (diff)
downloadredis-be87e76906e6670100b65e61d2980408660050b6.tar.gz
Merge remote branch 'origin/2.2' into 2.2
-rw-r--r--src/networking.c25
-rw-r--r--src/redis.h1
-rw-r--r--src/t_list.c47
-rw-r--r--src/t_set.c17
-rw-r--r--tests/integration/replication.tcl18
-rw-r--r--tests/unit/type/list.tcl8
6 files changed, 95 insertions, 21 deletions
diff --git a/src/networking.c b/src/networking.c
index 9c250cdd6..7d4b96364 100644
--- a/src/networking.c
+++ b/src/networking.c
@@ -872,3 +872,28 @@ void getClientsMaxBuffers(unsigned long *longest_output_list,
*biggest_input_buffer = bib;
}
+void rewriteClientCommandVector(redisClient *c, int argc, ...) {
+ va_list ap;
+ int j;
+ robj **argv; /* The new argument vector */
+
+ argv = zmalloc(sizeof(robj*)*argc);
+ va_start(ap,argc);
+ for (j = 0; j < argc; j++) {
+ robj *a;
+
+ a = va_arg(ap, robj*);
+ argv[j] = a;
+ incrRefCount(a);
+ }
+ /* We free the objects in the original vector at the end, so we are
+ * sure that if the same objects are reused in the new vector the
+ * refcount gets incremented before it gets decremented. */
+ for (j = 0; j < c->argc; j++) decrRefCount(c->argv[j]);
+ zfree(c->argv);
+ /* Replace argv and argc with our new versions. */
+ c->argv = argv;
+ c->argc = argc;
+ va_end(ap);
+}
+
diff --git a/src/redis.h b/src/redis.h
index 7c1471794..7dcca3bee 100644
--- a/src/redis.h
+++ b/src/redis.h
@@ -661,6 +661,7 @@ void addReplyMultiBulkLen(redisClient *c, long length);
void *dupClientReplyValue(void *o);
void getClientsMaxBuffers(unsigned long *longest_output_list,
unsigned long *biggest_input_buffer);
+void rewriteClientCommandVector(redisClient *c, int argc, ...);
#ifdef __GNUC__
void addReplyErrorFormat(redisClient *c, const char *fmt, ...)
diff --git a/src/t_list.c b/src/t_list.c
index b85cd2c1c..c05bd1ae3 100644
--- a/src/t_list.c
+++ b/src/t_list.c
@@ -635,7 +635,9 @@ void lremCommand(redisClient *c) {
* as well. This command was originally proposed by Ezra Zygmuntowicz.
*/
-void rpoplpushHandlePush(redisClient *c, robj *dstkey, robj *dstobj, robj *value) {
+void rpoplpushHandlePush(redisClient *origclient, redisClient *c, robj *dstkey, robj *dstobj, robj *value) {
+ robj *aux;
+
if (!handleClientsWaitingListPush(c,dstkey,value)) {
/* Create the list if the key does not exist */
if (!dstobj) {
@@ -643,9 +645,25 @@ void rpoplpushHandlePush(redisClient *c, robj *dstkey, robj *dstobj, robj *value
dbAdd(c->db,dstkey,dstobj);
} else {
touchWatchedKey(c->db,dstkey);
- server.dirty++;
}
listTypePush(dstobj,value,REDIS_HEAD);
+ /* If we are pushing as a result of LPUSH against a key
+ * watched by BLPOPLPUSH, we need to rewrite the command vector.
+ * But if this is called directly by RPOPLPUSH (either directly
+ * or via a BRPOPLPUSH where the popped list exists)
+ * we should replicate the BRPOPLPUSH command itself. */
+ if (c != origclient) {
+ aux = createStringObject("LPUSH",5);
+ rewriteClientCommandVector(origclient,3,aux,dstkey,value);
+ decrRefCount(aux);
+ } else {
+ /* Make sure to always use RPOPLPUSH in the replication / AOF,
+ * even if the original command was BRPOPLPUSH. */
+ aux = createStringObject("RPOPLPUSH",9);
+ rewriteClientCommandVector(origclient,3,aux,c->argv[1],c->argv[2]);
+ decrRefCount(aux);
+ }
+ server.dirty++;
}
/* Always send the pushed value to the client. */
@@ -661,16 +679,22 @@ void rpoplpushCommand(redisClient *c) {
addReply(c,shared.nullbulk);
} else {
robj *dobj = lookupKeyWrite(c->db,c->argv[2]);
+ robj *touchedkey = c->argv[1];
+
if (dobj && checkType(c,dobj,REDIS_LIST)) return;
value = listTypePop(sobj,REDIS_TAIL);
- rpoplpushHandlePush(c,c->argv[2],dobj,value);
+ /* We saved touched key, and protect it, since rpoplpushHandlePush
+ * may change the client command argument vector. */
+ incrRefCount(touchedkey);
+ rpoplpushHandlePush(c,c,c->argv[2],dobj,value);
/* listTypePop returns an object with its refcount incremented */
decrRefCount(value);
/* Delete the source list when it is empty */
- if (listTypeLength(sobj) == 0) dbDelete(c->db,c->argv[1]);
- touchWatchedKey(c->db,c->argv[1]);
+ if (listTypeLength(sobj) == 0) dbDelete(c->db,touchedkey);
+ touchWatchedKey(c->db,touchedkey);
+ decrRefCount(touchedkey);
server.dirty++;
}
}
@@ -772,6 +796,7 @@ void unblockClientWaitingData(redisClient *c) {
/* Cleanup the client structure */
zfree(c->bpop.keys);
c->bpop.keys = NULL;
+ if (c->bpop.target) decrRefCount(c->bpop.target);
c->bpop.target = NULL;
c->flags &= ~REDIS_BLOCKED;
c->flags |= REDIS_UNBLOCKED;
@@ -815,6 +840,9 @@ int handleClientsWaitingListPush(redisClient *c, robj *key, robj *ele) {
receiver = ln->value;
dstkey = receiver->bpop.target;
+ /* Protect receiver->bpop.target, that will be freed by
+ * the next unblockClientWaitingData() call. */
+ if (dstkey) incrRefCount(dstkey);
/* This should remove the first element of the "clients" list. */
unblockClientWaitingData(receiver);
@@ -823,17 +851,16 @@ int handleClientsWaitingListPush(redisClient *c, robj *key, robj *ele) {
addReplyMultiBulkLen(receiver,2);
addReplyBulk(receiver,key);
addReplyBulk(receiver,ele);
- return 1;
+ return 1; /* Serve just the first client as in B[RL]POP semantics */
} else {
/* BRPOPLPUSH, note that receiver->db is always equal to c->db. */
dstobj = lookupKeyWrite(receiver->db,dstkey);
- if (dstobj && checkType(receiver,dstobj,REDIS_LIST)) {
- decrRefCount(dstkey);
- } else {
- rpoplpushHandlePush(receiver,dstkey,dstobj,ele);
+ if (!(dstobj && checkType(receiver,dstobj,REDIS_LIST))) {
+ rpoplpushHandlePush(c,receiver,dstkey,dstobj,ele);
decrRefCount(dstkey);
return 1;
}
+ decrRefCount(dstkey);
}
}
diff --git a/src/t_set.c b/src/t_set.c
index bffe873a9..84f736c5f 100644
--- a/src/t_set.c
+++ b/src/t_set.c
@@ -325,7 +325,7 @@ void scardCommand(redisClient *c) {
}
void spopCommand(redisClient *c) {
- robj *set, *ele;
+ robj *set, *ele, *aux;
int64_t llele;
int encoding;
@@ -341,16 +341,11 @@ void spopCommand(redisClient *c) {
setTypeRemove(set,ele);
}
- /* Change argv to replicate as SREM */
- c->argc = 3;
- c->argv = zrealloc(c->argv,sizeof(robj*)*(c->argc));
-
- /* Overwrite SREM with SPOP (same length) */
- redisAssert(sdslen(c->argv[0]->ptr) == 4);
- memcpy(c->argv[0]->ptr, "SREM", 4);
-
- /* Popped element already has incremented refcount */
- c->argv[2] = ele;
+ /* Replicate/AOF this command as an SREM operation */
+ aux = createStringObject("SREM",4);
+ rewriteClientCommandVector(c,3,aux,c->argv[1],ele);
+ decrRefCount(ele);
+ decrRefCount(aux);
addReplyBulk(c,ele);
if (setTypeSize(set) == 0) dbDelete(c->db,c->argv[1]);
diff --git a/tests/integration/replication.tcl b/tests/integration/replication.tcl
index 892fae03b..227356b24 100644
--- a/tests/integration/replication.tcl
+++ b/tests/integration/replication.tcl
@@ -6,6 +6,24 @@ start_server {tags {"repl"}} {
s -1 role
} {slave}
+ test {BRPOPLPUSH replication, when blocking against empty list} {
+ set rd [redis_deferring_client]
+ $rd brpoplpush a b 5
+ r lpush a foo
+ after 1000
+ assert_equal [r debug digest] [r -1 debug digest]
+ }
+
+ test {BRPOPLPUSH replication, list exists} {
+ set rd [redis_deferring_client]
+ r lpush c 1
+ r lpush c 2
+ r lpush c 3
+ $rd brpoplpush c d 5
+ after 1000
+ assert_equal [r debug digest] [r -1 debug digest]
+ }
+
test {MASTER and SLAVE dataset should be identical after complex ops} {
createComplexDataset r 10000
after 500
diff --git a/tests/unit/type/list.tcl b/tests/unit/type/list.tcl
index 6b128b726..84c95c2eb 100644
--- a/tests/unit/type/list.tcl
+++ b/tests/unit/type/list.tcl
@@ -262,6 +262,14 @@ start_server {
r exec
} {foo bar {} {} {bar foo}}
+ test {BRPOPLPUSH timeout} {
+ set rd [redis_deferring_client]
+
+ $rd brpoplpush foo_list bar_list 1
+ after 2000
+ $rd read
+ } {}
+
foreach {pop} {BLPOP BRPOP} {
test "$pop: with single empty list argument" {
set rd [redis_deferring_client]