summaryrefslogtreecommitdiff
path: root/src/blocked.c
diff options
context:
space:
mode:
authorguybe7 <guy.benoish@redislabs.com>2022-03-08 17:10:36 +0200
committerGitHub <noreply@github.com>2022-03-08 17:10:36 +0200
commit2a2954086a28b9969882158a5b0630c3f944c051 (patch)
treedcb2e312467c9dcbf07b3bf46f2342ca84b6cb04 /src/blocked.c
parent728e62523e1a7998ed30be2ab0a6947390ff6a87 (diff)
downloadredis-2a2954086a28b9969882158a5b0630c3f944c051.tar.gz
XREADGROUP: Unblock client if stream is deleted (#10306)
Deleting a stream while a client is blocked XREADGROUP should unblock the client. The idea is that if a client is blocked via XREADGROUP is different from any other blocking type in the sense that it depends on the existence of both the key and the group. Even if the key is deleted and then revived with XADD it won't help any clients blocked on XREADGROUP because the group no longer exist, so they would fail with -NOGROUP anyway. The conclusion is that it's better to unblock these clients (with error) upon the deletion of the key, rather than waiting for the first XADD. Other changes: 1. Slightly optimize all `serveClientsBlockedOn*` functions by checking `server.blocked_clients_by_type` 2. All `serveClientsBlockedOn*` functions now use a list iterator rather than looking at `listFirst`, relying on `unblockClient` to delete the head of the list. Before this commit, only `serveClientsBlockedOnStreams` used to work like that. 3. bugfix: CLIENT UNBLOCK ERROR should work even if the command doesn't have a timeout_callback (only relevant to module commands)
Diffstat (limited to 'src/blocked.c')
-rw-r--r--src/blocked.c131
1 files changed, 88 insertions, 43 deletions
diff --git a/src/blocked.c b/src/blocked.c
index aa298cffb..a982884c4 100644
--- a/src/blocked.c
+++ b/src/blocked.c
@@ -288,25 +288,24 @@ void disconnectAllBlockedClients(void) {
* when there may be clients blocked on a list key, and there may be new
* data to fetch (the key is ready). */
void serveClientsBlockedOnListKey(robj *o, readyList *rl) {
+ /* Optimization: If no clients are in type BLOCKED_LIST,
+ * we can skip this loop. */
+ if (!server.blocked_clients_by_type[BLOCKED_LIST]) return;
+
/* We serve clients in the same order they blocked for
* this key, from the first blocked to the last. */
dictEntry *de = dictFind(rl->db->blocking_keys,rl->key);
if (de) {
list *clients = dictGetVal(de);
- int numclients = listLength(clients);
- int deleted = 0;
-
- while(numclients--) {
- listNode *clientnode = listFirst(clients);
- client *receiver = clientnode->value;
+ listNode *ln;
+ listIter li;
+ listRewind(clients,&li);
- if (receiver->btype != BLOCKED_LIST) {
- /* Put at the tail, so that at the next call
- * we'll not run into it again. */
- listRotateHeadToTail(clients);
- continue;
- }
+ while((ln = listNext(&li))) {
+ client *receiver = listNodeValue(ln);
+ if (receiver->btype != BLOCKED_LIST) continue;
+ int deleted = 0;
robj *dstkey = receiver->bpop.target;
int wherefrom = receiver->bpop.blockpos.wherefrom;
int whereto = receiver->bpop.blockpos.whereto;
@@ -342,25 +341,24 @@ void serveClientsBlockedOnListKey(robj *o, readyList *rl) {
* when there may be clients blocked on a sorted set key, and there may be new
* data to fetch (the key is ready). */
void serveClientsBlockedOnSortedSetKey(robj *o, readyList *rl) {
+ /* Optimization: If no clients are in type BLOCKED_ZSET,
+ * we can skip this loop. */
+ if (!server.blocked_clients_by_type[BLOCKED_ZSET]) return;
+
/* We serve clients in the same order they blocked for
* this key, from the first blocked to the last. */
dictEntry *de = dictFind(rl->db->blocking_keys,rl->key);
if (de) {
list *clients = dictGetVal(de);
- int numclients = listLength(clients);
- int deleted = 0;
-
- while (numclients--) {
- listNode *clientnode = listFirst(clients);
- client *receiver = clientnode->value;
+ listNode *ln;
+ listIter li;
+ listRewind(clients,&li);
- if (receiver->btype != BLOCKED_ZSET) {
- /* Put at the tail, so that at the next call
- * we'll not run into it again. */
- listRotateHeadToTail(clients);
- continue;
- }
+ while((ln = listNext(&li))) {
+ client *receiver = listNodeValue(ln);
+ if (receiver->btype != BLOCKED_ZSET) continue;
+ int deleted = 0;
long llen = zsetLength(o);
long count = receiver->bpop.count;
int where = receiver->bpop.blockpos.wherefrom;
@@ -407,6 +405,10 @@ void serveClientsBlockedOnSortedSetKey(robj *o, readyList *rl) {
* when there may be clients blocked on a stream key, and there may be new
* data to fetch (the key is ready). */
void serveClientsBlockedOnStreamKey(robj *o, readyList *rl) {
+ /* Optimization: If no clients are in type BLOCKED_STREAM,
+ * we can skip this loop. */
+ if (!server.blocked_clients_by_type[BLOCKED_STREAM]) return;
+
dictEntry *de = dictFind(rl->db->blocking_keys,rl->key);
stream *s = o->ptr;
@@ -520,30 +522,21 @@ unblock_receiver:
* see if the key is really able to serve the client, and in that case,
* unblock it. */
void serveClientsBlockedOnKeyByModule(readyList *rl) {
- dictEntry *de;
-
/* Optimization: If no clients are in type BLOCKED_MODULE,
* we can skip this loop. */
if (!server.blocked_clients_by_type[BLOCKED_MODULE]) return;
/* We serve clients in the same order they blocked for
* this key, from the first blocked to the last. */
- de = dictFind(rl->db->blocking_keys,rl->key);
+ dictEntry *de = dictFind(rl->db->blocking_keys,rl->key);
if (de) {
list *clients = dictGetVal(de);
- int numclients = listLength(clients);
-
- while(numclients--) {
- listNode *clientnode = listFirst(clients);
- client *receiver = clientnode->value;
-
- /* Put at the tail, so that at the next call
- * we'll not run into it again: clients here may not be
- * ready to be served, so they'll remain in the list
- * sometimes. We want also be able to skip clients that are
- * not blocked for the MODULE type safely. */
- listRotateHeadToTail(clients);
+ listNode *ln;
+ listIter li;
+ listRewind(clients,&li);
+ while((ln = listNext(&li))) {
+ client *receiver = listNodeValue(ln);
if (receiver->btype != BLOCKED_MODULE) continue;
/* Note that if *this* client cannot be served by this key,
@@ -566,6 +559,49 @@ void serveClientsBlockedOnKeyByModule(readyList *rl) {
}
}
+/* Helper function for handleClientsBlockedOnKeys(). This function is called
+ * when there may be clients blocked, via XREADGROUP, on an existing stream which
+ * was deleted. We need to unblock the clients in that case.
+ * The idea is that a client that is blocked via XREADGROUP is different from
+ * any other blocking type in the sense that it depends on the existence of both
+ * the key and the group. Even if the key is deleted and then revived with XADD
+ * it won't help any clients blocked on XREADGROUP because the group no longer
+ * exist, so they would fail with -NOGROUP anyway.
+ * The conclusion is that it's better to unblock these client (with error) upon
+ * the deletion of the key, rather than waiting for the first XADD. */
+void unblockDeletedStreamReadgroupClients(readyList *rl) {
+ /* Optimization: If no clients are in type BLOCKED_STREAM,
+ * we can skip this loop. */
+ if (!server.blocked_clients_by_type[BLOCKED_STREAM]) return;
+
+ /* We serve clients in the same order they blocked for
+ * this key, from the first blocked to the last. */
+ dictEntry *de = dictFind(rl->db->blocking_keys,rl->key);
+ if (de) {
+ list *clients = dictGetVal(de);
+ listNode *ln;
+ listIter li;
+ listRewind(clients,&li);
+
+ while((ln = listNext(&li))) {
+ client *receiver = listNodeValue(ln);
+ if (receiver->btype != BLOCKED_STREAM || !receiver->bpop.xread_group)
+ continue;
+
+ long long prev_error_replies = server.stat_total_error_replies;
+ client *old_client = server.current_client;
+ server.current_client = receiver;
+ monotime replyTimer;
+ elapsedStart(&replyTimer);
+ addReplyError(receiver, "-UNBLOCKED the stream key no longer exists");
+ updateStatsOnUnblock(receiver, 0, elapsedUs(replyTimer), server.stat_total_error_replies != prev_error_replies);
+ unblockClient(receiver);
+ afterCommand(receiver);
+ server.current_client = old_client;
+ }
+ }
+}
+
/* This function should be called by Redis every time a single command,
* a MULTI/EXEC block, or a Lua script, terminated its execution after
* being called by a client. It handles serving clients blocked in
@@ -624,17 +660,27 @@ void handleClientsBlockedOnKeys(void) {
/* Serve clients blocked on the key. */
robj *o = lookupKeyReadWithFlags(rl->db, rl->key, LOOKUP_NONOTIFY | LOOKUP_NOSTATS);
if (o != NULL) {
- if (o->type == OBJ_LIST)
+ int objtype = o->type;
+ if (objtype == OBJ_LIST)
serveClientsBlockedOnListKey(o,rl);
- else if (o->type == OBJ_ZSET)
+ else if (objtype == OBJ_ZSET)
serveClientsBlockedOnSortedSetKey(o,rl);
- else if (o->type == OBJ_STREAM)
+ else if (objtype == OBJ_STREAM)
serveClientsBlockedOnStreamKey(o,rl);
/* We want to serve clients blocked on module keys
* regardless of the object type: we don't know what the
* module is trying to accomplish right now. */
serveClientsBlockedOnKeyByModule(rl);
+ /* If we have XREADGROUP clients blocked on this key, and
+ * the key is not a stream, it must mean that the key was
+ * overwritten by either SET or something like
+ * (MULTI, DEL key, SADD key e, EXEC).
+ * In this case we need to unblock all these clients. */
+ if (objtype != OBJ_STREAM)
+ unblockDeletedStreamReadgroupClients(rl);
} else {
+ /* Unblock all XREADGROUP clients of this deleted key */
+ unblockDeletedStreamReadgroupClients(rl);
/* Edge case: If lookupKeyReadWithFlags decides to expire the key we have to
* take care of the propagation here, because afterCommand wasn't called */
if (server.also_propagate.numops > 0)
@@ -823,4 +869,3 @@ void signalKeyAsReady(redisDb *db, robj *key, int type) {
incrRefCount(key);
serverAssert(dictAdd(db->ready_keys,key,NULL) == DICT_OK);
}
-