summaryrefslogtreecommitdiff
path: root/src/blocked.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/blocked.c')
-rw-r--r--src/blocked.c292
1 files changed, 288 insertions, 4 deletions
diff --git a/src/blocked.c b/src/blocked.c
index 54b26b713..f438c3353 100644
--- a/src/blocked.c
+++ b/src/blocked.c
@@ -65,6 +65,8 @@
#include "server.h"
+int serveClientBlockedOnList(client *receiver, robj *key, robj *dstkey, redisDb *db, robj *value, int where);
+
/* Get a timeout value from an object and store it into 'timeout'.
* The final timeout is always stored as milliseconds as a time where the
* timeout will expire, however the parsing is performed according to
@@ -100,7 +102,8 @@ int getTimeoutFromObjectOrReply(client *c, robj *object, mstime_t *timeout, int
void blockClient(client *c, int btype) {
c->flags |= CLIENT_BLOCKED;
c->btype = btype;
- server.bpop_blocked_clients++;
+ server.blocked_clients++;
+ server.blocked_clients_by_type[btype]++;
}
/* This function is called in the beforeSleep() function of the event loop
@@ -132,7 +135,7 @@ void processUnblockedClients(void) {
/* Unblock a client calling the right function depending on the kind
* of operation the client is blocking for. */
void unblockClient(client *c) {
- if (c->btype == BLOCKED_LIST) {
+ if (c->btype == BLOCKED_LIST || c->btype == BLOCKED_STREAM) {
unblockClientWaitingData(c);
} else if (c->btype == BLOCKED_WAIT) {
unblockClientWaitingReplicas(c);
@@ -143,9 +146,10 @@ void unblockClient(client *c) {
}
/* Clear the flags, and put the client in the unblocked list so that
* we'll process new commands in its query buffer ASAP. */
+ server.blocked_clients--;
+ server.blocked_clients_by_type[c->btype]--;
c->flags &= ~CLIENT_BLOCKED;
c->btype = BLOCKED_NONE;
- server.bpop_blocked_clients--;
/* The client may already be into the unblocked list because of a previous
* blocking operation, don't add back it into the list multiple times. */
if (!(c->flags & CLIENT_UNBLOCKED)) {
@@ -158,7 +162,7 @@ void unblockClient(client *c) {
* send it a reply of some kind. After this function is called,
* unblockClient() will be called with the same client as argument. */
void replyToBlockedClientTimedOut(client *c) {
- if (c->btype == BLOCKED_LIST) {
+ if (c->btype == BLOCKED_LIST || c->btype == BLOCKED_STREAM) {
addReply(c,shared.nullmultibulk);
} else if (c->btype == BLOCKED_WAIT) {
addReplyLongLong(c,replicationCountAcksByOffset(c->bpop.reploffset));
@@ -193,3 +197,283 @@ void disconnectAllBlockedClients(void) {
}
}
}
+
+/* This function should be called by Redis every time a single command,
+ * a MULTI/EXEC block, or a Lua script, terminated its execution after
+ * being called by a client.
+ *
+ * All the keys with at least one client blocked that received at least
+ * one new element via some PUSH/XADD operation are accumulated into
+ * the server.ready_keys list. This function will run the list and will
+ * serve clients accordingly. Note that the function will iterate again and
+ * again as a result of serving BRPOPLPUSH we can have new blocking clients
+ * to serve because of the PUSH side of BRPOPLPUSH. */
+void handleClientsBlockedOnKeys(void) {
+ while(listLength(server.ready_keys) != 0) {
+ list *l;
+
+ /* Point server.ready_keys to a fresh list and save the current one
+ * locally. This way as we run the old list we are free to call
+ * signalKeyAsReady() that may push new elements in server.ready_keys
+ * when handling clients blocked into BRPOPLPUSH. */
+ l = server.ready_keys;
+ server.ready_keys = listCreate();
+
+ while(listLength(l) != 0) {
+ listNode *ln = listFirst(l);
+ readyList *rl = ln->value;
+
+ /* First of all remove this key from db->ready_keys so that
+ * we can safely call signalKeyAsReady() against this key. */
+ dictDelete(rl->db->ready_keys,rl->key);
+
+ /* Serve clients blocked on list key. */
+ robj *o = lookupKeyWrite(rl->db,rl->key);
+ if (o != NULL && o->type == OBJ_LIST) {
+ dictEntry *de;
+
+ /* We serve clients in the same order they blocked for
+ * this key, from the first blocked to the last. */
+ de = dictFind(rl->db->blocking_keys,rl->key);
+ if (de) {
+ list *clients = dictGetVal(de);
+ int numclients = listLength(clients);
+
+ while(numclients--) {
+ listNode *clientnode = listFirst(clients);
+ client *receiver = clientnode->value;
+
+ if (receiver->btype != BLOCKED_LIST) {
+ /* Put on the tail, so that at the next call
+ * we'll not run into it again. */
+ listDelNode(clients,clientnode);
+ listAddNodeTail(clients,receiver);
+ continue;
+ }
+
+ robj *dstkey = receiver->bpop.target;
+ int where = (receiver->lastcmd &&
+ receiver->lastcmd->proc == blpopCommand) ?
+ LIST_HEAD : LIST_TAIL;
+ robj *value = listTypePop(o,where);
+
+ if (value) {
+ /* Protect receiver->bpop.target, that will be
+ * freed by the next unblockClient()
+ * call. */
+ if (dstkey) incrRefCount(dstkey);
+ unblockClient(receiver);
+
+ if (serveClientBlockedOnList(receiver,
+ rl->key,dstkey,rl->db,value,
+ where) == C_ERR)
+ {
+ /* If we failed serving the client we need
+ * to also undo the POP operation. */
+ listTypePush(o,value,where);
+ }
+
+ if (dstkey) decrRefCount(dstkey);
+ decrRefCount(value);
+ } else {
+ break;
+ }
+ }
+ }
+
+ if (listTypeLength(o) == 0) {
+ dbDelete(rl->db,rl->key);
+ }
+ /* We don't call signalModifiedKey() as it was already called
+ * when an element was pushed on the list. */
+ }
+
+ /* Serve clients blocked on stream key. */
+ else if (o != NULL && o->type == OBJ_STREAM) {
+ dictEntry *de = dictFind(rl->db->blocking_keys,rl->key);
+ stream *s = o->ptr;
+
+ /* We need to provide the new data arrived on the stream
+ * to all the clients that are waiting for an offset smaller
+ * than the current top item. */
+ if (de) {
+ list *clients = dictGetVal(de);
+ listNode *ln;
+ listIter li;
+ listRewind(clients,&li);
+
+ while((ln = listNext(&li))) {
+ client *receiver = listNodeValue(ln);
+ if (receiver->btype != BLOCKED_STREAM) continue;
+ streamID *gt = dictFetchValue(receiver->bpop.keys,
+ rl->key);
+ if (s->last_id.ms > gt->ms ||
+ (s->last_id.ms == gt->ms &&
+ s->last_id.seq > gt->seq))
+ {
+ streamID start = *gt;
+ start.seq++; /* Can't overflow, it's an uint64_t */
+ /* Note that after we unblock the client, 'gt'
+ * is no longer valid, so we must do it after
+ * we copied the ID into the 'start' variable. */
+ unblockClient(receiver);
+
+ /* Emit the two elements sub-array consisting of
+ * the name of the stream and the data we
+ * extracted from it. Wrapped in a single-item
+ * array, since we have just one key. */
+ addReplyMultiBulkLen(receiver,1);
+ addReplyMultiBulkLen(receiver,2);
+ addReplyBulk(receiver,rl->key);
+ streamReplyWithRange(receiver,s,&start,NULL,
+ receiver->bpop.xread_count,0);
+ }
+ }
+ }
+ }
+
+ /* Free this item. */
+ decrRefCount(rl->key);
+ zfree(rl);
+ listDelNode(l,ln);
+ }
+ listRelease(l); /* We have the new list on place at this point. */
+ }
+}
+
+/* This is how the current blocking lists/streams work, we use BLPOP as
+ * example, but the concept is the same for other list ops and XREAD.
+ * - If the user calls BLPOP and the key exists and contains a non empty list
+ * then LPOP is called instead. So BLPOP is semantically the same as LPOP
+ * if blocking is not required.
+ * - If instead BLPOP is called and the key does not exists or the list is
+ * empty we need to block. In order to do so we remove the notification for
+ * new data to read in the client socket (so that we'll not serve new
+ * requests if the blocking request is not served). Also we put the client
+ * in a dictionary (db->blocking_keys) mapping keys to a list of clients
+ * blocking for this keys.
+ * - If a PUSH operation against a key with blocked clients waiting is
+ * performed, we mark this key as "ready", and after the current command,
+ * MULTI/EXEC block, or script, is executed, we serve all the clients waiting
+ * for this list, from the one that blocked first, to the last, accordingly
+ * to the number of elements we have in the ready list.
+ */
+
+/* Set a client in blocking mode for the specified key (list or stream), with
+ * the specified timeout. The 'type' argument is BLOCKED_LIST or BLOCKED_STREAM
+ * depending on the kind of operation we are waiting for an empty key in
+ * order to awake the client. The client is blocked for all the 'numkeys'
+ * keys as in the 'keys' argument. When we block for stream keys, we also
+ * provide an array of streamID structures: clients will be unblocked only
+ * when items with an ID greater or equal to the specified one is appended
+ * to the stream. */
+void blockForKeys(client *c, int btype, robj **keys, int numkeys, mstime_t timeout, robj *target, streamID *ids) {
+ dictEntry *de;
+ list *l;
+ int j;
+
+ c->bpop.timeout = timeout;
+ c->bpop.target = target;
+
+ if (target != NULL) incrRefCount(target);
+
+ for (j = 0; j < numkeys; j++) {
+ /* The value associated with the key name in the bpop.keys dictionary
+ * is NULL for lists, or the stream ID for streams. */
+ void *key_data = NULL;
+ if (btype == BLOCKED_STREAM) {
+ key_data = zmalloc(sizeof(streamID));
+ memcpy(key_data,ids+j,sizeof(streamID));
+ }
+
+ /* If the key already exists in the dictionary ignore it. */
+ if (dictAdd(c->bpop.keys,keys[j],key_data) != DICT_OK) {
+ zfree(key_data);
+ continue;
+ }
+ incrRefCount(keys[j]);
+
+ /* And in the other "side", to map keys -> clients */
+ de = dictFind(c->db->blocking_keys,keys[j]);
+ if (de == NULL) {
+ int retval;
+
+ /* For every key we take a list of clients blocked for it */
+ l = listCreate();
+ retval = dictAdd(c->db->blocking_keys,keys[j],l);
+ incrRefCount(keys[j]);
+ serverAssertWithInfo(c,keys[j],retval == DICT_OK);
+ } else {
+ l = dictGetVal(de);
+ }
+ listAddNodeTail(l,c);
+ }
+ blockClient(c,btype);
+}
+
+/* Unblock a client that's waiting in a blocking operation such as BLPOP.
+ * You should never call this function directly, but unblockClient() instead. */
+void unblockClientWaitingData(client *c) {
+ dictEntry *de;
+ dictIterator *di;
+ list *l;
+
+ serverAssertWithInfo(c,NULL,dictSize(c->bpop.keys) != 0);
+ di = dictGetIterator(c->bpop.keys);
+ /* The client may wait for multiple keys, so unblock it for every key. */
+ while((de = dictNext(di)) != NULL) {
+ robj *key = dictGetKey(de);
+
+ /* Remove this client from the list of clients waiting for this key. */
+ l = dictFetchValue(c->db->blocking_keys,key);
+ serverAssertWithInfo(c,key,l != NULL);
+ listDelNode(l,listSearchKey(l,c));
+ /* If the list is empty we need to remove it to avoid wasting memory */
+ if (listLength(l) == 0)
+ dictDelete(c->db->blocking_keys,key);
+ }
+ dictReleaseIterator(di);
+
+ /* Cleanup the client structure */
+ dictEmpty(c->bpop.keys,NULL);
+ if (c->bpop.target) {
+ decrRefCount(c->bpop.target);
+ c->bpop.target = NULL;
+ }
+ if (c->bpop.xread_group) {
+ decrRefCount(c->bpop.xread_group);
+ c->bpop.xread_group = NULL;
+ }
+}
+
+/* If the specified key has clients blocked waiting for list pushes, this
+ * function will put the key reference into the server.ready_keys list.
+ * Note that db->ready_keys is a hash table that allows us to avoid putting
+ * the same key again and again in the list in case of multiple pushes
+ * made by a script or in the context of MULTI/EXEC.
+ *
+ * The list will be finally processed by handleClientsBlockedOnLists() */
+void signalKeyAsReady(redisDb *db, robj *key) {
+ readyList *rl;
+
+ /* No clients blocking for this key? No need to queue it. */
+ if (dictFind(db->blocking_keys,key) == NULL) return;
+
+ /* Key was already signaled? No need to queue it again. */
+ if (dictFind(db->ready_keys,key) != NULL) return;
+
+ /* Ok, we need to queue this key into server.ready_keys. */
+ rl = zmalloc(sizeof(*rl));
+ rl->key = key;
+ rl->db = db;
+ incrRefCount(key);
+ listAddNodeTail(server.ready_keys,rl);
+
+ /* We also add the key in the db->ready_keys dictionary in order
+ * to avoid adding it multiple times into a list with a simple O(1)
+ * check. */
+ incrRefCount(key);
+ serverAssert(dictAdd(db->ready_keys,key,NULL) == DICT_OK);
+}
+
+