summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorantirez <antirez@gmail.com>2019-10-30 10:11:58 +0100
committerantirez <antirez@gmail.com>2019-10-30 10:11:58 +0100
commit3649568ff22dbb6ea3e26c722d115ba5635547ae (patch)
tree1129bdb77d155881432f2909c31ba1f748576498
parentcfcb475435a9efb3de794e93103b931d16e3a959 (diff)
downloadredis-3649568ff22dbb6ea3e26c722d115ba5635547ae.tar.gz
Modules: block on keys functions layout and mechanism.
-rw-r--r--src/module.c121
1 files changed, 104 insertions, 17 deletions
diff --git a/src/module.c b/src/module.c
index 971bf5c08..bdafc1590 100644
--- a/src/module.c
+++ b/src/module.c
@@ -245,6 +245,15 @@ typedef struct RedisModuleBlockedClient {
client *reply_client; /* Fake client used to accumulate replies
in thread safe contexts. */
int dbid; /* Database number selected by the original client. */
+ int blocked_on_keys; /* If blocked via RM_BlockClientOnKeys(). */
+ int (*is_key_ready)(RedisModuleCtx*, RedisModuleString *keyname,
+ void *privdata); /* When blocking on keys, even if the
+ key is signaled as ready, maybe it
+ was modified afterward before the
+ client unblocks. So we always
+ need a callback that tells us if
+ the key is ready in order to serve
+ the next blocked client. */
} RedisModuleBlockedClient;
static pthread_mutex_t moduleUnblockedClientsMutex = PTHREAD_MUTEX_INITIALIZER;
@@ -3989,23 +3998,26 @@ void unblockClientFromModule(client *c) {
resetClient(c);
}
-/* Block a client in the context of a blocking command, returning an handle
- * which will be used, later, in order to unblock the client with a call to
- * RedisModule_UnblockClient(). The arguments specify callback functions
- * and a timeout after which the client is unblocked.
+/* Block a client in the context of a module: this function implements both
+ * RM_BlockClient() and RM_BlockClientOnKeys() depending on the fact the
+ * keys are passed or not.
*
- * The callbacks are called in the following contexts:
+ * When not blocking for keys, the keys, numkeys, is_key_ready callback
+ * and privdata parameters are not needed. The privdata in that case must
+ * be NULL, since later is RM_UnblockClient() that will provide some private
+ * data that the reply callback will receive.
*
- * reply_callback: called after a successful RedisModule_UnblockClient()
- * call in order to reply to the client and unblock it.
- *
- * reply_timeout: called when the timeout is reached in order to send an
- * error to the client.
+ * Instead when blocking for keys, normally RM_UnblockClient() will not be
+ * called (because the client will unblock when the key is modified), so
+ * 'privdata' should be provided in that case, so that once the client is
+ * unlocked and the reply callback is called, it will receive its associated
+ * private data.
*
- * free_privdata: called in order to free the private data that is passed
- * by RedisModule_UnblockClient() call.
+ * Even when blocking on keys, RM_UnblockClient() can be called however, but
+ * in that case the privdata argument is disregarded, because we pass the
+ * reply callback the privdata that is set here while blocking.
*/
-RedisModuleBlockedClient *RM_BlockClient(RedisModuleCtx *ctx, RedisModuleCmdFunc reply_callback, RedisModuleCmdFunc timeout_callback, void (*free_privdata)(RedisModuleCtx*,void*), long long timeout_ms) {
+RedisModuleBlockedClient *moduleBlockClient(RedisModuleCtx *ctx, RedisModuleCmdFunc reply_callback, RedisModuleCmdFunc timeout_callback, void (*free_privdata)(RedisModuleCtx*,void*), long long timeout_ms, RedisModuleString **keys, int numkeys, int (*is_key_ready)(RedisModuleCtx*, RedisModuleString *key, void *privdata), void *privdata) {
client *c = ctx->client;
int islua = c->flags & CLIENT_LUA;
int ismulti = c->flags & CLIENT_MULTI;
@@ -4018,17 +4030,20 @@ RedisModuleBlockedClient *RM_BlockClient(RedisModuleCtx *ctx, RedisModuleCmdFunc
* commands from Lua or MULTI. We actually create an already aborted
* (client set to NULL) blocked client handle, and actually reply with
* an error. */
+ mstime_t timeout = timeout_ms ? (mstime()+timeout_ms) : 0;
bc->client = (islua || ismulti) ? NULL : c;
bc->module = ctx->module;
bc->reply_callback = reply_callback;
bc->timeout_callback = timeout_callback;
bc->disconnect_callback = NULL; /* Set by RM_SetDisconnectCallback() */
bc->free_privdata = free_privdata;
- bc->privdata = NULL;
+ bc->privdata = privdata;
bc->reply_client = createClient(NULL);
bc->reply_client->flags |= CLIENT_MODULE;
bc->dbid = c->db->id;
- c->bpop.timeout = timeout_ms ? (mstime()+timeout_ms) : 0;
+ bc->is_key_ready = is_key_ready;
+ bc->blocked_on_keys = keys != NULL;
+ c->bpop.timeout = timeout;
if (islua || ismulti) {
c->bpop.module_blocked_handle = NULL;
@@ -4036,11 +4051,83 @@ RedisModuleBlockedClient *RM_BlockClient(RedisModuleCtx *ctx, RedisModuleCmdFunc
"Blocking module command called from Lua script" :
"Blocking module command called from transaction");
} else {
- blockClient(c,BLOCKED_MODULE);
+ if (keys) {
+ blockForKeys(c,BLOCKED_MODULE,keys,numkeys,timeout,NULL,NULL);
+ } else {
+ blockClient(c,BLOCKED_MODULE);
+ }
}
return bc;
}
+/* Block a client in the context of a blocking command, returning an handle
+ * which will be used, later, in order to unblock the client with a call to
+ * RedisModule_UnblockClient(). The arguments specify callback functions
+ * and a timeout after which the client is unblocked.
+ *
+ * The callbacks are called in the following contexts:
+ *
+ * reply_callback: called after a successful RedisModule_UnblockClient()
+ * call in order to reply to the client and unblock it.
+ *
+ * reply_timeout: called when the timeout is reached in order to send an
+ * error to the client.
+ *
+ * free_privdata: called in order to free the private data that is passed
+ * by RedisModule_UnblockClient() call.
+ */
+RedisModuleBlockedClient *RM_BlockClient(RedisModuleCtx *ctx, RedisModuleCmdFunc reply_callback, RedisModuleCmdFunc timeout_callback, void (*free_privdata)(RedisModuleCtx*,void*), long long timeout_ms) {
+ return moduleBlockClient(ctx,reply_callback,timeout_callback,free_privdata,timeout_ms, NULL,0,NULL,NULL);
+}
+
+/* This call is similar to RedisModule_BlockClient(), however in this case we
+ * don't just block the client, but also ask Redis to unblock it automatically
+ * once certain keys become "ready", that is, contain more data.
+ *
+ * Basically this is similar to what a typical Redis command usually does,
+ * like BLPOP or ZPOPMAX: the client blocks if it cannot be served ASAP,
+ * and later when the key receives new data (a list push for instance), the
+ * client is unblocked and served.
+ *
+ * However in the case of this module API, when the client is unblocked?
+ *
+ * 1. If you block ok a key of a type that has blocking operations associated,
+ * like a list, a sorted set, a stream, and so forth, the client may be
+ * unblocked once the relevant key is targeted by an operation that normally
+ * unblocks the native blocking operations for that type. So if we block
+ * on a list key, an RPUSH command may unblock our client and so forth.
+ * 2. If you are implementing your native data type, or if you want to add new
+ * unblocking conditions in addition to "1", you can call the modules API
+ * RedisModule_SignalKeyAsReady().
+ *
+ * Anyway we can't be sure if the client should be unblocked just because the
+ * key is signaled as ready: for instance a successive operation may change the
+ * key, or a client in queue before this one can be served, modifying the key
+ * as well and making it empty again. So when blocking for keys, we need to
+ * register a callback called is_key_ready. This callback gets called with
+ * a context, selected with the right database, and the key name: if it
+ * returns 1, then we proceed calling the reply callback, otherwise if the
+ * is_key_ready callback returns 0 the client is not unblocked, since the
+ * key is yet not ready.
+ *
+ * Thanks to this system we can setup complex blocking scenarios, like
+ * unblocking a client only if a list contains at least 5 items or other
+ * more fancy logics.
+ *
+ * Note that another difference with RedisModule_BlockClient(), is that here
+ * we pass the private data directly when blocking the client: such private
+ * data will be later provided both to the is_key_ready callback, and to the
+ * reply callback. Normally in RedisModule_BlockClient() the private data
+ * to reply to the client is passed when calling RedisModule_UnblockClient()
+ * but here the unblocking is performed by Redis itself, so we need to have
+ * some private data before hand. The private data is used to store any
+ * information about the specific unblocking operation that you are
+ * implementing. Such information will be freed using the free_privdata
+ * callback provided by the user. */
+RedisModuleBlockedClient *RM_BlockClientOnKeys(RedisModuleCtx *ctx, RedisModuleCmdFunc reply_callback, RedisModuleCmdFunc timeout_callback, void (*free_privdata)(RedisModuleCtx*,void*), long long timeout_ms, RedisModuleString **keys, int numkeys, int (*is_key_ready)(RedisModuleCtx*, RedisModuleString *keyname, void *privdata), void *privdata) {
+ return moduleBlockClient(ctx,reply_callback,timeout_callback,free_privdata,timeout_ms, keys,numkeys,is_key_ready,privdata);
+}
+
/* Unblock a client blocked by `RedisModule_BlockedClient`. This will trigger
* the reply callbacks to be called in order to reply to the client.
* The 'privdata' argument will be accessible by the reply callback, so
@@ -4054,7 +4141,7 @@ RedisModuleBlockedClient *RM_BlockClient(RedisModuleCtx *ctx, RedisModuleCmdFunc
* Note: this function can be called from threads spawned by the module. */
int RM_UnblockClient(RedisModuleBlockedClient *bc, void *privdata) {
pthread_mutex_lock(&moduleUnblockedClientsMutex);
- bc->privdata = privdata;
+ if (!bc->blocked_on_keys) bc->privdata = privdata;
listAddNodeTail(moduleUnblockedClients,bc);
if (write(server.module_blocked_pipe[1],"A",1) != 1) {
/* Ignore the error, this is best-effort. */