summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorantirez <antirez@gmail.com>2019-10-30 10:57:44 +0100
committerantirez <antirez@gmail.com>2019-10-30 10:57:44 +0100
commit215b72c0ba1f42d15dcfe6fa60abb414275296ba (patch)
tree5197f70a3346bb94604abd05d516fea2a8ee7f50
parentfb6110ac2062a548aa79c9bb5f11e3fa10e71ba2 (diff)
downloadredis-215b72c0ba1f42d15dcfe6fa60abb414275296ba.tar.gz
Modules: block on keys: implement the internals.
-rw-r--r--src/blocked.c30
-rw-r--r--src/module.c54
-rw-r--r--src/server.h2
3 files changed, 78 insertions, 8 deletions
diff --git a/src/blocked.c b/src/blocked.c
index 867f03de6..2b91c1b44 100644
--- a/src/blocked.c
+++ b/src/blocked.c
@@ -430,6 +430,32 @@ void serveClientsBlockedOnStreamKey(robj *o, readyList *rl) {
}
}
+/* Helper function for handleClientsBlockedOnKeys(). This function is called
+ * in order to check if we can serve clients blocked by modules using
+ * RM_BlockClientOnKeys(), when the corresponding key was signaled as ready:
+ * our goal here is to call the is_key_ready() callback to see if the key
+ * is really able to serve the client, and in that case, unblock it. */
+void serveClientsBlockedOnKeyByModule(readyList *rl) {
+ dictEntry *de;
+
+ /* We serve clients in the same order they blocked for
+ * this key, from the first blocked to the last. */
+ de = dictFind(rl->db->blocking_keys,rl->key);
+ if (de) {
+ list *clients = dictGetVal(de);
+ int numclients = listLength(clients);
+
+ while(numclients--) {
+ listNode *clientnode = listFirst(clients);
+ client *receiver = clientnode->value;
+
+ if (!moduleIsKeyReady(receiver, rl->key)) continue;
+
+ moduleUnblockClient(receiver);
+ }
+ }
+}
+
/* This function should be called by Redis every time a single command,
* a MULTI/EXEC block, or a Lua script, terminated its execution after
* being called by a client. It handles serving clients blocked in
@@ -480,6 +506,10 @@ void handleClientsBlockedOnKeys(void) {
serveClientsBlockedOnSortedSetKey(o,rl);
else if (o->type == OBJ_STREAM)
serveClientsBlockedOnStreamKey(o,rl);
+ /* We want to serve clients blocked on module keys
+ * regardless of the object type: we don't know what the
+ * module is trying to accomplish right now. */
+ serveClientsBlockedOnKeyByModule(rl);
}
/* Free this item. */
diff --git a/src/module.c b/src/module.c
index 113375e75..85bc9e808 100644
--- a/src/module.c
+++ b/src/module.c
@@ -4060,6 +4060,24 @@ RedisModuleBlockedClient *moduleBlockClient(RedisModuleCtx *ctx, RedisModuleCmdF
return bc;
}
+/* This function is called from module.c in order to check if a module
+ * blocked for BLOCKED_MODULE and subtype 'on keys' (bc->blocked_on_keys true)
+ * can really be unblocked since the key is ready. */
+int moduleIsKeyReady(client *c, robj *key) {
+ RedisModuleBlockedClient *bc = c->bpop.module_blocked_handle;
+ RedisModuleCtx ctx = REDISMODULE_CTX_INIT;
+ ctx.module = bc->module;
+ ctx.client = bc->client;
+ ctx.blocked_privdata = bc->privdata; /* In case the callback uses the
+ API to get the pointer to the
+ privdata, even if we provide it
+ as argument. */
+ selectDb(ctx.client, bc->dbid);
+ int ready = bc->is_key_ready(&ctx, key, bc->privdata);
+ moduleFreeContext(&ctx);
+ return ready;
+}
+
/* Block a client in the context of a blocking command, returning an handle
* which will be used, later, in order to unblock the client with a call to
* RedisModule_UnblockClient(). The arguments specify callback functions
@@ -4137,6 +4155,25 @@ void RM_SignalKeyAsReady(RedisModuleCtx *ctx, RedisModuleString *key) {
signalKeyAsReady(ctx->client->db, key);
}
+/* Implements RM_UnblockClient() and moduleUnblockClient(). */
+int moduleUnblockClientByHandle(RedisModuleBlockedClient *bc, void *privdata) {
+ pthread_mutex_lock(&moduleUnblockedClientsMutex);
+ if (!bc->blocked_on_keys) bc->privdata = privdata;
+ listAddNodeTail(moduleUnblockedClients,bc);
+ if (write(server.module_blocked_pipe[1],"A",1) != 1) {
+ /* Ignore the error, this is best-effort. */
+ }
+ pthread_mutex_unlock(&moduleUnblockedClientsMutex);
+ return REDISMODULE_OK;
+}
+
+/* This API is used by the Redis core to unblock a client that was blocked
+ * by a module. */
+void moduleUnblockClient(client *c) {
+ RedisModuleBlockedClient *bc = c->bpop.module_blocked_handle;
+ moduleUnblockClientByHandle(bc,NULL);
+}
+
/* Unblock a client blocked by `RedisModule_BlockedClient`. This will trigger
* the reply callbacks to be called in order to reply to the client.
* The 'privdata' argument will be accessible by the reply callback, so
@@ -4147,15 +4184,16 @@ void RM_SignalKeyAsReady(RedisModuleCtx *ctx, RedisModuleString *key) {
* needs to be passed to the client, included but not limited some slow
* to compute reply or some reply obtained via networking.
*
- * Note: this function can be called from threads spawned by the module. */
+ * Note: this function can be called from threads spawned by the module.
+ *
+ * Note: when we unblock a client that is blocked for keys using
+ * the API RedisModule_BlockClientOnKeys(), the privdata argument here is
+ * not used, and the reply callback is called with the privdata pointer that
+ * was passed when blocking the client. Also note if you unblock clients
+ * blocked on keys in this way, the reply callback should be ready to handle
+ * the fact the key may not be ready at all. */
int RM_UnblockClient(RedisModuleBlockedClient *bc, void *privdata) {
- pthread_mutex_lock(&moduleUnblockedClientsMutex);
- if (!bc->blocked_on_keys) bc->privdata = privdata;
- listAddNodeTail(moduleUnblockedClients,bc);
- if (write(server.module_blocked_pipe[1],"A",1) != 1) {
- /* Ignore the error, this is best-effort. */
- }
- pthread_mutex_unlock(&moduleUnblockedClientsMutex);
+ moduleUnblockClientByHandle(bc,privdata);
return REDISMODULE_OK;
}
diff --git a/src/server.h b/src/server.h
index 97672d727..3a2bb1c7b 100644
--- a/src/server.h
+++ b/src/server.h
@@ -1602,6 +1602,8 @@ ssize_t rdbSaveModulesAux(rio *rdb, int when);
int moduleAllDatatypesHandleErrors();
sds modulesCollectInfo(sds info, sds section, int for_crash_report, int sections);
void moduleFireServerEvent(uint64_t eid, int subid, void *data);
+int moduleIsKeyReady(client *c, robj *key);
+void moduleUnblockClient(client *c);
/* Utils */
long long ustime(void);