summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorantirez <antirez@gmail.com>2019-10-31 12:23:55 +0100
committerantirez <antirez@gmail.com>2019-10-31 12:23:55 +0100
commit66f55bc5c15d72542983f37c6c1b48b0c1618917 (patch)
tree769af6fe7ab147e80a4a6f00bb6700993b8ff198
parent629081f839498734d3426d71653619d4f9e93dc9 (diff)
downloadredis-66f55bc5c15d72542983f37c6c1b48b0c1618917.tar.gz
Modules: block on keys: fix bugs in processing order.
-rw-r--r--src/blocked.c16
-rw-r--r--src/module.c7
2 files changed, 23 insertions, 0 deletions
diff --git a/src/blocked.c b/src/blocked.c
index fb58f850b..3110c00fc 100644
--- a/src/blocked.c
+++ b/src/blocked.c
@@ -450,6 +450,22 @@ void serveClientsBlockedOnKeyByModule(readyList *rl) {
listNode *clientnode = listFirst(clients);
client *receiver = clientnode->value;
+ /* Put at the tail, so that at the next call
+ * we'll not run into it again: clients here may not be
+ * ready to be served, so they'll remain in the list
+ * sometimes. We want also be able to skip clients that are
+ * not blocked for the MODULE type safely. */
+ listDelNode(clients,clientnode);
+ listAddNodeTail(clients,receiver);
+
+ if (receiver->btype != BLOCKED_MODULE) continue;
+
+ /* Note that if *this* client cannot be served by this key,
+ * it does not mean that another client that is next into the
+ * list cannot be served as well: they may be blocked by
+ * different modules with different triggers to consider if a key
+ * is ready or not. This means we can't exit the loop but need
+ * to continue after the first failure. */
if (!moduleTryServeClientBlockedOnKey(receiver, rl->key)) continue;
moduleUnblockClient(receiver);
diff --git a/src/module.c b/src/module.c
index 353b6f426..248a55e62 100644
--- a/src/module.c
+++ b/src/module.c
@@ -249,6 +249,7 @@ typedef struct RedisModuleBlockedClient {
in thread safe contexts. */
int dbid; /* Database number selected by the original client. */
int blocked_on_keys; /* If blocked via RM_BlockClientOnKeys(). */
+ int unblocked; /* Already on the moduleUnblocked list. */
} RedisModuleBlockedClient;
static pthread_mutex_t moduleUnblockedClientsMutex = PTHREAD_MUTEX_INITIALIZER;
@@ -4037,6 +4038,7 @@ RedisModuleBlockedClient *moduleBlockClient(RedisModuleCtx *ctx, RedisModuleCmdF
bc->reply_client->flags |= CLIENT_MODULE;
bc->dbid = c->db->id;
bc->blocked_on_keys = keys != NULL;
+ bc->unblocked = 0;
c->bpop.timeout = timeout;
if (islua || ismulti) {
@@ -4063,6 +4065,10 @@ RedisModuleBlockedClient *moduleBlockClient(RedisModuleCtx *ctx, RedisModuleCmdF
int moduleTryServeClientBlockedOnKey(client *c, robj *key) {
int served = 0;
RedisModuleBlockedClient *bc = c->bpop.module_blocked_handle;
+ /* Protect against re-processing: don't serve clients that are already
+ * in the unblocking list for any reason (including RM_UnblockClient()
+ * explicit call). */
+ if (bc->unblocked) return REDISMODULE_ERR;
RedisModuleCtx ctx = REDISMODULE_CTX_INIT;
ctx.flags |= REDISMODULE_CTX_BLOCKED_REPLY;
ctx.blocked_ready_key = key;
@@ -4162,6 +4168,7 @@ void RM_SignalKeyAsReady(RedisModuleCtx *ctx, RedisModuleString *key) {
int moduleUnblockClientByHandle(RedisModuleBlockedClient *bc, void *privdata) {
pthread_mutex_lock(&moduleUnblockedClientsMutex);
if (!bc->blocked_on_keys) bc->privdata = privdata;
+ bc->unblocked = 1;
listAddNodeTail(moduleUnblockedClients,bc);
if (write(server.module_blocked_pipe[1],"A",1) != 1) {
/* Ignore the error, this is best-effort. */