summaryrefslogtreecommitdiff
path: root/src/blocked.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/blocked.c')
-rw-r--r--src/blocked.c201
1 files changed, 108 insertions, 93 deletions
diff --git a/src/blocked.c b/src/blocked.c
index c4aded0c5..497ffe4ce 100644
--- a/src/blocked.c
+++ b/src/blocked.c
@@ -398,14 +398,26 @@ void serveClientsBlockedOnSortedSetKey(robj *o, readyList *rl) {
/* Helper function for handleClientsBlockedOnKeys(). This function is called
* when there may be clients blocked on a stream key, and there may be new
- * data to fetch (the key is ready). */
+ * data to fetch (the key is ready).
+ * This function also handles the case where there may be clients blocked,
+ * via XREADGROUP, on an existing stream which was deleted.
+ * We need to unblock the clients in that case.
+ * The idea is that a client that is blocked via XREADGROUP is different from
+ * any other blocking type in the sense that it depends on the existence of both
+ * the key and the group. Even if the key is deleted and then revived with XADD
+ * it won't help any clients blocked on XREADGROUP because the group no longer
+ * exist, so they would fail with -NOGROUP anyway.
+ * The conclusion is that it's better to unblock these client (with error) upon
+ * the deletion of the key, rather than waiting for the first XADD.*/
void serveClientsBlockedOnStreamKey(robj *o, readyList *rl) {
/* Optimization: If no clients are in type BLOCKED_STREAM,
* we can skip this loop. */
if (!server.blocked_clients_by_type[BLOCKED_STREAM]) return;
dictEntry *de = dictFind(rl->db->blocking_keys,rl->key);
- stream *s = o->ptr;
+ /* This function may be called with o=NULL (in order to unblock
+ * XREADGROUP clients whose key was deleted) */
+ stream *s = o? o->ptr : NULL;
/* We need to provide the new data arrived on the stream
* to all the clients that are waiting for an offset smaller
@@ -422,6 +434,13 @@ void serveClientsBlockedOnStreamKey(robj *o, readyList *rl) {
bkinfo *bki = dictFetchValue(receiver->bpop.keys,rl->key);
streamID *gt = &bki->stream_id;
+ if (!receiver->bpop.xread_group && (!o || o->type != OBJ_STREAM)) {
+ /* If it's a blocking XREAD and the stream was either deleted
+ * or replaced with another key, we don't do anything (it's ok
+ * the the client blocks on a non-existing key). */
+ continue;
+ }
+
long long prev_error_replies = server.stat_total_error_replies;
client *old_client = server.current_client;
server.current_client = receiver;
@@ -439,6 +458,12 @@ void serveClientsBlockedOnStreamKey(robj *o, readyList *rl) {
* otherwise. */
streamCG *group = NULL;
if (receiver->bpop.xread_group) {
+ /* If it's a blocking XREADGROUP and the stream was either deleted
+ * or replaced with another key, we unblock the client */
+ if (!o || o->type != OBJ_STREAM) {
+ addReplyError(receiver, "-UNBLOCKED the stream key no longer exists");
+ goto unblock_receiver;
+ }
group = streamLookupCG(s,
receiver->bpop.xread_group->ptr);
/* If the group was not found, send an error
@@ -545,52 +570,13 @@ void serveClientsBlockedOnKeyByModule(readyList *rl) {
server.current_client = receiver;
monotime replyTimer;
elapsedStart(&replyTimer);
- if (!moduleTryServeClientBlockedOnKey(receiver, rl->key)) continue;
- updateStatsOnUnblock(receiver, 0, elapsedUs(replyTimer), server.stat_total_error_replies != prev_error_replies);
- moduleUnblockClient(receiver);
- afterCommand(receiver);
- server.current_client = old_client;
- }
- }
-}
-
-/* Helper function for handleClientsBlockedOnKeys(). This function is called
- * when there may be clients blocked, via XREADGROUP, on an existing stream which
- * was deleted. We need to unblock the clients in that case.
- * The idea is that a client that is blocked via XREADGROUP is different from
- * any other blocking type in the sense that it depends on the existence of both
- * the key and the group. Even if the key is deleted and then revived with XADD
- * it won't help any clients blocked on XREADGROUP because the group no longer
- * exist, so they would fail with -NOGROUP anyway.
- * The conclusion is that it's better to unblock these client (with error) upon
- * the deletion of the key, rather than waiting for the first XADD. */
-void unblockDeletedStreamReadgroupClients(readyList *rl) {
- /* Optimization: If no clients are in type BLOCKED_STREAM,
- * we can skip this loop. */
- if (!server.blocked_clients_by_type[BLOCKED_STREAM]) return;
-
- /* We serve clients in the same order they blocked for
- * this key, from the first blocked to the last. */
- dictEntry *de = dictFind(rl->db->blocking_keys,rl->key);
- if (de) {
- list *clients = dictGetVal(de);
- listNode *ln;
- listIter li;
- listRewind(clients,&li);
-
- while((ln = listNext(&li))) {
- client *receiver = listNodeValue(ln);
- if (receiver->btype != BLOCKED_STREAM || !receiver->bpop.xread_group)
- continue;
-
- long long prev_error_replies = server.stat_total_error_replies;
- client *old_client = server.current_client;
- server.current_client = receiver;
- monotime replyTimer;
- elapsedStart(&replyTimer);
- addReplyError(receiver, "-UNBLOCKED the stream key no longer exists");
- updateStatsOnUnblock(receiver, 0, elapsedUs(replyTimer), server.stat_total_error_replies != prev_error_replies);
- unblockClient(receiver);
+ if (moduleTryServeClientBlockedOnKey(receiver, rl->key)) {
+ updateStatsOnUnblock(receiver, 0, elapsedUs(replyTimer), server.stat_total_error_replies != prev_error_replies);
+ moduleUnblockClient(receiver);
+ }
+ /* We need to call afterCommand even if the client was not unblocked
+ * in order to propagate any changes that could have been done inside
+ * moduleTryServeClientBlockedOnKey */
afterCommand(receiver);
server.current_client = old_client;
}
@@ -646,33 +632,35 @@ void handleClientsBlockedOnKeys(void) {
/* Serve clients blocked on the key. */
robj *o = lookupKeyReadWithFlags(rl->db, rl->key, LOOKUP_NONOTIFY | LOOKUP_NOSTATS);
- if (o != NULL) {
- int objtype = o->type;
- if (objtype == OBJ_LIST)
- serveClientsBlockedOnListKey(o,rl);
- else if (objtype == OBJ_ZSET)
- serveClientsBlockedOnSortedSetKey(o,rl);
- else if (objtype == OBJ_STREAM)
- serveClientsBlockedOnStreamKey(o,rl);
- /* We want to serve clients blocked on module keys
- * regardless of the object type: we don't know what the
- * module is trying to accomplish right now. */
- serveClientsBlockedOnKeyByModule(rl);
- /* If we have XREADGROUP clients blocked on this key, and
- * the key is not a stream, it must mean that the key was
- * overwritten by either SET or something like
- * (MULTI, DEL key, SADD key e, EXEC).
- * In this case we need to unblock all these clients. */
- if (objtype != OBJ_STREAM)
- unblockDeletedStreamReadgroupClients(rl);
- } else {
- /* Unblock all XREADGROUP clients of this deleted key */
- unblockDeletedStreamReadgroupClients(rl);
+ if (!o) {
/* Edge case: If lookupKeyReadWithFlags decides to expire the key we have to
* take care of the propagation here, because afterCommand wasn't called */
- if (server.also_propagate.numops > 0)
- propagatePendingCommands();
+ propagatePendingCommands();
+ } else {
+ if (o->type == OBJ_LIST)
+ serveClientsBlockedOnListKey(o,rl);
+ else if (o->type == OBJ_ZSET)
+ serveClientsBlockedOnSortedSetKey(o,rl);
}
+ /* We need to try to serve stream clients even if the key no longer exists because
+ * XREADGROUP clients need to be unblocked in case the key is missing, either deleted
+ * or replaced by SET or something like {MULTI, DEL key, SADD key e, EXEC}.
+ * In this case we need to unblock all these clients. */
+ serveClientsBlockedOnStreamKey(o,rl);
+ /* We want to serve clients blocked on module keys regardless of the object type, or
+ * whether the object exists or not: we don't know what the module is trying to
+ * accomplish right now.
+ * Please note that this function must be called only after handling non-module
+ * clients, since moduleTryServeClientBlockedOnKey may delete the key, causing `o`
+ * to be stale.
+ * The scenario is that we have one client blocked on BLPOP while another module
+ * client is blocked by MODULE.SAME-AS-BLPOP on the same key.
+ * Of course we can call lookupKeyReadWithFlags again, but:
+ * 1) It takes CPU
+ * 2) It makes more sense to give priority to "native" blocking clients rather
+ * than module blocking clients
+ * */
+ serveClientsBlockedOnKeyByModule(rl);
/* Free this item. */
decrRefCount(rl->key);
@@ -717,8 +705,7 @@ void handleClientsBlockedOnKeys(void) {
*
* 'count' for those commands that support the optional count argument.
* Otherwise the value is 0. */
-void blockForKeys(client *c, int btype, robj **keys, int numkeys, long count, mstime_t timeout, robj *target, struct blockPos *blockpos, streamID *ids) {
- dictEntry *de;
+void blockForKeys(client *c, int btype, robj **keys, int numkeys, long count, mstime_t timeout, robj *target, struct blockPos *blockpos, streamID *ids, int unblock_on_nokey) {
list *l;
int j;
@@ -745,20 +732,27 @@ void blockForKeys(client *c, int btype, robj **keys, int numkeys, long count, ms
incrRefCount(keys[j]);
/* And in the other "side", to map keys -> clients */
- de = dictFind(c->db->blocking_keys,keys[j]);
- if (de == NULL) {
- int retval;
-
+ dictEntry *de, *existing;
+ de = dictAddRaw(c->db->blocking_keys, keys[j], &existing);
+ if (de) {
+ incrRefCount(keys[j]);
/* For every key we take a list of clients blocked for it */
l = listCreate();
- retval = dictAdd(c->db->blocking_keys,keys[j],l);
- incrRefCount(keys[j]);
- serverAssertWithInfo(c,keys[j],retval == DICT_OK);
+ dictSetVal(c->db->blocking_keys, de, l);
} else {
- l = dictGetVal(de);
+ l = dictGetVal(existing);
}
listAddNodeTail(l,c);
bki->listnode = listLast(l);
+
+ /* We need to add the key to blocking_keys_unblock_on_nokey, if the client
+ * wants to be awakened if key is deleted (like XREADGROUP) */
+ if (unblock_on_nokey) {
+ de = dictAddRaw(c->db->blocking_keys_unblock_on_nokey, keys[j], NULL);
+ if (de) {
+ incrRefCount(keys[j]);
+ }
+ }
}
blockClient(c,btype);
}
@@ -782,8 +776,10 @@ void unblockClientWaitingData(client *c) {
serverAssertWithInfo(c,key,l != NULL);
listDelNode(l,bki->listnode);
/* If the list is empty we need to remove it to avoid wasting memory */
- if (listLength(l) == 0)
+ if (listLength(l) == 0) {
dictDelete(c->db->blocking_keys,key);
+ dictDelete(c->db->blocking_keys_unblock_on_nokey,key);
+ }
}
dictReleaseIterator(di);
@@ -818,7 +814,7 @@ static int getBlockedTypeByType(int type) {
* made by a script or in the context of MULTI/EXEC.
*
* The list will be finally processed by handleClientsBlockedOnKeys() */
-void signalKeyAsReady(redisDb *db, robj *key, int type) {
+static void signalKeyAsReadyLogic(redisDb *db, robj *key, int type, int deleted) {
readyList *rl;
/* Quick returns. */
@@ -836,11 +832,28 @@ void signalKeyAsReady(redisDb *db, robj *key, int type) {
return;
}
- /* No clients blocking for this key? No need to queue it. */
- if (dictFind(db->blocking_keys,key) == NULL) return;
+ if (deleted) {
+ /* Key deleted and no clients blocking for this key? No need to queue it. */
+ if (dictFind(db->blocking_keys_unblock_on_nokey,key) == NULL)
+ return;
+ /* Note: if we made it here it means the key is also present in db->blocking_keys */
+ } else {
+ /* No clients blocking for this key? No need to queue it. */
+ if (dictFind(db->blocking_keys,key) == NULL)
+ return;
+ }
- /* Key was already signaled? No need to queue it again. */
- if (dictFind(db->ready_keys,key) != NULL) return;
+ dictEntry *de, *existing;
+ de = dictAddRaw(db->ready_keys, key, &existing);
+ if (de) {
+ /* We add the key in the db->ready_keys dictionary in order
+ * to avoid adding it multiple times into a list with a simple O(1)
+ * check. */
+ incrRefCount(key);
+ } else {
+ /* Key was already signaled? No need to queue it again. */
+ return;
+ }
/* Ok, we need to queue this key into server.ready_keys. */
rl = zmalloc(sizeof(*rl));
@@ -848,10 +861,12 @@ void signalKeyAsReady(redisDb *db, robj *key, int type) {
rl->db = db;
incrRefCount(key);
listAddNodeTail(server.ready_keys,rl);
+}
- /* We also add the key in the db->ready_keys dictionary in order
- * to avoid adding it multiple times into a list with a simple O(1)
- * check. */
- incrRefCount(key);
- serverAssert(dictAdd(db->ready_keys,key,NULL) == DICT_OK);
+void signalKeyAsReady(redisDb *db, robj *key, int type) {
+ signalKeyAsReadyLogic(db, key, type, 0);
+}
+
+void signalDeletedKeyAsReady(redisDb *db, robj *key, int type) {
+ signalKeyAsReadyLogic(db, key, type, 1);
}