From 8a979f039011a4672b1052ee84ca56f214e6a681 Mon Sep 17 00:00:00 2001 From: Damian Janowski & Michel Martens Date: Mon, 8 Nov 2010 10:43:21 -0300 Subject: Fix case in RPOPLPUSH. --- src/redis.c | 2 +- src/redis.h | 2 +- src/t_list.c | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) (limited to 'src') diff --git a/src/redis.c b/src/redis.c index 5b39c011f..1f7e7d266 100644 --- a/src/redis.c +++ b/src/redis.c @@ -96,7 +96,7 @@ struct redisCommand readonlyCommandTable[] = { {"lrange",lrangeCommand,4,0,NULL,1,1,1}, {"ltrim",ltrimCommand,4,0,NULL,1,1,1}, {"lrem",lremCommand,4,0,NULL,1,1,1}, - {"rpoplpush",rpoplpushcommand,3,REDIS_CMD_DENYOOM,NULL,1,2,1}, + {"rpoplpush",rpoplpushCommand,3,REDIS_CMD_DENYOOM,NULL,1,2,1}, {"sadd",saddCommand,3,REDIS_CMD_DENYOOM,NULL,1,1,1}, {"srem",sremCommand,3,0,NULL,1,1,1}, {"smove",smoveCommand,4,0,NULL,1,2,1}, diff --git a/src/redis.h b/src/redis.h index 11e4e557f..6bd0fd5df 100644 --- a/src/redis.h +++ b/src/redis.h @@ -932,7 +932,7 @@ void flushdbCommand(redisClient *c); void flushallCommand(redisClient *c); void sortCommand(redisClient *c); void lremCommand(redisClient *c); -void rpoplpushcommand(redisClient *c); +void rpoplpushCommand(redisClient *c); void infoCommand(redisClient *c); void mgetCommand(redisClient *c); void monitorCommand(redisClient *c); diff --git a/src/t_list.c b/src/t_list.c index 42e1d5873..10e7f72c7 100644 --- a/src/t_list.c +++ b/src/t_list.c @@ -618,7 +618,7 @@ void lremCommand(redisClient *c) { * since the element is not just returned but pushed against another list * as well. This command was originally proposed by Ezra Zygmuntowicz. */ -void rpoplpushcommand(redisClient *c) { +void rpoplpushCommand(redisClient *c) { robj *sobj, *value; if ((sobj = lookupKeyWriteOrReply(c,c->argv[1],shared.nullbulk)) == NULL || checkType(c,sobj,REDIS_LIST)) return; -- cgit v1.2.1 From b2a7fd0cf7afef7e7ede9e46a317fcb9ae84768c Mon Sep 17 00:00:00 2001 From: Damian Janowski & Michel Martens Date: Mon, 8 Nov 2010 15:25:59 -0300 Subject: BRPOPLPUSH. --- src/redis.c | 1 + src/redis.h | 2 ++ src/t_list.c | 83 +++++++++++++++++++++++++++++++++++++++++++----------------- 3 files changed, 63 insertions(+), 23 deletions(-) (limited to 'src') diff --git a/src/redis.c b/src/redis.c index 1f7e7d266..58a796c03 100644 --- a/src/redis.c +++ b/src/redis.c @@ -89,6 +89,7 @@ struct redisCommand readonlyCommandTable[] = { {"rpop",rpopCommand,2,0,NULL,1,1,1}, {"lpop",lpopCommand,2,0,NULL,1,1,1}, {"brpop",brpopCommand,-3,0,NULL,1,1,1}, + {"brpoplpush",brpoplpushCommand,4,REDIS_CMD_DENYOOM,NULL,1,2,1}, {"blpop",blpopCommand,-3,0,NULL,1,1,1}, {"llen",llenCommand,2,0,NULL,1,1,1}, {"lindex",lindexCommand,3,0,NULL,1,1,1}, diff --git a/src/redis.h b/src/redis.h index 6bd0fd5df..83a94483f 100644 --- a/src/redis.h +++ b/src/redis.h @@ -321,6 +321,7 @@ typedef struct redisClient { int blocking_keys_num; /* Number of blocking keys */ time_t blockingto; /* Blocking operation timeout. If UNIX current time * is >= blockingto then the operation timed out. */ + robj *blocking_target; list *io_keys; /* Keys this client is waiting to be loaded from the * swap file in order to continue. */ list *watched_keys; /* Keys WATCHED for MULTI/EXEC CAS */ @@ -961,6 +962,7 @@ void execCommand(redisClient *c); void discardCommand(redisClient *c); void blpopCommand(redisClient *c); void brpopCommand(redisClient *c); +void brpoplpushCommand(redisClient *c); void appendCommand(redisClient *c); void substrCommand(redisClient *c); void strlenCommand(redisClient *c); diff --git a/src/t_list.c b/src/t_list.c index 10e7f72c7..437f4004b 100644 --- a/src/t_list.c +++ b/src/t_list.c @@ -777,9 +777,28 @@ int handleClientsWaitingListPush(redisClient *c, robj *key, robj *ele) { redisAssert(ln != NULL); receiver = ln->value; - addReplyMultiBulkLen(receiver,2); - addReplyBulk(receiver,key); - addReplyBulk(receiver,ele); + if (receiver->blocking_target == NULL) { + addReplyMultiBulkLen(receiver,2); + addReplyBulk(receiver,key); + addReplyBulk(receiver,ele); + } + else { + receiver->argc++; + + robj *dobj = lookupKeyWrite(receiver->db,receiver->blocking_target); + if (dobj && checkType(receiver,dobj,REDIS_LIST)) return 0; + + addReplyBulk(receiver,ele); + + /* Create the list if the key does not exist */ + if (!dobj) { + dobj = createZiplistObject(); + dbAdd(receiver->db,receiver->blocking_target,dobj); + } + + listTypePush(dobj,ele,REDIS_HEAD); + } + unblockClientWaitingData(receiver); return 1; } @@ -814,26 +833,36 @@ void blockingPopGenericCommand(redisClient *c, int where) { robj *argv[2], **orig_argv; int orig_argc; - /* We need to alter the command arguments before to call - * popGenericCommand() as the command takes a single key. */ - orig_argv = c->argv; - orig_argc = c->argc; - argv[1] = c->argv[j]; - c->argv = argv; - c->argc = 2; - - /* Also the return value is different, we need to output - * the multi bulk reply header and the key name. The - * "real" command will add the last element (the value) - * for us. If this souds like an hack to you it's just - * because it is... */ - addReplyMultiBulkLen(c,2); - addReplyBulk(c,argv[1]); - popGenericCommand(c,where); - - /* Fix the client structure with the original stuff */ - c->argv = orig_argv; - c->argc = orig_argc; + if (c->blocking_target == NULL) { + /* We need to alter the command arguments before to call + * popGenericCommand() as the command takes a single key. */ + orig_argv = c->argv; + orig_argc = c->argc; + argv[1] = c->argv[j]; + c->argv = argv; + c->argc = 2; + + /* Also the return value is different, we need to output + * the multi bulk reply header and the key name. The + * "real" command will add the last element (the value) + * for us. If this souds like an hack to you it's just + * because it is... */ + addReplyMultiBulkLen(c,2); + addReplyBulk(c,argv[1]); + + popGenericCommand(c,where); + + /* Fix the client structure with the original stuff */ + c->argv = orig_argv; + c->argc = orig_argc; + } + else { + c->argv[2] = c->blocking_target; + c->blocking_target = NULL; + + rpoplpushCommand(c); + } + return; } } @@ -860,3 +889,11 @@ void blpopCommand(redisClient *c) { void brpopCommand(redisClient *c) { blockingPopGenericCommand(c,REDIS_TAIL); } + +void brpoplpushCommand(redisClient *c) { + c->blocking_target = c->argv[2]; + c->argv[2] = c->argv[3]; + c->argc--; + + blockingPopGenericCommand(c,REDIS_TAIL); +} -- cgit v1.2.1 From 357a841714d50f3d3c293b9bb03839bac28cb05a Mon Sep 17 00:00:00 2001 From: Damian Janowski & Michel Martens Date: Mon, 8 Nov 2010 19:38:01 -0300 Subject: Move to struct. --- src/networking.c | 8 +++++--- src/redis.h | 17 +++++++++++------ src/t_list.c | 36 ++++++++++++++++++------------------ 3 files changed, 34 insertions(+), 27 deletions(-) (limited to 'src') diff --git a/src/networking.c b/src/networking.c index 634e2107c..530e2ca87 100644 --- a/src/networking.c +++ b/src/networking.c @@ -41,8 +41,10 @@ redisClient *createClient(int fd) { c->reply = listCreate(); listSetFreeMethod(c->reply,decrRefCount); listSetDupMethod(c->reply,dupClientReplyValue); - c->blocking_keys = NULL; - c->blocking_keys_num = 0; + c->bstate.keys = NULL; + c->bstate.count = 0; + c->bstate.timeout = 0; + c->bstate.target = NULL; c->io_keys = listCreate(); c->watched_keys = listCreate(); listSetFreeMethod(c->io_keys,decrRefCount); @@ -677,7 +679,7 @@ void closeTimedoutClients(void) { redisLog(REDIS_VERBOSE,"Closing idle client"); freeClient(c); } else if (c->flags & REDIS_BLOCKED) { - if (c->blockingto != 0 && c->blockingto < now) { + if (c->bstate.timeout != 0 && c->bstate.timeout < now) { addReply(c,shared.nullmultibulk); unblockClientWaitingData(c); } diff --git a/src/redis.h b/src/redis.h index 83a94483f..0de94585b 100644 --- a/src/redis.h +++ b/src/redis.h @@ -293,6 +293,16 @@ typedef struct multiState { int count; /* Total number of MULTI commands */ } multiState; +typedef struct blockingState { + robj **keys; /* The key we are waiting to terminate a blocking + * operation such as BLPOP. Otherwise NULL. */ + int count; /* Number of blocking keys */ + time_t timeout; /* Blocking operation timeout. If UNIX current time + * is >= timeout then the operation timed out. */ + robj *target; /* The key that should receive the element, + * for BRPOPLPUSH. */ +} blockingState; + /* With multiplexing we need to take per-clinet state. * Clients are taken in a liked list. */ typedef struct redisClient { @@ -316,12 +326,7 @@ typedef struct redisClient { long repldboff; /* replication DB file offset */ off_t repldbsize; /* replication DB file size */ multiState mstate; /* MULTI/EXEC state */ - robj **blocking_keys; /* The key we are waiting to terminate a blocking - * operation such as BLPOP. Otherwise NULL. */ - int blocking_keys_num; /* Number of blocking keys */ - time_t blockingto; /* Blocking operation timeout. If UNIX current time - * is >= blockingto then the operation timed out. */ - robj *blocking_target; + blockingState bstate; /* blocking state */ list *io_keys; /* Keys this client is waiting to be loaded from the * swap file in order to continue. */ list *watched_keys; /* Keys WATCHED for MULTI/EXEC CAS */ diff --git a/src/t_list.c b/src/t_list.c index 437f4004b..17015acfe 100644 --- a/src/t_list.c +++ b/src/t_list.c @@ -694,12 +694,12 @@ void blockForKeys(redisClient *c, robj **keys, int numkeys, time_t timeout) { list *l; int j; - c->blocking_keys = zmalloc(sizeof(robj*)*numkeys); - c->blocking_keys_num = numkeys; - c->blockingto = timeout; + c->bstate.keys = zmalloc(sizeof(robj*)*numkeys); + c->bstate.count = numkeys; + c->bstate.timeout = timeout; for (j = 0; j < numkeys; j++) { /* Add the key in the client structure, to map clients -> keys */ - c->blocking_keys[j] = keys[j]; + c->bstate.keys[j] = keys[j]; incrRefCount(keys[j]); /* And in the other "side", to map keys -> clients */ @@ -728,22 +728,22 @@ void unblockClientWaitingData(redisClient *c) { list *l; int j; - redisAssert(c->blocking_keys != NULL); + redisAssert(c->bstate.keys != NULL); /* The client may wait for multiple keys, so unblock it for every key. */ - for (j = 0; j < c->blocking_keys_num; j++) { + for (j = 0; j < c->bstate.count; j++) { /* Remove this client from the list of clients waiting for this key. */ - de = dictFind(c->db->blocking_keys,c->blocking_keys[j]); + de = dictFind(c->db->blocking_keys,c->bstate.keys[j]); redisAssert(de != NULL); l = dictGetEntryVal(de); listDelNode(l,listSearchKey(l,c)); /* If the list is empty we need to remove it to avoid wasting memory */ if (listLength(l) == 0) - dictDelete(c->db->blocking_keys,c->blocking_keys[j]); - decrRefCount(c->blocking_keys[j]); + dictDelete(c->db->blocking_keys,c->bstate.keys[j]); + decrRefCount(c->bstate.keys[j]); } /* Cleanup the client structure */ - zfree(c->blocking_keys); - c->blocking_keys = NULL; + zfree(c->bstate.keys); + c->bstate.keys = NULL; c->flags &= (~REDIS_BLOCKED); server.blpop_blocked_clients--; /* We want to process data if there is some command waiting @@ -777,7 +777,7 @@ int handleClientsWaitingListPush(redisClient *c, robj *key, robj *ele) { redisAssert(ln != NULL); receiver = ln->value; - if (receiver->blocking_target == NULL) { + if (receiver->bstate.target == NULL) { addReplyMultiBulkLen(receiver,2); addReplyBulk(receiver,key); addReplyBulk(receiver,ele); @@ -785,7 +785,7 @@ int handleClientsWaitingListPush(redisClient *c, robj *key, robj *ele) { else { receiver->argc++; - robj *dobj = lookupKeyWrite(receiver->db,receiver->blocking_target); + robj *dobj = lookupKeyWrite(receiver->db,receiver->bstate.target); if (dobj && checkType(receiver,dobj,REDIS_LIST)) return 0; addReplyBulk(receiver,ele); @@ -793,7 +793,7 @@ int handleClientsWaitingListPush(redisClient *c, robj *key, robj *ele) { /* Create the list if the key does not exist */ if (!dobj) { dobj = createZiplistObject(); - dbAdd(receiver->db,receiver->blocking_target,dobj); + dbAdd(receiver->db,receiver->bstate.target,dobj); } listTypePush(dobj,ele,REDIS_HEAD); @@ -833,7 +833,7 @@ void blockingPopGenericCommand(redisClient *c, int where) { robj *argv[2], **orig_argv; int orig_argc; - if (c->blocking_target == NULL) { + if (c->bstate.target == NULL) { /* We need to alter the command arguments before to call * popGenericCommand() as the command takes a single key. */ orig_argv = c->argv; @@ -857,8 +857,8 @@ void blockingPopGenericCommand(redisClient *c, int where) { c->argc = orig_argc; } else { - c->argv[2] = c->blocking_target; - c->blocking_target = NULL; + c->argv[2] = c->bstate.target; + c->bstate.target = NULL; rpoplpushCommand(c); } @@ -891,7 +891,7 @@ void brpopCommand(redisClient *c) { } void brpoplpushCommand(redisClient *c) { - c->blocking_target = c->argv[2]; + c->bstate.target = c->argv[2]; c->argv[2] = c->argv[3]; c->argc--; -- cgit v1.2.1 From ba3b474111a79bfa378a8c77d89fff600cd5b23a Mon Sep 17 00:00:00 2001 From: Damian Janowski & Michel Martens Date: Mon, 8 Nov 2010 20:47:46 -0300 Subject: Refactor code for BRPOPLPUSH. --- src/t_list.c | 123 +++++++++++++++++++++++++++++++++++++---------------------- 1 file changed, 77 insertions(+), 46 deletions(-) (limited to 'src') diff --git a/src/t_list.c b/src/t_list.c index 17015acfe..2dedf4811 100644 --- a/src/t_list.c +++ b/src/t_list.c @@ -689,7 +689,7 @@ void rpoplpushCommand(redisClient *c) { /* Set a client in blocking mode for the specified key, with the specified * timeout */ -void blockForKeys(redisClient *c, robj **keys, int numkeys, time_t timeout) { +void blockForKeys(redisClient *c, robj **keys, int numkeys, time_t timeout, robj *target) { dictEntry *de; list *l; int j; @@ -697,6 +697,12 @@ void blockForKeys(redisClient *c, robj **keys, int numkeys, time_t timeout) { c->bstate.keys = zmalloc(sizeof(robj*)*numkeys); c->bstate.count = numkeys; c->bstate.timeout = timeout; + c->bstate.target = target; + + if (target != NULL) { + incrRefCount(target); + } + for (j = 0; j < numkeys; j++) { /* Add the key in the client structure, to map clients -> keys */ c->bstate.keys[j] = keys[j]; @@ -741,9 +747,15 @@ void unblockClientWaitingData(redisClient *c) { dictDelete(c->db->blocking_keys,c->bstate.keys[j]); decrRefCount(c->bstate.keys[j]); } + + if (c->bstate.target != NULL) { + decrRefCount(c->bstate.target); + } + /* Cleanup the client structure */ zfree(c->bstate.keys); c->bstate.keys = NULL; + c->bstate.target = NULL; c->flags &= (~REDIS_BLOCKED); server.blpop_blocked_clients--; /* We want to process data if there is some command waiting @@ -783,8 +795,6 @@ int handleClientsWaitingListPush(redisClient *c, robj *key, robj *ele) { addReplyBulk(receiver,ele); } else { - receiver->argc++; - robj *dobj = lookupKeyWrite(receiver->db,receiver->bstate.target); if (dobj && checkType(receiver,dobj,REDIS_LIST)) return 0; @@ -806,17 +816,10 @@ int handleClientsWaitingListPush(redisClient *c, robj *key, robj *ele) { /* Blocking RPOP/LPOP */ void blockingPopGenericCommand(redisClient *c, int where) { robj *o; - long long lltimeout; time_t timeout; int j; - /* Make sure timeout is an integer value */ - if (getLongLongFromObjectOrReply(c,c->argv[c->argc-1],&lltimeout, - "timeout is not an integer") != REDIS_OK) return; - - /* Make sure the timeout is not negative */ - if (lltimeout < 0) { - addReplyError(c,"timeout is negative"); + if (checkTimeout(c, c->argv[c->argc - 1], &timeout) != REDIS_OK) { return; } @@ -833,35 +836,27 @@ void blockingPopGenericCommand(redisClient *c, int where) { robj *argv[2], **orig_argv; int orig_argc; - if (c->bstate.target == NULL) { - /* We need to alter the command arguments before to call - * popGenericCommand() as the command takes a single key. */ - orig_argv = c->argv; - orig_argc = c->argc; - argv[1] = c->argv[j]; - c->argv = argv; - c->argc = 2; - - /* Also the return value is different, we need to output - * the multi bulk reply header and the key name. The - * "real" command will add the last element (the value) - * for us. If this souds like an hack to you it's just - * because it is... */ - addReplyMultiBulkLen(c,2); - addReplyBulk(c,argv[1]); - - popGenericCommand(c,where); - - /* Fix the client structure with the original stuff */ - c->argv = orig_argv; - c->argc = orig_argc; - } - else { - c->argv[2] = c->bstate.target; - c->bstate.target = NULL; - - rpoplpushCommand(c); - } + /* We need to alter the command arguments before to call + * popGenericCommand() as the command takes a single key. */ + orig_argv = c->argv; + orig_argc = c->argc; + argv[1] = c->argv[j]; + c->argv = argv; + c->argc = 2; + + /* Also the return value is different, we need to output + * the multi bulk reply header and the key name. The + * "real" command will add the last element (the value) + * for us. If this souds like an hack to you it's just + * because it is... */ + addReplyMultiBulkLen(c,2); + addReplyBulk(c,argv[1]); + + popGenericCommand(c,where); + + /* Fix the client structure with the original stuff */ + c->argv = orig_argv; + c->argc = orig_argc; return; } @@ -877,9 +872,26 @@ void blockingPopGenericCommand(redisClient *c, int where) { } /* If the list is empty or the key does not exists we must block */ - timeout = lltimeout; if (timeout > 0) timeout += time(NULL); - blockForKeys(c,c->argv+1,c->argc-2,timeout); + blockForKeys(c, c->argv + 1, c->argc - 2, timeout, NULL); +} + +int checkTimeout(redisClient *c, robj *object, time_t *timeout) { + long long lltimeout; + + if (getLongLongFromObject(object, &lltimeout) != REDIS_OK) { + addReplyError(c, "timeout is not an integer"); + return REDIS_ERR; + } + + if (lltimeout < 0) { + addReplyError(c, "timeout is negative"); + return REDIS_ERR; + } + + *timeout = lltimeout; + + return REDIS_OK; } void blpopCommand(redisClient *c) { @@ -891,9 +903,28 @@ void brpopCommand(redisClient *c) { } void brpoplpushCommand(redisClient *c) { - c->bstate.target = c->argv[2]; - c->argv[2] = c->argv[3]; - c->argc--; + time_t timeout; - blockingPopGenericCommand(c,REDIS_TAIL); + if (checkTimeout(c, c->argv[3], &timeout) != REDIS_OK) { + return; + } + + robj *key = lookupKeyWrite(c->db, c->argv[1]); + + + if (key == NULL) { + // block + if (c->flags & REDIS_MULTI) { + addReply(c,shared.nullmultibulk); + } else { + if (timeout > 0) timeout += time(NULL); + blockForKeys(c, c->argv + 1, 1, timeout, c->argv[2]); + } + } else if (key->type != REDIS_LIST) { + addReply(c, shared.wrongtypeerr); + } else { + // The list exists and has elements. + redisAssert(listTypeLength(key) > 0); + rpoplpushCommand(c); + } } -- cgit v1.2.1 From 7c25a43adc67c4a8d08e930aa92f1c5575ec3646 Mon Sep 17 00:00:00 2001 From: Damian Janowski & Michel Martens Date: Tue, 9 Nov 2010 10:31:02 -0300 Subject: Handle BRPOPLPUSH inside a transaction. --- src/t_list.c | 62 ++++++++++++++++++++++++++++++++++++------------------------ 1 file changed, 37 insertions(+), 25 deletions(-) (limited to 'src') diff --git a/src/t_list.c b/src/t_list.c index 2dedf4811..d14de708f 100644 --- a/src/t_list.c +++ b/src/t_list.c @@ -790,23 +790,28 @@ int handleClientsWaitingListPush(redisClient *c, robj *key, robj *ele) { receiver = ln->value; if (receiver->bstate.target == NULL) { - addReplyMultiBulkLen(receiver,2); - addReplyBulk(receiver,key); - addReplyBulk(receiver,ele); + /* BRPOP/BLPOP return a multi-bulk with the name + * of the popped list */ + addReplyMultiBulkLen(receiver,2); + addReplyBulk(receiver,key); + addReplyBulk(receiver,ele); } else { - robj *dobj = lookupKeyWrite(receiver->db,receiver->bstate.target); - if (dobj && checkType(receiver,dobj,REDIS_LIST)) return 0; + /* BRPOPLPUSH */ + robj *dobj = lookupKeyWrite(receiver->db,receiver->bstate.target); + if (dobj && checkType(receiver,dobj,REDIS_LIST)) return 0; - addReplyBulk(receiver,ele); + addReplyBulk(receiver,ele); - /* Create the list if the key does not exist */ - if (!dobj) { - dobj = createZiplistObject(); - dbAdd(receiver->db,receiver->bstate.target,dobj); - } + if (!handleClientsWaitingListPush(receiver, receiver->bstate.target, ele)) { + /* Create the list if the key does not exist */ + if (!dobj) { + dobj = createZiplistObject(); + dbAdd(receiver->db, receiver->bstate.target, dobj); + } - listTypePush(dobj,ele,REDIS_HEAD); + listTypePush(dobj, ele, REDIS_HEAD); + } } unblockClientWaitingData(receiver); @@ -880,13 +885,13 @@ int checkTimeout(redisClient *c, robj *object, time_t *timeout) { long long lltimeout; if (getLongLongFromObject(object, &lltimeout) != REDIS_OK) { - addReplyError(c, "timeout is not an integer"); - return REDIS_ERR; + addReplyError(c, "timeout is not an integer"); + return REDIS_ERR; } - + if (lltimeout < 0) { - addReplyError(c, "timeout is negative"); - return REDIS_ERR; + addReplyError(c, "timeout is negative"); + return REDIS_ERR; } *timeout = lltimeout; @@ -911,20 +916,27 @@ void brpoplpushCommand(redisClient *c) { robj *key = lookupKeyWrite(c->db, c->argv[1]); - if (key == NULL) { - // block if (c->flags & REDIS_MULTI) { - addReply(c,shared.nullmultibulk); + + /* Blocking against an empty list in a multi state + * returns immediately. */ + addReply(c, shared.nullmultibulk); } else { if (timeout > 0) timeout += time(NULL); + + /* The list is empty and the client blocks. */ blockForKeys(c, c->argv + 1, 1, timeout, c->argv[2]); } - } else if (key->type != REDIS_LIST) { - addReply(c, shared.wrongtypeerr); } else { - // The list exists and has elements. - redisAssert(listTypeLength(key) > 0); - rpoplpushCommand(c); + if (key->type != REDIS_LIST) { + addReply(c, shared.wrongtypeerr); + } else { + + /* The list exists and has elements, so + * the regular rpoplpushCommand is executed. */ + redisAssert(listTypeLength(key) > 0); + rpoplpushCommand(c); + } } } -- cgit v1.2.1 From 59bd44d1c883b52507358996d1436d466c377e60 Mon Sep 17 00:00:00 2001 From: Damian Janowski & Michel Martens Date: Tue, 9 Nov 2010 11:00:54 -0300 Subject: Remove warning. --- src/t_list.c | 36 ++++++++++++++++++------------------ 1 file changed, 18 insertions(+), 18 deletions(-) (limited to 'src') diff --git a/src/t_list.c b/src/t_list.c index d14de708f..7f12e4ac0 100644 --- a/src/t_list.c +++ b/src/t_list.c @@ -818,6 +818,24 @@ int handleClientsWaitingListPush(redisClient *c, robj *key, robj *ele) { return 1; } +int checkTimeout(redisClient *c, robj *object, time_t *timeout) { + long long lltimeout; + + if (getLongLongFromObject(object, &lltimeout) != REDIS_OK) { + addReplyError(c, "timeout is not an integer"); + return REDIS_ERR; + } + + if (lltimeout < 0) { + addReplyError(c, "timeout is negative"); + return REDIS_ERR; + } + + *timeout = lltimeout; + + return REDIS_OK; +} + /* Blocking RPOP/LPOP */ void blockingPopGenericCommand(redisClient *c, int where) { robj *o; @@ -881,24 +899,6 @@ void blockingPopGenericCommand(redisClient *c, int where) { blockForKeys(c, c->argv + 1, c->argc - 2, timeout, NULL); } -int checkTimeout(redisClient *c, robj *object, time_t *timeout) { - long long lltimeout; - - if (getLongLongFromObject(object, &lltimeout) != REDIS_OK) { - addReplyError(c, "timeout is not an integer"); - return REDIS_ERR; - } - - if (lltimeout < 0) { - addReplyError(c, "timeout is negative"); - return REDIS_ERR; - } - - *timeout = lltimeout; - - return REDIS_OK; -} - void blpopCommand(redisClient *c) { blockingPopGenericCommand(c,REDIS_HEAD); } -- cgit v1.2.1 From e3c51c4b1bb60069bbd6552fe9109885b886aa86 Mon Sep 17 00:00:00 2001 From: Damian Janowski & Michel Martens Date: Tue, 9 Nov 2010 15:06:25 -0300 Subject: Rename bstate to bpop. --- src/networking.c | 10 +++++----- src/redis.h | 2 +- src/t_list.c | 38 +++++++++++++++++++------------------- 3 files changed, 25 insertions(+), 25 deletions(-) (limited to 'src') diff --git a/src/networking.c b/src/networking.c index 530e2ca87..7d14ac53f 100644 --- a/src/networking.c +++ b/src/networking.c @@ -41,10 +41,10 @@ redisClient *createClient(int fd) { c->reply = listCreate(); listSetFreeMethod(c->reply,decrRefCount); listSetDupMethod(c->reply,dupClientReplyValue); - c->bstate.keys = NULL; - c->bstate.count = 0; - c->bstate.timeout = 0; - c->bstate.target = NULL; + c->bpop.keys = NULL; + c->bpop.count = 0; + c->bpop.timeout = 0; + c->bpop.target = NULL; c->io_keys = listCreate(); c->watched_keys = listCreate(); listSetFreeMethod(c->io_keys,decrRefCount); @@ -679,7 +679,7 @@ void closeTimedoutClients(void) { redisLog(REDIS_VERBOSE,"Closing idle client"); freeClient(c); } else if (c->flags & REDIS_BLOCKED) { - if (c->bstate.timeout != 0 && c->bstate.timeout < now) { + if (c->bpop.timeout != 0 && c->bpop.timeout < now) { addReply(c,shared.nullmultibulk); unblockClientWaitingData(c); } diff --git a/src/redis.h b/src/redis.h index 0de94585b..f1142a5b3 100644 --- a/src/redis.h +++ b/src/redis.h @@ -326,7 +326,7 @@ typedef struct redisClient { long repldboff; /* replication DB file offset */ off_t repldbsize; /* replication DB file size */ multiState mstate; /* MULTI/EXEC state */ - blockingState bstate; /* blocking state */ + blockingState bpop; /* blocking state */ list *io_keys; /* Keys this client is waiting to be loaded from the * swap file in order to continue. */ list *watched_keys; /* Keys WATCHED for MULTI/EXEC CAS */ diff --git a/src/t_list.c b/src/t_list.c index 7f12e4ac0..f5739792a 100644 --- a/src/t_list.c +++ b/src/t_list.c @@ -694,10 +694,10 @@ void blockForKeys(redisClient *c, robj **keys, int numkeys, time_t timeout, robj list *l; int j; - c->bstate.keys = zmalloc(sizeof(robj*)*numkeys); - c->bstate.count = numkeys; - c->bstate.timeout = timeout; - c->bstate.target = target; + c->bpop.keys = zmalloc(sizeof(robj*)*numkeys); + c->bpop.count = numkeys; + c->bpop.timeout = timeout; + c->bpop.target = target; if (target != NULL) { incrRefCount(target); @@ -705,7 +705,7 @@ void blockForKeys(redisClient *c, robj **keys, int numkeys, time_t timeout, robj for (j = 0; j < numkeys; j++) { /* Add the key in the client structure, to map clients -> keys */ - c->bstate.keys[j] = keys[j]; + c->bpop.keys[j] = keys[j]; incrRefCount(keys[j]); /* And in the other "side", to map keys -> clients */ @@ -734,28 +734,28 @@ void unblockClientWaitingData(redisClient *c) { list *l; int j; - redisAssert(c->bstate.keys != NULL); + redisAssert(c->bpop.keys != NULL); /* The client may wait for multiple keys, so unblock it for every key. */ - for (j = 0; j < c->bstate.count; j++) { + for (j = 0; j < c->bpop.count; j++) { /* Remove this client from the list of clients waiting for this key. */ - de = dictFind(c->db->blocking_keys,c->bstate.keys[j]); + de = dictFind(c->db->blocking_keys,c->bpop.keys[j]); redisAssert(de != NULL); l = dictGetEntryVal(de); listDelNode(l,listSearchKey(l,c)); /* If the list is empty we need to remove it to avoid wasting memory */ if (listLength(l) == 0) - dictDelete(c->db->blocking_keys,c->bstate.keys[j]); - decrRefCount(c->bstate.keys[j]); + dictDelete(c->db->blocking_keys,c->bpop.keys[j]); + decrRefCount(c->bpop.keys[j]); } - if (c->bstate.target != NULL) { - decrRefCount(c->bstate.target); + if (c->bpop.target != NULL) { + decrRefCount(c->bpop.target); } /* Cleanup the client structure */ - zfree(c->bstate.keys); - c->bstate.keys = NULL; - c->bstate.target = NULL; + zfree(c->bpop.keys); + c->bpop.keys = NULL; + c->bpop.target = NULL; c->flags &= (~REDIS_BLOCKED); server.blpop_blocked_clients--; /* We want to process data if there is some command waiting @@ -789,7 +789,7 @@ int handleClientsWaitingListPush(redisClient *c, robj *key, robj *ele) { redisAssert(ln != NULL); receiver = ln->value; - if (receiver->bstate.target == NULL) { + if (receiver->bpop.target == NULL) { /* BRPOP/BLPOP return a multi-bulk with the name * of the popped list */ addReplyMultiBulkLen(receiver,2); @@ -798,16 +798,16 @@ int handleClientsWaitingListPush(redisClient *c, robj *key, robj *ele) { } else { /* BRPOPLPUSH */ - robj *dobj = lookupKeyWrite(receiver->db,receiver->bstate.target); + robj *dobj = lookupKeyWrite(receiver->db,receiver->bpop.target); if (dobj && checkType(receiver,dobj,REDIS_LIST)) return 0; addReplyBulk(receiver,ele); - if (!handleClientsWaitingListPush(receiver, receiver->bstate.target, ele)) { + if (!handleClientsWaitingListPush(receiver, receiver->bpop.target, ele)) { /* Create the list if the key does not exist */ if (!dobj) { dobj = createZiplistObject(); - dbAdd(receiver->db, receiver->bstate.target, dobj); + dbAdd(receiver->db, receiver->bpop.target, dobj); } listTypePush(dobj, ele, REDIS_HEAD); -- cgit v1.2.1 From 8987bf23bfd7b4d8501db98f4ae7e37310f58fbd Mon Sep 17 00:00:00 2001 From: Damian Janowski & Michel Martens Date: Tue, 9 Nov 2010 15:16:09 -0300 Subject: Adhere to conventions. --- src/t_list.c | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) (limited to 'src') diff --git a/src/t_list.c b/src/t_list.c index f5739792a..0da70a040 100644 --- a/src/t_list.c +++ b/src/t_list.c @@ -795,8 +795,7 @@ int handleClientsWaitingListPush(redisClient *c, robj *key, robj *ele) { addReplyMultiBulkLen(receiver,2); addReplyBulk(receiver,key); addReplyBulk(receiver,ele); - } - else { + } else { /* BRPOPLPUSH */ robj *dobj = lookupKeyWrite(receiver->db,receiver->bpop.target); if (dobj && checkType(receiver,dobj,REDIS_LIST)) return 0; -- cgit v1.2.1 From baa14ef913032baf34645faeae80f5cd6895f97a Mon Sep 17 00:00:00 2001 From: Michel Martens & Damian Janowski Date: Mon, 29 Nov 2010 23:47:45 -0300 Subject: Fix BRPOPLPUSH behavior for all use cases. --- src/t_list.c | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) (limited to 'src') diff --git a/src/t_list.c b/src/t_list.c index 0da70a040..ceed70c31 100644 --- a/src/t_list.c +++ b/src/t_list.c @@ -748,10 +748,6 @@ void unblockClientWaitingData(redisClient *c) { decrRefCount(c->bpop.keys[j]); } - if (c->bpop.target != NULL) { - decrRefCount(c->bpop.target); - } - /* Cleanup the client structure */ zfree(c->bpop.keys); c->bpop.keys = NULL; @@ -789,7 +785,11 @@ int handleClientsWaitingListPush(redisClient *c, robj *key, robj *ele) { redisAssert(ln != NULL); receiver = ln->value; - if (receiver->bpop.target == NULL) { + robj *target = receiver->bpop.target; + + unblockClientWaitingData(receiver); + + if (target == NULL) { /* BRPOP/BLPOP return a multi-bulk with the name * of the popped list */ addReplyMultiBulkLen(receiver,2); @@ -797,23 +797,23 @@ int handleClientsWaitingListPush(redisClient *c, robj *key, robj *ele) { addReplyBulk(receiver,ele); } else { /* BRPOPLPUSH */ - robj *dobj = lookupKeyWrite(receiver->db,receiver->bpop.target); + robj *dobj = lookupKeyWrite(receiver->db,target); if (dobj && checkType(receiver,dobj,REDIS_LIST)) return 0; addReplyBulk(receiver,ele); - if (!handleClientsWaitingListPush(receiver, receiver->bpop.target, ele)) { + if (!handleClientsWaitingListPush(receiver, target, ele)) { /* Create the list if the key does not exist */ if (!dobj) { dobj = createZiplistObject(); - dbAdd(receiver->db, receiver->bpop.target, dobj); + dbAdd(receiver->db, target, dobj); } - listTypePush(dobj, ele, REDIS_HEAD); } + + decrRefCount(target); } - unblockClientWaitingData(receiver); return 1; } -- cgit v1.2.1 From c8a0070a611011c706e67da52a7f17ebfc0c0c1a Mon Sep 17 00:00:00 2001 From: Pieter Noordhuis Date: Mon, 6 Dec 2010 13:45:48 +0100 Subject: Move timeout logic --- src/t_list.c | 25 ++++++++++--------------- 1 file changed, 10 insertions(+), 15 deletions(-) (limited to 'src') diff --git a/src/t_list.c b/src/t_list.c index ceed70c31..a47ab65c8 100644 --- a/src/t_list.c +++ b/src/t_list.c @@ -817,20 +817,20 @@ int handleClientsWaitingListPush(redisClient *c, robj *key, robj *ele) { return 1; } -int checkTimeout(redisClient *c, robj *object, time_t *timeout) { - long long lltimeout; +int getTimeoutFromObjectOrReply(redisClient *c, robj *object, time_t *timeout) { + long tval; - if (getLongLongFromObject(object, &lltimeout) != REDIS_OK) { - addReplyError(c, "timeout is not an integer"); + if (getLongFromObjectOrReply(c,object,&tval, + "timeout is not an integer or out of range") != REDIS_OK) return REDIS_ERR; - } - if (lltimeout < 0) { - addReplyError(c, "timeout is negative"); + if (tval < 0) { + addReplyError(c,"timeout is negative"); return REDIS_ERR; } - *timeout = lltimeout; + if (tval > 0) tval += time(NULL); + *timeout = tval; return REDIS_OK; } @@ -841,9 +841,8 @@ void blockingPopGenericCommand(redisClient *c, int where) { time_t timeout; int j; - if (checkTimeout(c, c->argv[c->argc - 1], &timeout) != REDIS_OK) { + if (getTimeoutFromObjectOrReply(c,c->argv[c->argc-1],&timeout) != REDIS_OK) return; - } for (j = 1; j < c->argc-1; j++) { o = lookupKeyWrite(c->db,c->argv[j]); @@ -894,7 +893,6 @@ void blockingPopGenericCommand(redisClient *c, int where) { } /* If the list is empty or the key does not exists we must block */ - if (timeout > 0) timeout += time(NULL); blockForKeys(c, c->argv + 1, c->argc - 2, timeout, NULL); } @@ -909,9 +907,8 @@ void brpopCommand(redisClient *c) { void brpoplpushCommand(redisClient *c) { time_t timeout; - if (checkTimeout(c, c->argv[3], &timeout) != REDIS_OK) { + if (getTimeoutFromObjectOrReply(c,c->argv[3],&timeout) != REDIS_OK) return; - } robj *key = lookupKeyWrite(c->db, c->argv[1]); @@ -922,8 +919,6 @@ void brpoplpushCommand(redisClient *c) { * returns immediately. */ addReply(c, shared.nullmultibulk); } else { - if (timeout > 0) timeout += time(NULL); - /* The list is empty and the client blocks. */ blockForKeys(c, c->argv + 1, 1, timeout, c->argv[2]); } -- cgit v1.2.1 From 5fa95ad7639ae3f43e175d95a7d6384e4723b80e Mon Sep 17 00:00:00 2001 From: Pieter Noordhuis Date: Mon, 6 Dec 2010 14:05:01 +0100 Subject: Rename blpop_blocked_clients to bpop_blocked_clients --- src/redis.c | 6 +++--- src/redis.h | 2 +- src/t_list.c | 4 ++-- 3 files changed, 6 insertions(+), 6 deletions(-) (limited to 'src') diff --git a/src/redis.c b/src/redis.c index 58a796c03..8a5f9632a 100644 --- a/src/redis.c +++ b/src/redis.c @@ -573,7 +573,7 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { } /* Close connections of timedout clients */ - if ((server.maxidletime && !(loops % 100)) || server.blpop_blocked_clients) + if ((server.maxidletime && !(loops % 100)) || server.bpop_blocked_clients) closeTimedoutClients(); /* Check if a background saving or AOF rewrite in progress terminated */ @@ -759,7 +759,7 @@ void initServerConfig() { server.rdbcompression = 1; server.activerehashing = 1; server.maxclients = 0; - server.blpop_blocked_clients = 0; + server.bpop_blocked_clients = 0; server.maxmemory = 0; server.maxmemory_policy = REDIS_MAXMEMORY_VOLATILE_LRU; server.maxmemory_samples = 3; @@ -1170,7 +1170,7 @@ sds genRedisInfoString(void) { (float)c_ru.ru_stime.tv_sec+(float)c_ru.ru_stime.tv_usec/1000000, listLength(server.clients)-listLength(server.slaves), listLength(server.slaves), - server.blpop_blocked_clients, + server.bpop_blocked_clients, zmalloc_used_memory(), hmem, zmalloc_get_rss(), diff --git a/src/redis.h b/src/redis.h index f1142a5b3..27cb82598 100644 --- a/src/redis.h +++ b/src/redis.h @@ -433,7 +433,7 @@ struct redisServer { int maxmemory_policy; int maxmemory_samples; /* Blocked clients */ - unsigned int blpop_blocked_clients; + unsigned int bpop_blocked_clients; unsigned int vm_blocked_clients; /* Sort parameters - qsort_r() is only available under BSD so we * have to take this state global, in order to pass it to sortCompare() */ diff --git a/src/t_list.c b/src/t_list.c index a47ab65c8..b46b04943 100644 --- a/src/t_list.c +++ b/src/t_list.c @@ -725,7 +725,7 @@ void blockForKeys(redisClient *c, robj **keys, int numkeys, time_t timeout, robj } /* Mark the client as a blocked client */ c->flags |= REDIS_BLOCKED; - server.blpop_blocked_clients++; + server.bpop_blocked_clients++; } /* Unblock a client that's waiting in a blocking operation such as BLPOP */ @@ -753,7 +753,7 @@ void unblockClientWaitingData(redisClient *c) { c->bpop.keys = NULL; c->bpop.target = NULL; c->flags &= (~REDIS_BLOCKED); - server.blpop_blocked_clients--; + server.bpop_blocked_clients--; /* We want to process data if there is some command waiting * in the input buffer. Note that this is safe even if * unblockClientWaitingData() gets called from freeClient() because -- cgit v1.2.1 From ac06fc011df598372232a5dc1805683004240c0d Mon Sep 17 00:00:00 2001 From: Pieter Noordhuis Date: Mon, 6 Dec 2010 14:48:58 +0100 Subject: Move code for pushing on a (blocking) RPOPLPUSH --- src/t_list.c | 60 +++++++++++++++++++++++++++--------------------------------- 1 file changed, 27 insertions(+), 33 deletions(-) (limited to 'src') diff --git a/src/t_list.c b/src/t_list.c index b46b04943..3ce0f992f 100644 --- a/src/t_list.c +++ b/src/t_list.c @@ -605,19 +605,37 @@ void lremCommand(redisClient *c) { /* This is the semantic of this command: * RPOPLPUSH srclist dstlist: - * IF LLEN(srclist) > 0 - * element = RPOP srclist - * LPUSH dstlist element - * RETURN element - * ELSE - * RETURN nil - * END + * IF LLEN(srclist) > 0 + * element = RPOP srclist + * LPUSH dstlist element + * RETURN element + * ELSE + * RETURN nil + * END * END * * The idea is to be able to get an element from a list in a reliable way * since the element is not just returned but pushed against another list * as well. This command was originally proposed by Ezra Zygmuntowicz. */ + +void rpoplpushHandlePush(redisClient *c, robj *dstkey, robj *dstobj, robj *value) { + if (!handleClientsWaitingListPush(c,dstkey,value)) { + /* Create the list if the key does not exist */ + if (!dstobj) { + dstobj = createZiplistObject(); + dbAdd(c->db,dstkey,dstobj); + } else { + touchWatchedKey(c->db,dstkey); + server.dirty++; + } + listTypePush(dstobj,value,REDIS_HEAD); + } + + /* Always send the pushed value to the client. */ + addReplyBulk(c,value); +} + void rpoplpushCommand(redisClient *c) { robj *sobj, *value; if ((sobj = lookupKeyWriteOrReply(c,c->argv[1],shared.nullbulk)) == NULL || @@ -629,20 +647,7 @@ void rpoplpushCommand(redisClient *c) { robj *dobj = lookupKeyWrite(c->db,c->argv[2]); if (dobj && checkType(c,dobj,REDIS_LIST)) return; value = listTypePop(sobj,REDIS_TAIL); - - /* Add the element to the target list (unless it's directly - * passed to some BLPOP-ing client */ - if (!handleClientsWaitingListPush(c,c->argv[2],value)) { - /* Create the list if the key does not exist */ - if (!dobj) { - dobj = createZiplistObject(); - dbAdd(c->db,c->argv[2],dobj); - } - listTypePush(dobj,value,REDIS_HEAD); - } - - /* Send the element to the client as reply as well */ - addReplyBulk(c,value); + rpoplpushHandlePush(c,c->argv[2],dobj,value); /* listTypePop returns an object with its refcount incremented */ decrRefCount(value); @@ -799,18 +804,7 @@ int handleClientsWaitingListPush(redisClient *c, robj *key, robj *ele) { /* BRPOPLPUSH */ robj *dobj = lookupKeyWrite(receiver->db,target); if (dobj && checkType(receiver,dobj,REDIS_LIST)) return 0; - - addReplyBulk(receiver,ele); - - if (!handleClientsWaitingListPush(receiver, target, ele)) { - /* Create the list if the key does not exist */ - if (!dobj) { - dobj = createZiplistObject(); - dbAdd(receiver->db, target, dobj); - } - listTypePush(dobj, ele, REDIS_HEAD); - } - + rpoplpushHandlePush(receiver,target,dobj,ele); decrRefCount(target); } -- cgit v1.2.1 From 8a88c368edbc12540eee3d129b8a017bd6a84cac Mon Sep 17 00:00:00 2001 From: Pieter Noordhuis Date: Mon, 6 Dec 2010 16:04:10 +0100 Subject: Check other blocked clients when value could not be pushed --- src/t_list.c | 64 ++++++++++++++++++++++++++++++++++++++---------------------- 1 file changed, 41 insertions(+), 23 deletions(-) (limited to 'src') diff --git a/src/t_list.c b/src/t_list.c index 3ce0f992f..866a6a3e2 100644 --- a/src/t_list.c +++ b/src/t_list.c @@ -780,35 +780,53 @@ void unblockClientWaitingData(redisClient *c) { int handleClientsWaitingListPush(redisClient *c, robj *key, robj *ele) { struct dictEntry *de; redisClient *receiver; - list *l; + int numclients; + list *clients; listNode *ln; + robj *dstkey, *dstobj; de = dictFind(c->db->blocking_keys,key); if (de == NULL) return 0; - l = dictGetEntryVal(de); - ln = listFirst(l); - redisAssert(ln != NULL); - receiver = ln->value; - - robj *target = receiver->bpop.target; - - unblockClientWaitingData(receiver); - - if (target == NULL) { - /* BRPOP/BLPOP return a multi-bulk with the name - * of the popped list */ - addReplyMultiBulkLen(receiver,2); - addReplyBulk(receiver,key); - addReplyBulk(receiver,ele); - } else { - /* BRPOPLPUSH */ - robj *dobj = lookupKeyWrite(receiver->db,target); - if (dobj && checkType(receiver,dobj,REDIS_LIST)) return 0; - rpoplpushHandlePush(receiver,target,dobj,ele); - decrRefCount(target); + clients = dictGetEntryVal(de); + numclients = listLength(clients); + + /* Try to handle the push as long as there are clients waiting for a push. + * Note that "numclients" is used because the list of clients waiting for a + * push on "key" is deleted by unblockClient() when empty. + * + * This loop will have more than 1 iteration when there is a BRPOPLPUSH + * that cannot push the target list because it does not contain a list. If + * this happens, it simply tries the next client waiting for a push. */ + while (numclients--) { + ln = listFirst(clients); + redisAssert(ln != NULL); + receiver = ln->value; + dstkey = receiver->bpop.target; + + /* This should remove the first element of the "clients" list. */ + unblockClientWaitingData(receiver); + redisAssert(ln != listFirst(clients)); + + if (dstkey == NULL) { + /* BRPOP/BLPOP */ + addReplyMultiBulkLen(receiver,2); + addReplyBulk(receiver,key); + addReplyBulk(receiver,ele); + return 1; + } else { + /* BRPOPLPUSH */ + dstobj = lookupKeyWrite(receiver->db,dstkey); + if (dstobj && checkType(receiver,dstobj,REDIS_LIST)) { + decrRefCount(dstkey); + } else { + rpoplpushHandlePush(receiver,dstkey,dstobj,ele); + decrRefCount(dstkey); + return 1; + } + } } - return 1; + return 0; } int getTimeoutFromObjectOrReply(redisClient *c, robj *object, time_t *timeout) { -- cgit v1.2.1 From ecf940141501e47dcc8dfecbc84a4e3f6ee7b0d3 Mon Sep 17 00:00:00 2001 From: Pieter Noordhuis Date: Mon, 6 Dec 2010 16:04:42 +0100 Subject: Fix case and indent --- src/t_list.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'src') diff --git a/src/t_list.c b/src/t_list.c index 866a6a3e2..867e258a1 100644 --- a/src/t_list.c +++ b/src/t_list.c @@ -705,7 +705,7 @@ void blockForKeys(redisClient *c, robj **keys, int numkeys, time_t timeout, robj c->bpop.target = target; if (target != NULL) { - incrRefCount(target); + incrRefCount(target); } for (j = 0; j < numkeys; j++) { -- cgit v1.2.1 From a4ce7581553b1f4e29a7ed2141add788e56142c5 Mon Sep 17 00:00:00 2001 From: Pieter Noordhuis Date: Mon, 6 Dec 2010 16:39:39 +0100 Subject: Don't execute commands for clients when they are unblocked --- src/redis.c | 19 +++++++++++++++++-- src/redis.h | 1 + src/t_list.c | 7 +------ 3 files changed, 19 insertions(+), 8 deletions(-) (limited to 'src') diff --git a/src/redis.c b/src/redis.c index 8a5f9632a..a1653c36e 100644 --- a/src/redis.c +++ b/src/redis.c @@ -646,15 +646,16 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { * for ready file descriptors. */ void beforeSleep(struct aeEventLoop *eventLoop) { REDIS_NOTUSED(eventLoop); + listNode *ln; + redisClient *c; /* Awake clients that got all the swapped keys they requested */ if (server.vm_enabled && listLength(server.io_ready_clients)) { listIter li; - listNode *ln; listRewind(server.io_ready_clients,&li); while((ln = listNext(&li))) { - redisClient *c = ln->value; + c = ln->value; struct redisCommand *cmd; /* Resume the client. */ @@ -672,6 +673,19 @@ void beforeSleep(struct aeEventLoop *eventLoop) { processInputBuffer(c); } } + + /* Try to process pending commands for clients that were just unblocked. */ + while (listLength(server.unblocked_clients)) { + ln = listFirst(server.unblocked_clients); + redisAssert(ln != NULL); + c = ln->value; + listDelNode(server.unblocked_clients,ln); + + /* Process remaining data in the input buffer. */ + if (c->querybuf && sdslen(c->querybuf) > 0) + processInputBuffer(c); + } + /* Write the AOF buffer on disk */ flushAppendOnlyFile(); } @@ -818,6 +832,7 @@ void initServer() { server.clients = listCreate(); server.slaves = listCreate(); server.monitors = listCreate(); + server.unblocked_clients = listCreate(); createSharedObjects(); server.el = aeCreateEventLoop(); server.db = zmalloc(sizeof(redisDb)*server.dbnum); diff --git a/src/redis.h b/src/redis.h index 27cb82598..3639f0623 100644 --- a/src/redis.h +++ b/src/redis.h @@ -435,6 +435,7 @@ struct redisServer { /* Blocked clients */ unsigned int bpop_blocked_clients; unsigned int vm_blocked_clients; + list *unblocked_clients; /* Sort parameters - qsort_r() is only available under BSD so we * have to take this state global, in order to pass it to sortCompare() */ int sort_desc; diff --git a/src/t_list.c b/src/t_list.c index 867e258a1..7dc3f1393 100644 --- a/src/t_list.c +++ b/src/t_list.c @@ -759,12 +759,7 @@ void unblockClientWaitingData(redisClient *c) { c->bpop.target = NULL; c->flags &= (~REDIS_BLOCKED); server.bpop_blocked_clients--; - /* We want to process data if there is some command waiting - * in the input buffer. Note that this is safe even if - * unblockClientWaitingData() gets called from freeClient() because - * freeClient() will be smart enough to call this function - * *after* c->querybuf was set to NULL. */ - if (c->querybuf && sdslen(c->querybuf) > 0) processInputBuffer(c); + listAddNodeTail(server.unblocked_clients,c); } /* This should be called from any function PUSHing into lists. -- cgit v1.2.1