diff options
author | Pieter Noordhuis <pcnoordhuis@gmail.com> | 2010-12-06 16:04:10 +0100 |
---|---|---|
committer | Pieter Noordhuis <pcnoordhuis@gmail.com> | 2010-12-06 16:04:10 +0100 |
commit | 8a88c368edbc12540eee3d129b8a017bd6a84cac (patch) | |
tree | 3b9541fc5187d398821d357e2598b6277f19e109 /src | |
parent | ac06fc011df598372232a5dc1805683004240c0d (diff) | |
download | redis-8a88c368edbc12540eee3d129b8a017bd6a84cac.tar.gz |
Check other blocked clients when value could not be pushed
Diffstat (limited to 'src')
-rw-r--r-- | src/t_list.c | 64 |
1 files changed, 41 insertions, 23 deletions
diff --git a/src/t_list.c b/src/t_list.c index 3ce0f992f..866a6a3e2 100644 --- a/src/t_list.c +++ b/src/t_list.c @@ -780,35 +780,53 @@ void unblockClientWaitingData(redisClient *c) { int handleClientsWaitingListPush(redisClient *c, robj *key, robj *ele) { struct dictEntry *de; redisClient *receiver; - list *l; + int numclients; + list *clients; listNode *ln; + robj *dstkey, *dstobj; de = dictFind(c->db->blocking_keys,key); if (de == NULL) return 0; - l = dictGetEntryVal(de); - ln = listFirst(l); - redisAssert(ln != NULL); - receiver = ln->value; - - robj *target = receiver->bpop.target; - - unblockClientWaitingData(receiver); - - if (target == NULL) { - /* BRPOP/BLPOP return a multi-bulk with the name - * of the popped list */ - addReplyMultiBulkLen(receiver,2); - addReplyBulk(receiver,key); - addReplyBulk(receiver,ele); - } else { - /* BRPOPLPUSH */ - robj *dobj = lookupKeyWrite(receiver->db,target); - if (dobj && checkType(receiver,dobj,REDIS_LIST)) return 0; - rpoplpushHandlePush(receiver,target,dobj,ele); - decrRefCount(target); + clients = dictGetEntryVal(de); + numclients = listLength(clients); + + /* Try to handle the push as long as there are clients waiting for a push. + * Note that "numclients" is used because the list of clients waiting for a + * push on "key" is deleted by unblockClient() when empty. + * + * This loop will have more than 1 iteration when there is a BRPOPLPUSH + * that cannot push the target list because it does not contain a list. If + * this happens, it simply tries the next client waiting for a push. */ + while (numclients--) { + ln = listFirst(clients); + redisAssert(ln != NULL); + receiver = ln->value; + dstkey = receiver->bpop.target; + + /* This should remove the first element of the "clients" list. */ + unblockClientWaitingData(receiver); + redisAssert(ln != listFirst(clients)); + + if (dstkey == NULL) { + /* BRPOP/BLPOP */ + addReplyMultiBulkLen(receiver,2); + addReplyBulk(receiver,key); + addReplyBulk(receiver,ele); + return 1; + } else { + /* BRPOPLPUSH */ + dstobj = lookupKeyWrite(receiver->db,dstkey); + if (dstobj && checkType(receiver,dstobj,REDIS_LIST)) { + decrRefCount(dstkey); + } else { + rpoplpushHandlePush(receiver,dstkey,dstobj,ele); + decrRefCount(dstkey); + return 1; + } + } } - return 1; + return 0; } int getTimeoutFromObjectOrReply(redisClient *c, robj *object, time_t *timeout) { |