summaryrefslogtreecommitdiff
path: root/src/blocked.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/blocked.c')
-rw-r--r--src/blocked.c140
1 files changed, 90 insertions, 50 deletions
diff --git a/src/blocked.c b/src/blocked.c
index aa298cffb..65b584213 100644
--- a/src/blocked.c
+++ b/src/blocked.c
@@ -141,12 +141,7 @@ void processUnblockedClients(void) {
* the code is conceptually more correct this way. */
if (!(c->flags & CLIENT_BLOCKED)) {
/* If we have a queued command, execute it now. */
- if (processPendingCommandsAndResetClient(c) == C_OK) {
- /* Now process client if it has more data in it's buffer. */
- if (c->querybuf && sdslen(c->querybuf) > 0) {
- if (processInputBuffer(c) == C_ERR) c = NULL;
- }
- } else {
+ if (processPendingCommandAndInputBuffer(c) == C_ERR) {
c = NULL;
}
}
@@ -204,7 +199,7 @@ void unblockClient(client *c) {
* we do not do it immediately after the command returns (when the
* client got blocked) in order to be still able to access the argument
* vector from module callbacks and updateStatsOnUnblock. */
- if (c->btype != BLOCKED_POSTPONE) {
+ if (c->btype != BLOCKED_POSTPONE && c->btype != BLOCKED_SHUTDOWN) {
freeClientOriginalArgv(c);
resetClient(c);
}
@@ -288,25 +283,24 @@ void disconnectAllBlockedClients(void) {
* when there may be clients blocked on a list key, and there may be new
* data to fetch (the key is ready). */
void serveClientsBlockedOnListKey(robj *o, readyList *rl) {
+ /* Optimization: If no clients are in type BLOCKED_LIST,
+ * we can skip this loop. */
+ if (!server.blocked_clients_by_type[BLOCKED_LIST]) return;
+
/* We serve clients in the same order they blocked for
* this key, from the first blocked to the last. */
dictEntry *de = dictFind(rl->db->blocking_keys,rl->key);
if (de) {
list *clients = dictGetVal(de);
- int numclients = listLength(clients);
- int deleted = 0;
-
- while(numclients--) {
- listNode *clientnode = listFirst(clients);
- client *receiver = clientnode->value;
+ listNode *ln;
+ listIter li;
+ listRewind(clients,&li);
- if (receiver->btype != BLOCKED_LIST) {
- /* Put at the tail, so that at the next call
- * we'll not run into it again. */
- listRotateHeadToTail(clients);
- continue;
- }
+ while((ln = listNext(&li))) {
+ client *receiver = listNodeValue(ln);
+ if (receiver->btype != BLOCKED_LIST) continue;
+ int deleted = 0;
robj *dstkey = receiver->bpop.target;
int wherefrom = receiver->bpop.blockpos.wherefrom;
int whereto = receiver->bpop.blockpos.whereto;
@@ -342,25 +336,24 @@ void serveClientsBlockedOnListKey(robj *o, readyList *rl) {
* when there may be clients blocked on a sorted set key, and there may be new
* data to fetch (the key is ready). */
void serveClientsBlockedOnSortedSetKey(robj *o, readyList *rl) {
+ /* Optimization: If no clients are in type BLOCKED_ZSET,
+ * we can skip this loop. */
+ if (!server.blocked_clients_by_type[BLOCKED_ZSET]) return;
+
/* We serve clients in the same order they blocked for
* this key, from the first blocked to the last. */
dictEntry *de = dictFind(rl->db->blocking_keys,rl->key);
if (de) {
list *clients = dictGetVal(de);
- int numclients = listLength(clients);
- int deleted = 0;
-
- while (numclients--) {
- listNode *clientnode = listFirst(clients);
- client *receiver = clientnode->value;
+ listNode *ln;
+ listIter li;
+ listRewind(clients,&li);
- if (receiver->btype != BLOCKED_ZSET) {
- /* Put at the tail, so that at the next call
- * we'll not run into it again. */
- listRotateHeadToTail(clients);
- continue;
- }
+ while((ln = listNext(&li))) {
+ client *receiver = listNodeValue(ln);
+ if (receiver->btype != BLOCKED_ZSET) continue;
+ int deleted = 0;
long llen = zsetLength(o);
long count = receiver->bpop.count;
int where = receiver->bpop.blockpos.wherefrom;
@@ -407,6 +400,10 @@ void serveClientsBlockedOnSortedSetKey(robj *o, readyList *rl) {
* when there may be clients blocked on a stream key, and there may be new
* data to fetch (the key is ready). */
void serveClientsBlockedOnStreamKey(robj *o, readyList *rl) {
+ /* Optimization: If no clients are in type BLOCKED_STREAM,
+ * we can skip this loop. */
+ if (!server.blocked_clients_by_type[BLOCKED_STREAM]) return;
+
dictEntry *de = dictFind(rl->db->blocking_keys,rl->key);
stream *s = o->ptr;
@@ -520,30 +517,21 @@ unblock_receiver:
* see if the key is really able to serve the client, and in that case,
* unblock it. */
void serveClientsBlockedOnKeyByModule(readyList *rl) {
- dictEntry *de;
-
/* Optimization: If no clients are in type BLOCKED_MODULE,
* we can skip this loop. */
if (!server.blocked_clients_by_type[BLOCKED_MODULE]) return;
/* We serve clients in the same order they blocked for
* this key, from the first blocked to the last. */
- de = dictFind(rl->db->blocking_keys,rl->key);
+ dictEntry *de = dictFind(rl->db->blocking_keys,rl->key);
if (de) {
list *clients = dictGetVal(de);
- int numclients = listLength(clients);
-
- while(numclients--) {
- listNode *clientnode = listFirst(clients);
- client *receiver = clientnode->value;
-
- /* Put at the tail, so that at the next call
- * we'll not run into it again: clients here may not be
- * ready to be served, so they'll remain in the list
- * sometimes. We want also be able to skip clients that are
- * not blocked for the MODULE type safely. */
- listRotateHeadToTail(clients);
+ listNode *ln;
+ listIter li;
+ listRewind(clients,&li);
+ while((ln = listNext(&li))) {
+ client *receiver = listNodeValue(ln);
if (receiver->btype != BLOCKED_MODULE) continue;
/* Note that if *this* client cannot be served by this key,
@@ -566,6 +554,49 @@ void serveClientsBlockedOnKeyByModule(readyList *rl) {
}
}
+/* Helper function for handleClientsBlockedOnKeys(). This function is called
+ * when there may be clients blocked, via XREADGROUP, on an existing stream which
+ * was deleted. We need to unblock the clients in that case.
+ * The idea is that a client that is blocked via XREADGROUP is different from
+ * any other blocking type in the sense that it depends on the existence of both
+ * the key and the group. Even if the key is deleted and then revived with XADD
+ * it won't help any clients blocked on XREADGROUP because the group no longer
+ * exist, so they would fail with -NOGROUP anyway.
+ * The conclusion is that it's better to unblock these client (with error) upon
+ * the deletion of the key, rather than waiting for the first XADD. */
+void unblockDeletedStreamReadgroupClients(readyList *rl) {
+ /* Optimization: If no clients are in type BLOCKED_STREAM,
+ * we can skip this loop. */
+ if (!server.blocked_clients_by_type[BLOCKED_STREAM]) return;
+
+ /* We serve clients in the same order they blocked for
+ * this key, from the first blocked to the last. */
+ dictEntry *de = dictFind(rl->db->blocking_keys,rl->key);
+ if (de) {
+ list *clients = dictGetVal(de);
+ listNode *ln;
+ listIter li;
+ listRewind(clients,&li);
+
+ while((ln = listNext(&li))) {
+ client *receiver = listNodeValue(ln);
+ if (receiver->btype != BLOCKED_STREAM || !receiver->bpop.xread_group)
+ continue;
+
+ long long prev_error_replies = server.stat_total_error_replies;
+ client *old_client = server.current_client;
+ server.current_client = receiver;
+ monotime replyTimer;
+ elapsedStart(&replyTimer);
+ addReplyError(receiver, "-UNBLOCKED the stream key no longer exists");
+ updateStatsOnUnblock(receiver, 0, elapsedUs(replyTimer), server.stat_total_error_replies != prev_error_replies);
+ unblockClient(receiver);
+ afterCommand(receiver);
+ server.current_client = old_client;
+ }
+ }
+}
+
/* This function should be called by Redis every time a single command,
* a MULTI/EXEC block, or a Lua script, terminated its execution after
* being called by a client. It handles serving clients blocked in
@@ -624,17 +655,27 @@ void handleClientsBlockedOnKeys(void) {
/* Serve clients blocked on the key. */
robj *o = lookupKeyReadWithFlags(rl->db, rl->key, LOOKUP_NONOTIFY | LOOKUP_NOSTATS);
if (o != NULL) {
- if (o->type == OBJ_LIST)
+ int objtype = o->type;
+ if (objtype == OBJ_LIST)
serveClientsBlockedOnListKey(o,rl);
- else if (o->type == OBJ_ZSET)
+ else if (objtype == OBJ_ZSET)
serveClientsBlockedOnSortedSetKey(o,rl);
- else if (o->type == OBJ_STREAM)
+ else if (objtype == OBJ_STREAM)
serveClientsBlockedOnStreamKey(o,rl);
/* We want to serve clients blocked on module keys
* regardless of the object type: we don't know what the
* module is trying to accomplish right now. */
serveClientsBlockedOnKeyByModule(rl);
+ /* If we have XREADGROUP clients blocked on this key, and
+ * the key is not a stream, it must mean that the key was
+ * overwritten by either SET or something like
+ * (MULTI, DEL key, SADD key e, EXEC).
+ * In this case we need to unblock all these clients. */
+ if (objtype != OBJ_STREAM)
+ unblockDeletedStreamReadgroupClients(rl);
} else {
+ /* Unblock all XREADGROUP clients of this deleted key */
+ unblockDeletedStreamReadgroupClients(rl);
/* Edge case: If lookupKeyReadWithFlags decides to expire the key we have to
* take care of the propagation here, because afterCommand wasn't called */
if (server.also_propagate.numops > 0)
@@ -823,4 +864,3 @@ void signalKeyAsReady(redisDb *db, robj *key, int type) {
incrRefCount(key);
serverAssert(dictAdd(db->ready_keys,key,NULL) == DICT_OK);
}
-