diff options
author | Itamar Haber <itamar@redislabs.com> | 2018-04-30 02:10:42 +0300 |
---|---|---|
committer | Itamar Haber <itamar@redislabs.com> | 2018-04-30 02:10:42 +0300 |
commit | 438125b47c3b2b94e67c80bf64a7d8b6207b236a (patch) | |
tree | 17aa868be731d92a1d64cf0837979cef81fbb579 /src/blocked.c | |
parent | e6b0e8d9ec4561a07864358af8d2d4e81ac413fc (diff) | |
download | redis-438125b47c3b2b94e67c80bf64a7d8b6207b236a.tar.gz |
Implements [B]Z[REV]POP and the respective unit tests
An implementation of the
[Ze POP Redis Module](https://github.com/itamarhaber/zpop) as core
Redis commands.
Fixes #1861.
Diffstat (limited to 'src/blocked.c')
-rw-r--r-- | src/blocked.c | 70 |
1 files changed, 56 insertions, 14 deletions
diff --git a/src/blocked.c b/src/blocked.c index 1d59ee16a..61fd9fa87 100644 --- a/src/blocked.c +++ b/src/blocked.c @@ -135,7 +135,9 @@ void processUnblockedClients(void) { /* Unblock a client calling the right function depending on the kind * of operation the client is blocking for. */ void unblockClient(client *c) { - if (c->btype == BLOCKED_LIST || c->btype == BLOCKED_STREAM) { + if (c->btype == BLOCKED_LIST || + c->btype == BLOCKED_ZSET || + c->btype == BLOCKED_STREAM) { unblockClientWaitingData(c); } else if (c->btype == BLOCKED_WAIT) { unblockClientWaitingReplicas(c); @@ -162,7 +164,9 @@ void unblockClient(client *c) { * send it a reply of some kind. After this function is called, * unblockClient() will be called with the same client as argument. */ void replyToBlockedClientTimedOut(client *c) { - if (c->btype == BLOCKED_LIST || c->btype == BLOCKED_STREAM) { + if (c->btype == BLOCKED_LIST || + c->btype == BLOCKED_ZSET || + c->btype == BLOCKED_STREAM) { addReply(c,shared.nullmultibulk); } else if (c->btype == BLOCKED_WAIT) { addReplyLongLong(c,replicationCountAcksByOffset(c->bpop.reploffset)); @@ -244,7 +248,7 @@ void handleClientsBlockedOnKeys(void) { client *receiver = clientnode->value; if (receiver->btype != BLOCKED_LIST) { - /* Put on the tail, so that at the next call + /* Put at the tail, so that at the next call * we'll not run into it again. */ listDelNode(clients,clientnode); listAddNodeTail(clients,receiver); @@ -289,6 +293,43 @@ void handleClientsBlockedOnKeys(void) { * when an element was pushed on the list. */ } + /* Serve clients blocked on sorted set key. */ + else if (o != NULL && o->type == OBJ_ZSET) { + dictEntry *de; + + /* We serve clients in the same order they blocked for + * this key, from the first blocked to the last. */ + de = dictFind(rl->db->blocking_keys,rl->key); + if (de) { + list *clients = dictGetVal(de); + int numclients = listLength(clients); + + while(numclients--) { + listNode *clientnode = listFirst(clients); + client *receiver = clientnode->value; + + if (receiver->btype != BLOCKED_ZSET) { + /* Put at the tail, so that at the next call + * we'll not run into it again. */ + listDelNode(clients,clientnode); + listAddNodeTail(clients,receiver); + continue; + } + + int reverse = (receiver->lastcmd && + receiver->lastcmd->proc == bzpopCommand) ? + 0 : 1; + unblockClient(receiver); + genericZpopCommand(receiver,&rl->key,1,reverse); + + propagate(reverse ? + server.zrevpopCommand : server.zpopCommand, + receiver->db->id,receiver->argv,receiver->argc, + PROPAGATE_AOF|PROPAGATE_REPL); + } + } + } + /* Serve clients blocked on stream key. */ else if (o != NULL && o->type == OBJ_STREAM) { dictEntry *de = dictFind(rl->db->blocking_keys,rl->key); @@ -371,8 +412,9 @@ void handleClientsBlockedOnKeys(void) { } } -/* This is how the current blocking lists/streams work, we use BLPOP as - * example, but the concept is the same for other list ops and XREAD. +/* This is how the current blocking lists/sorted sets/streams work, we use + * BLPOP as example, but the concept is the same for other list ops, sorted + * sets and XREAD. * - If the user calls BLPOP and the key exists and contains a non empty list * then LPOP is called instead. So BLPOP is semantically the same as LPOP * if blocking is not required. @@ -389,14 +431,14 @@ void handleClientsBlockedOnKeys(void) { * to the number of elements we have in the ready list. */ -/* Set a client in blocking mode for the specified key (list or stream), with - * the specified timeout. The 'type' argument is BLOCKED_LIST or BLOCKED_STREAM - * depending on the kind of operation we are waiting for an empty key in - * order to awake the client. The client is blocked for all the 'numkeys' - * keys as in the 'keys' argument. When we block for stream keys, we also - * provide an array of streamID structures: clients will be unblocked only - * when items with an ID greater or equal to the specified one is appended - * to the stream. */ +/* Set a client in blocking mode for the specified key (list, zset or stream), + * with the specified timeout. The 'type' argument is BLOCKED_LIST, + * BLOCKED_ZSET or BLOCKED_STREAM depending on the kind of operation we are + * waiting for an empty key in order to awake the client. The client is blocked + * for all the 'numkeys' keys as in the 'keys' argument. When we block for + * stream keys, we also provide an array of streamID structures: clients will + * be unblocked only when items with an ID greater or equal to the specified + * one is appended to the stream. */ void blockForKeys(client *c, int btype, robj **keys, int numkeys, mstime_t timeout, robj *target, streamID *ids) { dictEntry *de; list *l; @@ -409,7 +451,7 @@ void blockForKeys(client *c, int btype, robj **keys, int numkeys, mstime_t timeo for (j = 0; j < numkeys; j++) { /* The value associated with the key name in the bpop.keys dictionary - * is NULL for lists, or the stream ID for streams. */ + * is NULL for lists and sorted sets, or the stream ID for streams. */ void *key_data = NULL; if (btype == BLOCKED_STREAM) { key_data = zmalloc(sizeof(streamID)); |