diff options
Diffstat (limited to 'src/blocked.c')
-rw-r--r-- | src/blocked.c | 292 |
1 files changed, 288 insertions, 4 deletions
diff --git a/src/blocked.c b/src/blocked.c index 54b26b713..f438c3353 100644 --- a/src/blocked.c +++ b/src/blocked.c @@ -65,6 +65,8 @@ #include "server.h" +int serveClientBlockedOnList(client *receiver, robj *key, robj *dstkey, redisDb *db, robj *value, int where); + /* Get a timeout value from an object and store it into 'timeout'. * The final timeout is always stored as milliseconds as a time where the * timeout will expire, however the parsing is performed according to @@ -100,7 +102,8 @@ int getTimeoutFromObjectOrReply(client *c, robj *object, mstime_t *timeout, int void blockClient(client *c, int btype) { c->flags |= CLIENT_BLOCKED; c->btype = btype; - server.bpop_blocked_clients++; + server.blocked_clients++; + server.blocked_clients_by_type[btype]++; } /* This function is called in the beforeSleep() function of the event loop @@ -132,7 +135,7 @@ void processUnblockedClients(void) { /* Unblock a client calling the right function depending on the kind * of operation the client is blocking for. */ void unblockClient(client *c) { - if (c->btype == BLOCKED_LIST) { + if (c->btype == BLOCKED_LIST || c->btype == BLOCKED_STREAM) { unblockClientWaitingData(c); } else if (c->btype == BLOCKED_WAIT) { unblockClientWaitingReplicas(c); @@ -143,9 +146,10 @@ void unblockClient(client *c) { } /* Clear the flags, and put the client in the unblocked list so that * we'll process new commands in its query buffer ASAP. */ + server.blocked_clients--; + server.blocked_clients_by_type[c->btype]--; c->flags &= ~CLIENT_BLOCKED; c->btype = BLOCKED_NONE; - server.bpop_blocked_clients--; /* The client may already be into the unblocked list because of a previous * blocking operation, don't add back it into the list multiple times. */ if (!(c->flags & CLIENT_UNBLOCKED)) { @@ -158,7 +162,7 @@ void unblockClient(client *c) { * send it a reply of some kind. After this function is called, * unblockClient() will be called with the same client as argument. */ void replyToBlockedClientTimedOut(client *c) { - if (c->btype == BLOCKED_LIST) { + if (c->btype == BLOCKED_LIST || c->btype == BLOCKED_STREAM) { addReply(c,shared.nullmultibulk); } else if (c->btype == BLOCKED_WAIT) { addReplyLongLong(c,replicationCountAcksByOffset(c->bpop.reploffset)); @@ -193,3 +197,283 @@ void disconnectAllBlockedClients(void) { } } } + +/* This function should be called by Redis every time a single command, + * a MULTI/EXEC block, or a Lua script, terminated its execution after + * being called by a client. + * + * All the keys with at least one client blocked that received at least + * one new element via some PUSH/XADD operation are accumulated into + * the server.ready_keys list. This function will run the list and will + * serve clients accordingly. Note that the function will iterate again and + * again as a result of serving BRPOPLPUSH we can have new blocking clients + * to serve because of the PUSH side of BRPOPLPUSH. */ +void handleClientsBlockedOnKeys(void) { + while(listLength(server.ready_keys) != 0) { + list *l; + + /* Point server.ready_keys to a fresh list and save the current one + * locally. This way as we run the old list we are free to call + * signalKeyAsReady() that may push new elements in server.ready_keys + * when handling clients blocked into BRPOPLPUSH. */ + l = server.ready_keys; + server.ready_keys = listCreate(); + + while(listLength(l) != 0) { + listNode *ln = listFirst(l); + readyList *rl = ln->value; + + /* First of all remove this key from db->ready_keys so that + * we can safely call signalKeyAsReady() against this key. */ + dictDelete(rl->db->ready_keys,rl->key); + + /* Serve clients blocked on list key. */ + robj *o = lookupKeyWrite(rl->db,rl->key); + if (o != NULL && o->type == OBJ_LIST) { + dictEntry *de; + + /* We serve clients in the same order they blocked for + * this key, from the first blocked to the last. */ + de = dictFind(rl->db->blocking_keys,rl->key); + if (de) { + list *clients = dictGetVal(de); + int numclients = listLength(clients); + + while(numclients--) { + listNode *clientnode = listFirst(clients); + client *receiver = clientnode->value; + + if (receiver->btype != BLOCKED_LIST) { + /* Put on the tail, so that at the next call + * we'll not run into it again. */ + listDelNode(clients,clientnode); + listAddNodeTail(clients,receiver); + continue; + } + + robj *dstkey = receiver->bpop.target; + int where = (receiver->lastcmd && + receiver->lastcmd->proc == blpopCommand) ? + LIST_HEAD : LIST_TAIL; + robj *value = listTypePop(o,where); + + if (value) { + /* Protect receiver->bpop.target, that will be + * freed by the next unblockClient() + * call. */ + if (dstkey) incrRefCount(dstkey); + unblockClient(receiver); + + if (serveClientBlockedOnList(receiver, + rl->key,dstkey,rl->db,value, + where) == C_ERR) + { + /* If we failed serving the client we need + * to also undo the POP operation. */ + listTypePush(o,value,where); + } + + if (dstkey) decrRefCount(dstkey); + decrRefCount(value); + } else { + break; + } + } + } + + if (listTypeLength(o) == 0) { + dbDelete(rl->db,rl->key); + } + /* We don't call signalModifiedKey() as it was already called + * when an element was pushed on the list. */ + } + + /* Serve clients blocked on stream key. */ + else if (o != NULL && o->type == OBJ_STREAM) { + dictEntry *de = dictFind(rl->db->blocking_keys,rl->key); + stream *s = o->ptr; + + /* We need to provide the new data arrived on the stream + * to all the clients that are waiting for an offset smaller + * than the current top item. */ + if (de) { + list *clients = dictGetVal(de); + listNode *ln; + listIter li; + listRewind(clients,&li); + + while((ln = listNext(&li))) { + client *receiver = listNodeValue(ln); + if (receiver->btype != BLOCKED_STREAM) continue; + streamID *gt = dictFetchValue(receiver->bpop.keys, + rl->key); + if (s->last_id.ms > gt->ms || + (s->last_id.ms == gt->ms && + s->last_id.seq > gt->seq)) + { + streamID start = *gt; + start.seq++; /* Can't overflow, it's an uint64_t */ + /* Note that after we unblock the client, 'gt' + * is no longer valid, so we must do it after + * we copied the ID into the 'start' variable. */ + unblockClient(receiver); + + /* Emit the two elements sub-array consisting of + * the name of the stream and the data we + * extracted from it. Wrapped in a single-item + * array, since we have just one key. */ + addReplyMultiBulkLen(receiver,1); + addReplyMultiBulkLen(receiver,2); + addReplyBulk(receiver,rl->key); + streamReplyWithRange(receiver,s,&start,NULL, + receiver->bpop.xread_count,0); + } + } + } + } + + /* Free this item. */ + decrRefCount(rl->key); + zfree(rl); + listDelNode(l,ln); + } + listRelease(l); /* We have the new list on place at this point. */ + } +} + +/* This is how the current blocking lists/streams work, we use BLPOP as + * example, but the concept is the same for other list ops and XREAD. + * - If the user calls BLPOP and the key exists and contains a non empty list + * then LPOP is called instead. So BLPOP is semantically the same as LPOP + * if blocking is not required. + * - If instead BLPOP is called and the key does not exists or the list is + * empty we need to block. In order to do so we remove the notification for + * new data to read in the client socket (so that we'll not serve new + * requests if the blocking request is not served). Also we put the client + * in a dictionary (db->blocking_keys) mapping keys to a list of clients + * blocking for this keys. + * - If a PUSH operation against a key with blocked clients waiting is + * performed, we mark this key as "ready", and after the current command, + * MULTI/EXEC block, or script, is executed, we serve all the clients waiting + * for this list, from the one that blocked first, to the last, accordingly + * to the number of elements we have in the ready list. + */ + +/* Set a client in blocking mode for the specified key (list or stream), with + * the specified timeout. The 'type' argument is BLOCKED_LIST or BLOCKED_STREAM + * depending on the kind of operation we are waiting for an empty key in + * order to awake the client. The client is blocked for all the 'numkeys' + * keys as in the 'keys' argument. When we block for stream keys, we also + * provide an array of streamID structures: clients will be unblocked only + * when items with an ID greater or equal to the specified one is appended + * to the stream. */ +void blockForKeys(client *c, int btype, robj **keys, int numkeys, mstime_t timeout, robj *target, streamID *ids) { + dictEntry *de; + list *l; + int j; + + c->bpop.timeout = timeout; + c->bpop.target = target; + + if (target != NULL) incrRefCount(target); + + for (j = 0; j < numkeys; j++) { + /* The value associated with the key name in the bpop.keys dictionary + * is NULL for lists, or the stream ID for streams. */ + void *key_data = NULL; + if (btype == BLOCKED_STREAM) { + key_data = zmalloc(sizeof(streamID)); + memcpy(key_data,ids+j,sizeof(streamID)); + } + + /* If the key already exists in the dictionary ignore it. */ + if (dictAdd(c->bpop.keys,keys[j],key_data) != DICT_OK) { + zfree(key_data); + continue; + } + incrRefCount(keys[j]); + + /* And in the other "side", to map keys -> clients */ + de = dictFind(c->db->blocking_keys,keys[j]); + if (de == NULL) { + int retval; + + /* For every key we take a list of clients blocked for it */ + l = listCreate(); + retval = dictAdd(c->db->blocking_keys,keys[j],l); + incrRefCount(keys[j]); + serverAssertWithInfo(c,keys[j],retval == DICT_OK); + } else { + l = dictGetVal(de); + } + listAddNodeTail(l,c); + } + blockClient(c,btype); +} + +/* Unblock a client that's waiting in a blocking operation such as BLPOP. + * You should never call this function directly, but unblockClient() instead. */ +void unblockClientWaitingData(client *c) { + dictEntry *de; + dictIterator *di; + list *l; + + serverAssertWithInfo(c,NULL,dictSize(c->bpop.keys) != 0); + di = dictGetIterator(c->bpop.keys); + /* The client may wait for multiple keys, so unblock it for every key. */ + while((de = dictNext(di)) != NULL) { + robj *key = dictGetKey(de); + + /* Remove this client from the list of clients waiting for this key. */ + l = dictFetchValue(c->db->blocking_keys,key); + serverAssertWithInfo(c,key,l != NULL); + listDelNode(l,listSearchKey(l,c)); + /* If the list is empty we need to remove it to avoid wasting memory */ + if (listLength(l) == 0) + dictDelete(c->db->blocking_keys,key); + } + dictReleaseIterator(di); + + /* Cleanup the client structure */ + dictEmpty(c->bpop.keys,NULL); + if (c->bpop.target) { + decrRefCount(c->bpop.target); + c->bpop.target = NULL; + } + if (c->bpop.xread_group) { + decrRefCount(c->bpop.xread_group); + c->bpop.xread_group = NULL; + } +} + +/* If the specified key has clients blocked waiting for list pushes, this + * function will put the key reference into the server.ready_keys list. + * Note that db->ready_keys is a hash table that allows us to avoid putting + * the same key again and again in the list in case of multiple pushes + * made by a script or in the context of MULTI/EXEC. + * + * The list will be finally processed by handleClientsBlockedOnLists() */ +void signalKeyAsReady(redisDb *db, robj *key) { + readyList *rl; + + /* No clients blocking for this key? No need to queue it. */ + if (dictFind(db->blocking_keys,key) == NULL) return; + + /* Key was already signaled? No need to queue it again. */ + if (dictFind(db->ready_keys,key) != NULL) return; + + /* Ok, we need to queue this key into server.ready_keys. */ + rl = zmalloc(sizeof(*rl)); + rl->key = key; + rl->db = db; + incrRefCount(key); + listAddNodeTail(server.ready_keys,rl); + + /* We also add the key in the db->ready_keys dictionary in order + * to avoid adding it multiple times into a list with a simple O(1) + * check. */ + incrRefCount(key); + serverAssert(dictAdd(db->ready_keys,key,NULL) == DICT_OK); +} + + |