summaryrefslogtreecommitdiff
path: root/src/blocked.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/blocked.c')
-rw-r--r--src/blocked.c48
1 files changed, 48 insertions, 0 deletions
diff --git a/src/blocked.c b/src/blocked.c
index 867f03de6..14c2ff830 100644
--- a/src/blocked.c
+++ b/src/blocked.c
@@ -174,6 +174,7 @@ void unblockClient(client *c) {
} else if (c->btype == BLOCKED_WAIT) {
unblockClientWaitingReplicas(c);
} else if (c->btype == BLOCKED_MODULE) {
+ if (moduleClientIsBlockedOnKeys(c)) unblockClientWaitingData(c);
unblockClientFromModule(c);
} else {
serverPanic("Unknown btype in unblockClient().");
@@ -430,6 +431,49 @@ void serveClientsBlockedOnStreamKey(robj *o, readyList *rl) {
}
}
+/* Helper function for handleClientsBlockedOnKeys(). This function is called
+ * in order to check if we can serve clients blocked by modules using
+ * RM_BlockClientOnKeys(), when the corresponding key was signaled as ready:
+ * our goal here is to call the RedisModuleBlockedClient reply() callback to
+ * see if the key is really able to serve the client, and in that case,
+ * unblock it. */
+void serveClientsBlockedOnKeyByModule(readyList *rl) {
+ dictEntry *de;
+
+ /* We serve clients in the same order they blocked for
+ * this key, from the first blocked to the last. */
+ de = dictFind(rl->db->blocking_keys,rl->key);
+ if (de) {
+ list *clients = dictGetVal(de);
+ int numclients = listLength(clients);
+
+ while(numclients--) {
+ listNode *clientnode = listFirst(clients);
+ client *receiver = clientnode->value;
+
+ /* Put at the tail, so that at the next call
+ * we'll not run into it again: clients here may not be
+ * ready to be served, so they'll remain in the list
+ * sometimes. We want also be able to skip clients that are
+ * not blocked for the MODULE type safely. */
+ listDelNode(clients,clientnode);
+ listAddNodeTail(clients,receiver);
+
+ if (receiver->btype != BLOCKED_MODULE) continue;
+
+ /* Note that if *this* client cannot be served by this key,
+ * it does not mean that another client that is next into the
+ * list cannot be served as well: they may be blocked by
+ * different modules with different triggers to consider if a key
+ * is ready or not. This means we can't exit the loop but need
+ * to continue after the first failure. */
+ if (!moduleTryServeClientBlockedOnKey(receiver, rl->key)) continue;
+
+ moduleUnblockClient(receiver);
+ }
+ }
+}
+
/* This function should be called by Redis every time a single command,
* a MULTI/EXEC block, or a Lua script, terminated its execution after
* being called by a client. It handles serving clients blocked in
@@ -480,6 +524,10 @@ void handleClientsBlockedOnKeys(void) {
serveClientsBlockedOnSortedSetKey(o,rl);
else if (o->type == OBJ_STREAM)
serveClientsBlockedOnStreamKey(o,rl);
+ /* We want to serve clients blocked on module keys
+ * regardless of the object type: we don't know what the
+ * module is trying to accomplish right now. */
+ serveClientsBlockedOnKeyByModule(rl);
}
/* Free this item. */