summaryrefslogtreecommitdiff
path: root/src/blocked.c
diff options
context:
space:
mode:
authorguybe7 <guy.benoish@redislabs.com>2022-10-18 18:50:02 +0200
committerGitHub <noreply@github.com>2022-10-18 19:50:02 +0300
commitb57fd01064428ab388c9d9038a617a52488a447b (patch)
tree82bcfcf7355292dcdbd8da8d56b149cedb10da56 /src/blocked.c
parentb43f254813025e3deea6ef65126ea2bad49af857 (diff)
downloadredis-b57fd01064428ab388c9d9038a617a52488a447b.tar.gz
Blocked module clients should be aware when a key is deleted (#11310)
The use case is a module that wants to implement a blocking command on a key that necessarily exists and wants to unblock the client in case the key is deleted (much like what we implemented for XREADGROUP in #10306) New module API: * RedisModule_BlockClientOnKeysWithFlags Flags: * REDISMODULE_BLOCK_UNBLOCK_NONE * REDISMODULE_BLOCK_UNBLOCK_DELETED ### Detailed description of code changes blocked.c: 1. Both module and stream functions are called whether the key exists or not, regardless of its type. We do that in order to allow modules/stream to unblock the client in case the key is no longer present or has changed type (the behavior for streams didn't change, just code that moved into serveClientsBlockedOnStreamKey) 2. Make sure afterCommand is called in serveClientsBlockedOnKeyByModule, in order to propagate actions from moduleTryServeClientBlockedOnKey. 3. handleClientsBlockedOnKeys: call propagatePendingCommands directly after lookupKeyReadWithFlags to prevent a possible lazy-expire DEL from being mixed with any command propagated by the preceding functions. 4. blockForKeys: Caller can specifiy that it wants to be awakened if key is deleted. Minor optimizations (use dictAddRaw). 5. signalKeyAsReady became signalKeyAsReadyLogic which can take a boolean in case the key is deleted. It will only signal if there's at least one client that awaits key deletion (to save calls to handleClientsBlockedOnKeys). Minor optimizations (use dictAddRaw) db.c: 1. scanDatabaseForDeletedStreams is now scanDatabaseForDeletedKeys and will signalKeyAsReady for any key that was removed from the database or changed type. It is the responsibility of the code in blocked.c to ignore or act on deleted/type-changed keys. 2. Use the new signalDeletedKeyAsReady where needed blockedonkey.c + tcl: 1. Added test of new capabilities (FSL.BPOPGT now requires the key to exist in order to work)
Diffstat (limited to 'src/blocked.c')
-rw-r--r--src/blocked.c201
1 files changed, 108 insertions, 93 deletions
diff --git a/src/blocked.c b/src/blocked.c
index c4aded0c5..497ffe4ce 100644
--- a/src/blocked.c
+++ b/src/blocked.c
@@ -398,14 +398,26 @@ void serveClientsBlockedOnSortedSetKey(robj *o, readyList *rl) {
/* Helper function for handleClientsBlockedOnKeys(). This function is called
* when there may be clients blocked on a stream key, and there may be new
- * data to fetch (the key is ready). */
+ * data to fetch (the key is ready).
+ * This function also handles the case where there may be clients blocked,
+ * via XREADGROUP, on an existing stream which was deleted.
+ * We need to unblock the clients in that case.
+ * The idea is that a client that is blocked via XREADGROUP is different from
+ * any other blocking type in the sense that it depends on the existence of both
+ * the key and the group. Even if the key is deleted and then revived with XADD
+ * it won't help any clients blocked on XREADGROUP because the group no longer
+ * exist, so they would fail with -NOGROUP anyway.
+ * The conclusion is that it's better to unblock these client (with error) upon
+ * the deletion of the key, rather than waiting for the first XADD.*/
void serveClientsBlockedOnStreamKey(robj *o, readyList *rl) {
/* Optimization: If no clients are in type BLOCKED_STREAM,
* we can skip this loop. */
if (!server.blocked_clients_by_type[BLOCKED_STREAM]) return;
dictEntry *de = dictFind(rl->db->blocking_keys,rl->key);
- stream *s = o->ptr;
+ /* This function may be called with o=NULL (in order to unblock
+ * XREADGROUP clients whose key was deleted) */
+ stream *s = o? o->ptr : NULL;
/* We need to provide the new data arrived on the stream
* to all the clients that are waiting for an offset smaller
@@ -422,6 +434,13 @@ void serveClientsBlockedOnStreamKey(robj *o, readyList *rl) {
bkinfo *bki = dictFetchValue(receiver->bpop.keys,rl->key);
streamID *gt = &bki->stream_id;
+ if (!receiver->bpop.xread_group && (!o || o->type != OBJ_STREAM)) {
+ /* If it's a blocking XREAD and the stream was either deleted
+ * or replaced with another key, we don't do anything (it's ok
+ * the the client blocks on a non-existing key). */
+ continue;
+ }
+
long long prev_error_replies = server.stat_total_error_replies;
client *old_client = server.current_client;
server.current_client = receiver;
@@ -439,6 +458,12 @@ void serveClientsBlockedOnStreamKey(robj *o, readyList *rl) {
* otherwise. */
streamCG *group = NULL;
if (receiver->bpop.xread_group) {
+ /* If it's a blocking XREADGROUP and the stream was either deleted
+ * or replaced with another key, we unblock the client */
+ if (!o || o->type != OBJ_STREAM) {
+ addReplyError(receiver, "-UNBLOCKED the stream key no longer exists");
+ goto unblock_receiver;
+ }
group = streamLookupCG(s,
receiver->bpop.xread_group->ptr);
/* If the group was not found, send an error
@@ -545,52 +570,13 @@ void serveClientsBlockedOnKeyByModule(readyList *rl) {
server.current_client = receiver;
monotime replyTimer;
elapsedStart(&replyTimer);
- if (!moduleTryServeClientBlockedOnKey(receiver, rl->key)) continue;
- updateStatsOnUnblock(receiver, 0, elapsedUs(replyTimer), server.stat_total_error_replies != prev_error_replies);
- moduleUnblockClient(receiver);
- afterCommand(receiver);
- server.current_client = old_client;
- }
- }
-}
-
-/* Helper function for handleClientsBlockedOnKeys(). This function is called
- * when there may be clients blocked, via XREADGROUP, on an existing stream which
- * was deleted. We need to unblock the clients in that case.
- * The idea is that a client that is blocked via XREADGROUP is different from
- * any other blocking type in the sense that it depends on the existence of both
- * the key and the group. Even if the key is deleted and then revived with XADD
- * it won't help any clients blocked on XREADGROUP because the group no longer
- * exist, so they would fail with -NOGROUP anyway.
- * The conclusion is that it's better to unblock these client (with error) upon
- * the deletion of the key, rather than waiting for the first XADD. */
-void unblockDeletedStreamReadgroupClients(readyList *rl) {
- /* Optimization: If no clients are in type BLOCKED_STREAM,
- * we can skip this loop. */
- if (!server.blocked_clients_by_type[BLOCKED_STREAM]) return;
-
- /* We serve clients in the same order they blocked for
- * this key, from the first blocked to the last. */
- dictEntry *de = dictFind(rl->db->blocking_keys,rl->key);
- if (de) {
- list *clients = dictGetVal(de);
- listNode *ln;
- listIter li;
- listRewind(clients,&li);
-
- while((ln = listNext(&li))) {
- client *receiver = listNodeValue(ln);
- if (receiver->btype != BLOCKED_STREAM || !receiver->bpop.xread_group)
- continue;
-
- long long prev_error_replies = server.stat_total_error_replies;
- client *old_client = server.current_client;
- server.current_client = receiver;
- monotime replyTimer;
- elapsedStart(&replyTimer);
- addReplyError(receiver, "-UNBLOCKED the stream key no longer exists");
- updateStatsOnUnblock(receiver, 0, elapsedUs(replyTimer), server.stat_total_error_replies != prev_error_replies);
- unblockClient(receiver);
+ if (moduleTryServeClientBlockedOnKey(receiver, rl->key)) {
+ updateStatsOnUnblock(receiver, 0, elapsedUs(replyTimer), server.stat_total_error_replies != prev_error_replies);
+ moduleUnblockClient(receiver);
+ }
+ /* We need to call afterCommand even if the client was not unblocked
+ * in order to propagate any changes that could have been done inside
+ * moduleTryServeClientBlockedOnKey */
afterCommand(receiver);
server.current_client = old_client;
}
@@ -646,33 +632,35 @@ void handleClientsBlockedOnKeys(void) {
/* Serve clients blocked on the key. */
robj *o = lookupKeyReadWithFlags(rl->db, rl->key, LOOKUP_NONOTIFY | LOOKUP_NOSTATS);
- if (o != NULL) {
- int objtype = o->type;
- if (objtype == OBJ_LIST)
- serveClientsBlockedOnListKey(o,rl);
- else if (objtype == OBJ_ZSET)
- serveClientsBlockedOnSortedSetKey(o,rl);
- else if (objtype == OBJ_STREAM)
- serveClientsBlockedOnStreamKey(o,rl);
- /* We want to serve clients blocked on module keys
- * regardless of the object type: we don't know what the
- * module is trying to accomplish right now. */
- serveClientsBlockedOnKeyByModule(rl);
- /* If we have XREADGROUP clients blocked on this key, and
- * the key is not a stream, it must mean that the key was
- * overwritten by either SET or something like
- * (MULTI, DEL key, SADD key e, EXEC).
- * In this case we need to unblock all these clients. */
- if (objtype != OBJ_STREAM)
- unblockDeletedStreamReadgroupClients(rl);
- } else {
- /* Unblock all XREADGROUP clients of this deleted key */
- unblockDeletedStreamReadgroupClients(rl);
+ if (!o) {
/* Edge case: If lookupKeyReadWithFlags decides to expire the key we have to
* take care of the propagation here, because afterCommand wasn't called */
- if (server.also_propagate.numops > 0)
- propagatePendingCommands();
+ propagatePendingCommands();
+ } else {
+ if (o->type == OBJ_LIST)
+ serveClientsBlockedOnListKey(o,rl);
+ else if (o->type == OBJ_ZSET)
+ serveClientsBlockedOnSortedSetKey(o,rl);
}
+ /* We need to try to serve stream clients even if the key no longer exists because
+ * XREADGROUP clients need to be unblocked in case the key is missing, either deleted
+ * or replaced by SET or something like {MULTI, DEL key, SADD key e, EXEC}.
+ * In this case we need to unblock all these clients. */
+ serveClientsBlockedOnStreamKey(o,rl);
+ /* We want to serve clients blocked on module keys regardless of the object type, or
+ * whether the object exists or not: we don't know what the module is trying to
+ * accomplish right now.
+ * Please note that this function must be called only after handling non-module
+ * clients, since moduleTryServeClientBlockedOnKey may delete the key, causing `o`
+ * to be stale.
+ * The scenario is that we have one client blocked on BLPOP while another module
+ * client is blocked by MODULE.SAME-AS-BLPOP on the same key.
+ * Of course we can call lookupKeyReadWithFlags again, but:
+ * 1) It takes CPU
+ * 2) It makes more sense to give priority to "native" blocking clients rather
+ * than module blocking clients
+ * */
+ serveClientsBlockedOnKeyByModule(rl);
/* Free this item. */
decrRefCount(rl->key);
@@ -717,8 +705,7 @@ void handleClientsBlockedOnKeys(void) {
*
* 'count' for those commands that support the optional count argument.
* Otherwise the value is 0. */
-void blockForKeys(client *c, int btype, robj **keys, int numkeys, long count, mstime_t timeout, robj *target, struct blockPos *blockpos, streamID *ids) {
- dictEntry *de;
+void blockForKeys(client *c, int btype, robj **keys, int numkeys, long count, mstime_t timeout, robj *target, struct blockPos *blockpos, streamID *ids, int unblock_on_nokey) {
list *l;
int j;
@@ -745,20 +732,27 @@ void blockForKeys(client *c, int btype, robj **keys, int numkeys, long count, ms
incrRefCount(keys[j]);
/* And in the other "side", to map keys -> clients */
- de = dictFind(c->db->blocking_keys,keys[j]);
- if (de == NULL) {
- int retval;
-
+ dictEntry *de, *existing;
+ de = dictAddRaw(c->db->blocking_keys, keys[j], &existing);
+ if (de) {
+ incrRefCount(keys[j]);
/* For every key we take a list of clients blocked for it */
l = listCreate();
- retval = dictAdd(c->db->blocking_keys,keys[j],l);
- incrRefCount(keys[j]);
- serverAssertWithInfo(c,keys[j],retval == DICT_OK);
+ dictSetVal(c->db->blocking_keys, de, l);
} else {
- l = dictGetVal(de);
+ l = dictGetVal(existing);
}
listAddNodeTail(l,c);
bki->listnode = listLast(l);
+
+ /* We need to add the key to blocking_keys_unblock_on_nokey, if the client
+ * wants to be awakened if key is deleted (like XREADGROUP) */
+ if (unblock_on_nokey) {
+ de = dictAddRaw(c->db->blocking_keys_unblock_on_nokey, keys[j], NULL);
+ if (de) {
+ incrRefCount(keys[j]);
+ }
+ }
}
blockClient(c,btype);
}
@@ -782,8 +776,10 @@ void unblockClientWaitingData(client *c) {
serverAssertWithInfo(c,key,l != NULL);
listDelNode(l,bki->listnode);
/* If the list is empty we need to remove it to avoid wasting memory */
- if (listLength(l) == 0)
+ if (listLength(l) == 0) {
dictDelete(c->db->blocking_keys,key);
+ dictDelete(c->db->blocking_keys_unblock_on_nokey,key);
+ }
}
dictReleaseIterator(di);
@@ -818,7 +814,7 @@ static int getBlockedTypeByType(int type) {
* made by a script or in the context of MULTI/EXEC.
*
* The list will be finally processed by handleClientsBlockedOnKeys() */
-void signalKeyAsReady(redisDb *db, robj *key, int type) {
+static void signalKeyAsReadyLogic(redisDb *db, robj *key, int type, int deleted) {
readyList *rl;
/* Quick returns. */
@@ -836,11 +832,28 @@ void signalKeyAsReady(redisDb *db, robj *key, int type) {
return;
}
- /* No clients blocking for this key? No need to queue it. */
- if (dictFind(db->blocking_keys,key) == NULL) return;
+ if (deleted) {
+ /* Key deleted and no clients blocking for this key? No need to queue it. */
+ if (dictFind(db->blocking_keys_unblock_on_nokey,key) == NULL)
+ return;
+ /* Note: if we made it here it means the key is also present in db->blocking_keys */
+ } else {
+ /* No clients blocking for this key? No need to queue it. */
+ if (dictFind(db->blocking_keys,key) == NULL)
+ return;
+ }
- /* Key was already signaled? No need to queue it again. */
- if (dictFind(db->ready_keys,key) != NULL) return;
+ dictEntry *de, *existing;
+ de = dictAddRaw(db->ready_keys, key, &existing);
+ if (de) {
+ /* We add the key in the db->ready_keys dictionary in order
+ * to avoid adding it multiple times into a list with a simple O(1)
+ * check. */
+ incrRefCount(key);
+ } else {
+ /* Key was already signaled? No need to queue it again. */
+ return;
+ }
/* Ok, we need to queue this key into server.ready_keys. */
rl = zmalloc(sizeof(*rl));
@@ -848,10 +861,12 @@ void signalKeyAsReady(redisDb *db, robj *key, int type) {
rl->db = db;
incrRefCount(key);
listAddNodeTail(server.ready_keys,rl);
+}
- /* We also add the key in the db->ready_keys dictionary in order
- * to avoid adding it multiple times into a list with a simple O(1)
- * check. */
- incrRefCount(key);
- serverAssert(dictAdd(db->ready_keys,key,NULL) == DICT_OK);
+void signalKeyAsReady(redisDb *db, robj *key, int type) {
+ signalKeyAsReadyLogic(db, key, type, 0);
+}
+
+void signalDeletedKeyAsReady(redisDb *db, robj *key, int type) {
+ signalKeyAsReadyLogic(db, key, type, 1);
}