diff options
Diffstat (limited to 'src/blocked.c')
-rw-r--r-- | src/blocked.c | 140 |
1 files changed, 90 insertions, 50 deletions
diff --git a/src/blocked.c b/src/blocked.c index aa298cffb..65b584213 100644 --- a/src/blocked.c +++ b/src/blocked.c @@ -141,12 +141,7 @@ void processUnblockedClients(void) { * the code is conceptually more correct this way. */ if (!(c->flags & CLIENT_BLOCKED)) { /* If we have a queued command, execute it now. */ - if (processPendingCommandsAndResetClient(c) == C_OK) { - /* Now process client if it has more data in it's buffer. */ - if (c->querybuf && sdslen(c->querybuf) > 0) { - if (processInputBuffer(c) == C_ERR) c = NULL; - } - } else { + if (processPendingCommandAndInputBuffer(c) == C_ERR) { c = NULL; } } @@ -204,7 +199,7 @@ void unblockClient(client *c) { * we do not do it immediately after the command returns (when the * client got blocked) in order to be still able to access the argument * vector from module callbacks and updateStatsOnUnblock. */ - if (c->btype != BLOCKED_POSTPONE) { + if (c->btype != BLOCKED_POSTPONE && c->btype != BLOCKED_SHUTDOWN) { freeClientOriginalArgv(c); resetClient(c); } @@ -288,25 +283,24 @@ void disconnectAllBlockedClients(void) { * when there may be clients blocked on a list key, and there may be new * data to fetch (the key is ready). */ void serveClientsBlockedOnListKey(robj *o, readyList *rl) { + /* Optimization: If no clients are in type BLOCKED_LIST, + * we can skip this loop. */ + if (!server.blocked_clients_by_type[BLOCKED_LIST]) return; + /* We serve clients in the same order they blocked for * this key, from the first blocked to the last. */ dictEntry *de = dictFind(rl->db->blocking_keys,rl->key); if (de) { list *clients = dictGetVal(de); - int numclients = listLength(clients); - int deleted = 0; - - while(numclients--) { - listNode *clientnode = listFirst(clients); - client *receiver = clientnode->value; + listNode *ln; + listIter li; + listRewind(clients,&li); - if (receiver->btype != BLOCKED_LIST) { - /* Put at the tail, so that at the next call - * we'll not run into it again. */ - listRotateHeadToTail(clients); - continue; - } + while((ln = listNext(&li))) { + client *receiver = listNodeValue(ln); + if (receiver->btype != BLOCKED_LIST) continue; + int deleted = 0; robj *dstkey = receiver->bpop.target; int wherefrom = receiver->bpop.blockpos.wherefrom; int whereto = receiver->bpop.blockpos.whereto; @@ -342,25 +336,24 @@ void serveClientsBlockedOnListKey(robj *o, readyList *rl) { * when there may be clients blocked on a sorted set key, and there may be new * data to fetch (the key is ready). */ void serveClientsBlockedOnSortedSetKey(robj *o, readyList *rl) { + /* Optimization: If no clients are in type BLOCKED_ZSET, + * we can skip this loop. */ + if (!server.blocked_clients_by_type[BLOCKED_ZSET]) return; + /* We serve clients in the same order they blocked for * this key, from the first blocked to the last. */ dictEntry *de = dictFind(rl->db->blocking_keys,rl->key); if (de) { list *clients = dictGetVal(de); - int numclients = listLength(clients); - int deleted = 0; - - while (numclients--) { - listNode *clientnode = listFirst(clients); - client *receiver = clientnode->value; + listNode *ln; + listIter li; + listRewind(clients,&li); - if (receiver->btype != BLOCKED_ZSET) { - /* Put at the tail, so that at the next call - * we'll not run into it again. */ - listRotateHeadToTail(clients); - continue; - } + while((ln = listNext(&li))) { + client *receiver = listNodeValue(ln); + if (receiver->btype != BLOCKED_ZSET) continue; + int deleted = 0; long llen = zsetLength(o); long count = receiver->bpop.count; int where = receiver->bpop.blockpos.wherefrom; @@ -407,6 +400,10 @@ void serveClientsBlockedOnSortedSetKey(robj *o, readyList *rl) { * when there may be clients blocked on a stream key, and there may be new * data to fetch (the key is ready). */ void serveClientsBlockedOnStreamKey(robj *o, readyList *rl) { + /* Optimization: If no clients are in type BLOCKED_STREAM, + * we can skip this loop. */ + if (!server.blocked_clients_by_type[BLOCKED_STREAM]) return; + dictEntry *de = dictFind(rl->db->blocking_keys,rl->key); stream *s = o->ptr; @@ -520,30 +517,21 @@ unblock_receiver: * see if the key is really able to serve the client, and in that case, * unblock it. */ void serveClientsBlockedOnKeyByModule(readyList *rl) { - dictEntry *de; - /* Optimization: If no clients are in type BLOCKED_MODULE, * we can skip this loop. */ if (!server.blocked_clients_by_type[BLOCKED_MODULE]) return; /* We serve clients in the same order they blocked for * this key, from the first blocked to the last. */ - de = dictFind(rl->db->blocking_keys,rl->key); + dictEntry *de = dictFind(rl->db->blocking_keys,rl->key); if (de) { list *clients = dictGetVal(de); - int numclients = listLength(clients); - - while(numclients--) { - listNode *clientnode = listFirst(clients); - client *receiver = clientnode->value; - - /* Put at the tail, so that at the next call - * we'll not run into it again: clients here may not be - * ready to be served, so they'll remain in the list - * sometimes. We want also be able to skip clients that are - * not blocked for the MODULE type safely. */ - listRotateHeadToTail(clients); + listNode *ln; + listIter li; + listRewind(clients,&li); + while((ln = listNext(&li))) { + client *receiver = listNodeValue(ln); if (receiver->btype != BLOCKED_MODULE) continue; /* Note that if *this* client cannot be served by this key, @@ -566,6 +554,49 @@ void serveClientsBlockedOnKeyByModule(readyList *rl) { } } +/* Helper function for handleClientsBlockedOnKeys(). This function is called + * when there may be clients blocked, via XREADGROUP, on an existing stream which + * was deleted. We need to unblock the clients in that case. + * The idea is that a client that is blocked via XREADGROUP is different from + * any other blocking type in the sense that it depends on the existence of both + * the key and the group. Even if the key is deleted and then revived with XADD + * it won't help any clients blocked on XREADGROUP because the group no longer + * exist, so they would fail with -NOGROUP anyway. + * The conclusion is that it's better to unblock these client (with error) upon + * the deletion of the key, rather than waiting for the first XADD. */ +void unblockDeletedStreamReadgroupClients(readyList *rl) { + /* Optimization: If no clients are in type BLOCKED_STREAM, + * we can skip this loop. */ + if (!server.blocked_clients_by_type[BLOCKED_STREAM]) return; + + /* We serve clients in the same order they blocked for + * this key, from the first blocked to the last. */ + dictEntry *de = dictFind(rl->db->blocking_keys,rl->key); + if (de) { + list *clients = dictGetVal(de); + listNode *ln; + listIter li; + listRewind(clients,&li); + + while((ln = listNext(&li))) { + client *receiver = listNodeValue(ln); + if (receiver->btype != BLOCKED_STREAM || !receiver->bpop.xread_group) + continue; + + long long prev_error_replies = server.stat_total_error_replies; + client *old_client = server.current_client; + server.current_client = receiver; + monotime replyTimer; + elapsedStart(&replyTimer); + addReplyError(receiver, "-UNBLOCKED the stream key no longer exists"); + updateStatsOnUnblock(receiver, 0, elapsedUs(replyTimer), server.stat_total_error_replies != prev_error_replies); + unblockClient(receiver); + afterCommand(receiver); + server.current_client = old_client; + } + } +} + /* This function should be called by Redis every time a single command, * a MULTI/EXEC block, or a Lua script, terminated its execution after * being called by a client. It handles serving clients blocked in @@ -624,17 +655,27 @@ void handleClientsBlockedOnKeys(void) { /* Serve clients blocked on the key. */ robj *o = lookupKeyReadWithFlags(rl->db, rl->key, LOOKUP_NONOTIFY | LOOKUP_NOSTATS); if (o != NULL) { - if (o->type == OBJ_LIST) + int objtype = o->type; + if (objtype == OBJ_LIST) serveClientsBlockedOnListKey(o,rl); - else if (o->type == OBJ_ZSET) + else if (objtype == OBJ_ZSET) serveClientsBlockedOnSortedSetKey(o,rl); - else if (o->type == OBJ_STREAM) + else if (objtype == OBJ_STREAM) serveClientsBlockedOnStreamKey(o,rl); /* We want to serve clients blocked on module keys * regardless of the object type: we don't know what the * module is trying to accomplish right now. */ serveClientsBlockedOnKeyByModule(rl); + /* If we have XREADGROUP clients blocked on this key, and + * the key is not a stream, it must mean that the key was + * overwritten by either SET or something like + * (MULTI, DEL key, SADD key e, EXEC). + * In this case we need to unblock all these clients. */ + if (objtype != OBJ_STREAM) + unblockDeletedStreamReadgroupClients(rl); } else { + /* Unblock all XREADGROUP clients of this deleted key */ + unblockDeletedStreamReadgroupClients(rl); /* Edge case: If lookupKeyReadWithFlags decides to expire the key we have to * take care of the propagation here, because afterCommand wasn't called */ if (server.also_propagate.numops > 0) @@ -823,4 +864,3 @@ void signalKeyAsReady(redisDb *db, robj *key, int type) { incrRefCount(key); serverAssert(dictAdd(db->ready_keys,key,NULL) == DICT_OK); } - |