summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorantirez <antirez@gmail.com>2016-10-07 11:55:35 +0200
committerantirez <antirez@gmail.com>2016-10-07 11:55:35 +0200
commit8fadfe52a2d1abf3d4d12707004f1209703d446c (patch)
tree3657b6cd3f52b79d9e2cb1ef185c1fb6d23dd15e
parenta5998d1fda00862e57671d6986f2d8865ef5fd19 (diff)
downloadredis-8fadfe52a2d1abf3d4d12707004f1209703d446c.tar.gz
Module: API to block clients with threading support.
Just a draft to align the main ideas, never executed code. Compiles.
-rw-r--r--src/blocked.c7
-rw-r--r--src/module.c176
-rw-r--r--src/server.c4
-rw-r--r--src/server.h9
4 files changed, 185 insertions, 11 deletions
diff --git a/src/blocked.c b/src/blocked.c
index d22872548..54b26b713 100644
--- a/src/blocked.c
+++ b/src/blocked.c
@@ -136,6 +136,8 @@ void unblockClient(client *c) {
unblockClientWaitingData(c);
} else if (c->btype == BLOCKED_WAIT) {
unblockClientWaitingReplicas(c);
+ } else if (c->btype == BLOCKED_MODULE) {
+ unblockClientFromModule(c);
} else {
serverPanic("Unknown btype in unblockClient().");
}
@@ -153,12 +155,15 @@ void unblockClient(client *c) {
}
/* This function gets called when a blocked client timed out in order to
- * send it a reply of some kind. */
+ * send it a reply of some kind. After this function is called,
+ * unblockClient() will be called with the same client as argument. */
void replyToBlockedClientTimedOut(client *c) {
if (c->btype == BLOCKED_LIST) {
addReply(c,shared.nullmultibulk);
} else if (c->btype == BLOCKED_WAIT) {
addReplyLongLong(c,replicationCountAcksByOffset(c->bpop.reploffset));
+ } else if (c->btype == BLOCKED_MODULE) {
+ moduleBlockedClientTimedOut(c);
} else {
serverPanic("Unknown btype in replyToBlockedClientTimedOut().");
}
diff --git a/src/module.c b/src/module.c
index 742d9b974..9a939d2fc 100644
--- a/src/module.c
+++ b/src/module.c
@@ -105,6 +105,7 @@ struct RedisModuleCtx {
int flags; /* REDISMODULE_CTX_... flags. */
void **postponed_arrays; /* To set with RM_ReplySetArrayLength(). */
int postponed_arrays_count; /* Number of entries in postponed_arrays. */
+ void *blocked_privdata; /* Privdata set when unblocking a clinet. */
/* Used if there is the REDISMODULE_CTX_KEYS_POS_REQUEST flag set. */
int *keys_pos;
@@ -114,10 +115,12 @@ struct RedisModuleCtx {
};
typedef struct RedisModuleCtx RedisModuleCtx;
-#define REDISMODULE_CTX_INIT {(void*)(unsigned long)&RM_GetApi, NULL, NULL, NULL, 0, 0, 0, NULL, 0, NULL, 0, NULL}
+#define REDISMODULE_CTX_INIT {(void*)(unsigned long)&RM_GetApi, NULL, NULL, NULL, 0, 0, 0, NULL, 0, NULL, NULL, 0, NULL}
#define REDISMODULE_CTX_MULTI_EMITTED (1<<0)
#define REDISMODULE_CTX_AUTO_MEMORY (1<<1)
#define REDISMODULE_CTX_KEYS_POS_REQUEST (1<<2)
+#define REDISMODULE_CTX_BLOCKED_REPLY (1<<3)
+#define REDISMODULE_CTX_BLOCKED_TIMEOUT (1<<4)
/* This represents a Redis key opened with RM_OpenKey(). */
struct RedisModuleKey {
@@ -183,6 +186,23 @@ typedef struct RedisModuleCallReply {
} val;
} RedisModuleCallReply;
+/* Structure representing a blocked client. We get a pointer to such
+ * an object when blocking from modules. */
+typedef struct RedisModuleBlockedClient {
+ client *client; /* Pointer to the blocked client. or NULL if the client
+ was destroyed during the life of this object. */
+ RedisModule *module; /* Module blocking the client. */
+ RedisModuleCmdFunc reply_callback; /* Reply callback on normal completion.*/
+ RedisModuleCmdFunc timeout_callback; /* Reply callback on timeout. */
+ void (*free_privdata)(void *); /* privdata cleanup callback. */
+ void *privdata; /* Module private data that may be used by the reply
+ or timeout callback. It is set via the
+ RedisModule_UnblockClient() API. */
+} RedisModuleBlockedClient;
+
+static pthread_mutex_t moduleUnblockedClientsMutex = PTHREAD_MUTEX_INITIALIZER;
+static list *moduleUnblockedClients;
+
/* --------------------------------------------------------------------------
* Prototypes
* -------------------------------------------------------------------------- */
@@ -403,26 +423,36 @@ void moduleFreeContext(RedisModuleCtx *ctx) {
}
}
-/* This Redis command binds the normal Redis command invocation with commands
- * exported by modules. */
-void RedisModuleCommandDispatcher(client *c) {
- RedisModuleCommandProxy *cp = (void*)(unsigned long)c->cmd->getkeys_proc;
- RedisModuleCtx ctx = REDISMODULE_CTX_INIT;
+/* Helper function for when a command callback is called, in order to handle
+ * details needed to correctly replicate commands. */
+void moduleHandlePropagationAfterCommandCallback(RedisModuleCtx *ctx) {
+ client *c = ctx->client;
- ctx.module = cp->module;
- ctx.client = c;
- cp->func(&ctx,(void**)c->argv,c->argc);
+ /* We don't want any automatic propagation here since in modules we handle
+ * replication / AOF propagation in explicit ways. */
preventCommandPropagation(c);
/* Handle the replication of the final EXEC, since whatever a command
* emits is always wrappered around MULTI/EXEC. */
- if (ctx.flags & REDISMODULE_CTX_MULTI_EMITTED) {
+ if (ctx->flags & REDISMODULE_CTX_MULTI_EMITTED) {
robj *propargv[1];
propargv[0] = createStringObject("EXEC",4);
alsoPropagate(server.execCommand,c->db->id,propargv,1,
PROPAGATE_AOF|PROPAGATE_REPL);
decrRefCount(propargv[0]);
}
+}
+
+/* This Redis command binds the normal Redis command invocation with commands
+ * exported by modules. */
+void RedisModuleCommandDispatcher(client *c) {
+ RedisModuleCommandProxy *cp = (void*)(unsigned long)c->cmd->getkeys_proc;
+ RedisModuleCtx ctx = REDISMODULE_CTX_INIT;
+
+ ctx.module = cp->module;
+ ctx.client = c;
+ cp->func(&ctx,(void**)c->argv,c->argc);
+ moduleHandlePropagationAfterCommandCallback(&ctx);
moduleFreeContext(&ctx);
}
@@ -3035,6 +3065,130 @@ void RM_LogIOError(RedisModuleIO *io, const char *levelstr, const char *fmt, ...
}
/* --------------------------------------------------------------------------
+ * Blocking clients from modules
+ * -------------------------------------------------------------------------- */
+
+/* This is called from blocked.c in order to unblock a client: may be called
+ * for multiple reasons while the client is in the middle of being blocked
+ * because the client is terminated, but is also called for cleanup when a
+ * client is unblocked in a clean way after replaying.
+ *
+ * What we do here is just to set the client to NULL in the redis module
+ * blocked client handle. This way if the client is terminated while there
+ * is a pending threaded operation involving the blocked client, we'll know
+ * that the client no longer exists and no reply callback should be called.
+ *
+ * The structure RedisModuleBlockedClient will be always deallocated when
+ * running the list of clients blocked by a module that need to be unblocked. */
+void unblockClientFromModule(client *c) {
+ RedisModuleBlockedClient *bc = c->bpop.module_blocked_handle;
+ bc->client = NULL;
+}
+
+int RM_BlockClient(RedisModuleCtx *ctx, RedisModuleCmdFunc reply_callback, RedisModuleCmdFunc timeout_callback, void (*free_privdata)(void*), long long timeout_ms) {
+ client *c = ctx->client;
+ c->bpop.module_blocked_handle = zmalloc(sizeof(RedisModuleBlockedClient));
+ RedisModuleBlockedClient *bc = c->bpop.module_blocked_handle;
+
+ bc->client = c;
+ bc->module = ctx->module;
+ bc->reply_callback = reply_callback;
+ bc->timeout_callback = timeout_callback;
+ bc->free_privdata = free_privdata;
+ bc->privdata = NULL;
+ c->bpop.timeout = timeout_ms;
+
+ blockClient(c,BLOCKED_MODULE);
+ return REDISMODULE_OK;
+}
+
+/* Unblock a client blocked by `RedisModule_BlockedClient`. This will trigger
+ * the reply callbacks to be called in order to reply to the client.
+ * The 'privdata' argument will be accessible by the reply callback, so
+ * the caller of this function can pass any value that is needed in order to
+ * actually reply to the client.
+ *
+ * A common usage for 'privdata' is a thread that computes something that
+ * needs to be passed to the client, included but not limited some slow
+ * to compute reply or some reply obtained via networking.
+ *
+ * Note: this function can be called from threads spawned by the module. */
+int RM_UnblockClient(RedisModuleBlockedClient *bc, void *privdata) {
+ pthread_mutex_lock(&moduleUnblockedClientsMutex);
+ bc->privdata = privdata;
+ listAddNodeTail(moduleUnblockedClients,bc);
+ pthread_mutex_unlock(&moduleUnblockedClientsMutex);
+ return REDISMODULE_OK;
+}
+
+/* This function will check the moduleUnblockedClients queue in order to
+ * call the reply callback and really unblock the client.
+ *
+ * Clients end into this list because of calls to RM_UnblockClient(),
+ * however it is possible that while the module was doing work for the
+ * blocked client, it was terminated by Redis (for timeout or other reasons).
+ * When this happens the RedisModuleBlockedClient structure in the queue
+ * will have the 'client' field set to NULL. */
+void moduleHandleBlockedClients(void) {
+ listNode *ln;
+ RedisModuleBlockedClient *bc;
+
+ pthread_mutex_lock(&moduleUnblockedClientsMutex);
+ while (listLength(moduleUnblockedClients)) {
+ ln = listFirst(moduleUnblockedClients);
+ bc = ln->value;
+ client *c = bc->client;
+ listDelNode(server.unblocked_clients,ln);
+
+ if (c != NULL) {
+ RedisModuleCtx ctx = REDISMODULE_CTX_INIT;
+ ctx.flags |= REDISMODULE_CTX_BLOCKED_REPLY;
+ ctx.blocked_privdata = bc->privdata;
+ ctx.module = bc->module;
+ ctx.client = bc->client;
+ bc->reply_callback(&ctx,(void**)c->argv,c->argc);
+ moduleHandlePropagationAfterCommandCallback(&ctx);
+ moduleFreeContext(&ctx);
+ }
+ if (bc->privdata && bc->free_privdata)
+ bc->free_privdata(bc->privdata);
+ zfree(bc);
+ }
+ pthread_mutex_unlock(&moduleUnblockedClientsMutex);
+}
+
+/* Called when our client timed out. After this function unblockClient()
+ * is called, and it will invalidate the blocked client. So this function
+ * does not need to do any cleanup. Eventually the module will call the
+ * API to unblock the client and the memory will be released. */
+void moduleBlockedClientTimedOut(client *c) {
+ RedisModuleBlockedClient *bc = c->bpop.module_blocked_handle;
+ RedisModuleCtx ctx = REDISMODULE_CTX_INIT;
+ ctx.flags |= REDISMODULE_CTX_BLOCKED_TIMEOUT;
+ ctx.module = bc->module;
+ ctx.client = bc->client;
+ bc->timeout_callback(&ctx,(void**)c->argv,c->argc);
+ moduleFreeContext(&ctx);
+}
+
+/* Return non-zero if a module command was called in order to fill the
+ * reply for a blocked client. */
+int RM_IsBlockedReplyRequest(RedisModuleCtx *ctx) {
+ return (ctx->flags & REDISMODULE_CTX_BLOCKED_REPLY) != 0;
+}
+
+/* Return non-zero if a module command was called in order to fill the
+ * reply for a blocked client that timed out. */
+int RM_IsBlockedTimeoutRequest(RedisModuleCtx *ctx) {
+ return (ctx->flags & REDISMODULE_CTX_BLOCKED_TIMEOUT) != 0;
+}
+
+/* Get the privata data set by RedisModule_UnblockClient() */
+void *RM_GetBlockedClientPrivateData(RedisModuleCtx *ctx) {
+ return ctx->blocked_privdata;
+}
+
+/* --------------------------------------------------------------------------
* Modules API internals
* -------------------------------------------------------------------------- */
@@ -3070,6 +3224,8 @@ int moduleRegisterApi(const char *funcname, void *funcptr) {
void moduleRegisterCoreAPI(void);
void moduleInitModulesSystem(void) {
+ moduleUnblockedClients = listCreate();
+
server.loadmodule_queue = listCreate();
modules = dictCreate(&modulesDictType,NULL);
moduleRegisterCoreAPI();
diff --git a/src/server.c b/src/server.c
index 36b04abfb..a05491852 100644
--- a/src/server.c
+++ b/src/server.c
@@ -1195,6 +1195,10 @@ void beforeSleep(struct aeEventLoop *eventLoop) {
if (listLength(server.clients_waiting_acks))
processClientsWaitingReplicas();
+ /* Check if there are clients unblocked by modules that implement
+ * blocking commands. */
+ moduleHandleBlockedClients();
+
/* Try to process pending commands for clients that were just unblocked. */
if (listLength(server.unblocked_clients))
processUnblockedClients();
diff --git a/src/server.h b/src/server.h
index b9c46b810..69ee52e64 100644
--- a/src/server.h
+++ b/src/server.h
@@ -245,6 +245,7 @@ typedef long long mstime_t; /* millisecond time type. */
#define BLOCKED_NONE 0 /* Not blocked, no CLIENT_BLOCKED flag set. */
#define BLOCKED_LIST 1 /* BLPOP & co. */
#define BLOCKED_WAIT 2 /* WAIT for synchronous replication. */
+#define BLOCKED_MODULE 3 /* Blocked by a loadable module. */
/* Client request types */
#define PROTO_REQ_INLINE 1
@@ -619,6 +620,11 @@ typedef struct blockingState {
/* BLOCKED_WAIT */
int numreplicas; /* Number of replicas we are waiting for ACK. */
long long reploffset; /* Replication offset to reach. */
+
+ /* BLOCKED_MODULE */
+ void *module_blocked_handle; /* RedisModuleBlockedClient structure.
+ which is opaque for the Redis core, only
+ handled in module.c. */
} blockingState;
/* The following structure represents a node in the server.ready_keys list,
@@ -1226,6 +1232,9 @@ int *moduleGetCommandKeysViaAPI(struct redisCommand *cmd, robj **argv, int argc,
moduleType *moduleTypeLookupModuleByID(uint64_t id);
void moduleTypeNameByID(char *name, uint64_t moduleid);
void moduleFreeContext(struct RedisModuleCtx *ctx);
+void unblockClientFromModule(client *c);
+void moduleHandleBlockedClients(void);
+void moduleBlockedClientTimedOut(client *c);
/* Utils */
long long ustime(void);