summaryrefslogtreecommitdiff
path: root/src/blocked.c
diff options
context:
space:
mode:
authorItamar Haber <itamar@redislabs.com>2018-04-30 02:10:42 +0300
committerItamar Haber <itamar@redislabs.com>2018-04-30 02:10:42 +0300
commit438125b47c3b2b94e67c80bf64a7d8b6207b236a (patch)
tree17aa868be731d92a1d64cf0837979cef81fbb579 /src/blocked.c
parente6b0e8d9ec4561a07864358af8d2d4e81ac413fc (diff)
downloadredis-438125b47c3b2b94e67c80bf64a7d8b6207b236a.tar.gz
Implements [B]Z[REV]POP and the respective unit tests
An implementation of the [Ze POP Redis Module](https://github.com/itamarhaber/zpop) as core Redis commands. Fixes #1861.
Diffstat (limited to 'src/blocked.c')
-rw-r--r--src/blocked.c70
1 files changed, 56 insertions, 14 deletions
diff --git a/src/blocked.c b/src/blocked.c
index 1d59ee16a..61fd9fa87 100644
--- a/src/blocked.c
+++ b/src/blocked.c
@@ -135,7 +135,9 @@ void processUnblockedClients(void) {
/* Unblock a client calling the right function depending on the kind
* of operation the client is blocking for. */
void unblockClient(client *c) {
- if (c->btype == BLOCKED_LIST || c->btype == BLOCKED_STREAM) {
+ if (c->btype == BLOCKED_LIST ||
+ c->btype == BLOCKED_ZSET ||
+ c->btype == BLOCKED_STREAM) {
unblockClientWaitingData(c);
} else if (c->btype == BLOCKED_WAIT) {
unblockClientWaitingReplicas(c);
@@ -162,7 +164,9 @@ void unblockClient(client *c) {
* send it a reply of some kind. After this function is called,
* unblockClient() will be called with the same client as argument. */
void replyToBlockedClientTimedOut(client *c) {
- if (c->btype == BLOCKED_LIST || c->btype == BLOCKED_STREAM) {
+ if (c->btype == BLOCKED_LIST ||
+ c->btype == BLOCKED_ZSET ||
+ c->btype == BLOCKED_STREAM) {
addReply(c,shared.nullmultibulk);
} else if (c->btype == BLOCKED_WAIT) {
addReplyLongLong(c,replicationCountAcksByOffset(c->bpop.reploffset));
@@ -244,7 +248,7 @@ void handleClientsBlockedOnKeys(void) {
client *receiver = clientnode->value;
if (receiver->btype != BLOCKED_LIST) {
- /* Put on the tail, so that at the next call
+ /* Put at the tail, so that at the next call
* we'll not run into it again. */
listDelNode(clients,clientnode);
listAddNodeTail(clients,receiver);
@@ -289,6 +293,43 @@ void handleClientsBlockedOnKeys(void) {
* when an element was pushed on the list. */
}
+ /* Serve clients blocked on sorted set key. */
+ else if (o != NULL && o->type == OBJ_ZSET) {
+ dictEntry *de;
+
+ /* We serve clients in the same order they blocked for
+ * this key, from the first blocked to the last. */
+ de = dictFind(rl->db->blocking_keys,rl->key);
+ if (de) {
+ list *clients = dictGetVal(de);
+ int numclients = listLength(clients);
+
+ while(numclients--) {
+ listNode *clientnode = listFirst(clients);
+ client *receiver = clientnode->value;
+
+ if (receiver->btype != BLOCKED_ZSET) {
+ /* Put at the tail, so that at the next call
+ * we'll not run into it again. */
+ listDelNode(clients,clientnode);
+ listAddNodeTail(clients,receiver);
+ continue;
+ }
+
+ int reverse = (receiver->lastcmd &&
+ receiver->lastcmd->proc == bzpopCommand) ?
+ 0 : 1;
+ unblockClient(receiver);
+ genericZpopCommand(receiver,&rl->key,1,reverse);
+
+ propagate(reverse ?
+ server.zrevpopCommand : server.zpopCommand,
+ receiver->db->id,receiver->argv,receiver->argc,
+ PROPAGATE_AOF|PROPAGATE_REPL);
+ }
+ }
+ }
+
/* Serve clients blocked on stream key. */
else if (o != NULL && o->type == OBJ_STREAM) {
dictEntry *de = dictFind(rl->db->blocking_keys,rl->key);
@@ -371,8 +412,9 @@ void handleClientsBlockedOnKeys(void) {
}
}
-/* This is how the current blocking lists/streams work, we use BLPOP as
- * example, but the concept is the same for other list ops and XREAD.
+/* This is how the current blocking lists/sorted sets/streams work, we use
+ * BLPOP as example, but the concept is the same for other list ops, sorted
+ * sets and XREAD.
* - If the user calls BLPOP and the key exists and contains a non empty list
* then LPOP is called instead. So BLPOP is semantically the same as LPOP
* if blocking is not required.
@@ -389,14 +431,14 @@ void handleClientsBlockedOnKeys(void) {
* to the number of elements we have in the ready list.
*/
-/* Set a client in blocking mode for the specified key (list or stream), with
- * the specified timeout. The 'type' argument is BLOCKED_LIST or BLOCKED_STREAM
- * depending on the kind of operation we are waiting for an empty key in
- * order to awake the client. The client is blocked for all the 'numkeys'
- * keys as in the 'keys' argument. When we block for stream keys, we also
- * provide an array of streamID structures: clients will be unblocked only
- * when items with an ID greater or equal to the specified one is appended
- * to the stream. */
+/* Set a client in blocking mode for the specified key (list, zset or stream),
+ * with the specified timeout. The 'type' argument is BLOCKED_LIST,
+ * BLOCKED_ZSET or BLOCKED_STREAM depending on the kind of operation we are
+ * waiting for an empty key in order to awake the client. The client is blocked
+ * for all the 'numkeys' keys as in the 'keys' argument. When we block for
+ * stream keys, we also provide an array of streamID structures: clients will
+ * be unblocked only when items with an ID greater or equal to the specified
+ * one is appended to the stream. */
void blockForKeys(client *c, int btype, robj **keys, int numkeys, mstime_t timeout, robj *target, streamID *ids) {
dictEntry *de;
list *l;
@@ -409,7 +451,7 @@ void blockForKeys(client *c, int btype, robj **keys, int numkeys, mstime_t timeo
for (j = 0; j < numkeys; j++) {
/* The value associated with the key name in the bpop.keys dictionary
- * is NULL for lists, or the stream ID for streams. */
+ * is NULL for lists and sorted sets, or the stream ID for streams. */
void *key_data = NULL;
if (btype == BLOCKED_STREAM) {
key_data = zmalloc(sizeof(streamID));