summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSalvatore Sanfilippo <antirez@gmail.com>2019-11-04 10:37:06 +0100
committerGitHub <noreply@github.com>2019-11-04 10:37:06 +0100
commitda44c548977dc2ce5bb0bd8c9d77a1c84b609e6b (patch)
treef6ab96938c5f17c8bd92416e3a1df98aa91768e9
parent51c3ff8d75d9c784297ad587a31a37acd89499d8 (diff)
parentfdaea2a7a7eed1499f46bb98552f8d8bb8dc7e9d (diff)
downloadredis-da44c548977dc2ce5bb0bd8c9d77a1c84b609e6b.tar.gz
Merge branch 'unstable' into module_hooks
-rw-r--r--src/blocked.c48
-rw-r--r--src/module.c230
-rw-r--r--src/modules/hellotype.c76
-rw-r--r--src/redismodule.h6
-rw-r--r--src/server.h3
5 files changed, 334 insertions, 29 deletions
diff --git a/src/blocked.c b/src/blocked.c
index 867f03de6..14c2ff830 100644
--- a/src/blocked.c
+++ b/src/blocked.c
@@ -174,6 +174,7 @@ void unblockClient(client *c) {
} else if (c->btype == BLOCKED_WAIT) {
unblockClientWaitingReplicas(c);
} else if (c->btype == BLOCKED_MODULE) {
+ if (moduleClientIsBlockedOnKeys(c)) unblockClientWaitingData(c);
unblockClientFromModule(c);
} else {
serverPanic("Unknown btype in unblockClient().");
@@ -430,6 +431,49 @@ void serveClientsBlockedOnStreamKey(robj *o, readyList *rl) {
}
}
+/* Helper function for handleClientsBlockedOnKeys(). This function is called
+ * in order to check if we can serve clients blocked by modules using
+ * RM_BlockClientOnKeys(), when the corresponding key was signaled as ready:
+ * our goal here is to call the RedisModuleBlockedClient reply() callback to
+ * see if the key is really able to serve the client, and in that case,
+ * unblock it. */
+void serveClientsBlockedOnKeyByModule(readyList *rl) {
+ dictEntry *de;
+
+ /* We serve clients in the same order they blocked for
+ * this key, from the first blocked to the last. */
+ de = dictFind(rl->db->blocking_keys,rl->key);
+ if (de) {
+ list *clients = dictGetVal(de);
+ int numclients = listLength(clients);
+
+ while(numclients--) {
+ listNode *clientnode = listFirst(clients);
+ client *receiver = clientnode->value;
+
+ /* Put at the tail, so that at the next call
+ * we'll not run into it again: clients here may not be
+ * ready to be served, so they'll remain in the list
+ * sometimes. We want also be able to skip clients that are
+ * not blocked for the MODULE type safely. */
+ listDelNode(clients,clientnode);
+ listAddNodeTail(clients,receiver);
+
+ if (receiver->btype != BLOCKED_MODULE) continue;
+
+ /* Note that if *this* client cannot be served by this key,
+ * it does not mean that another client that is next into the
+ * list cannot be served as well: they may be blocked by
+ * different modules with different triggers to consider if a key
+ * is ready or not. This means we can't exit the loop but need
+ * to continue after the first failure. */
+ if (!moduleTryServeClientBlockedOnKey(receiver, rl->key)) continue;
+
+ moduleUnblockClient(receiver);
+ }
+ }
+}
+
/* This function should be called by Redis every time a single command,
* a MULTI/EXEC block, or a Lua script, terminated its execution after
* being called by a client. It handles serving clients blocked in
@@ -480,6 +524,10 @@ void handleClientsBlockedOnKeys(void) {
serveClientsBlockedOnSortedSetKey(o,rl);
else if (o->type == OBJ_STREAM)
serveClientsBlockedOnStreamKey(o,rl);
+ /* We want to serve clients blocked on module keys
+ * regardless of the object type: we don't know what the
+ * module is trying to accomplish right now. */
+ serveClientsBlockedOnKeyByModule(rl);
}
/* Free this item. */
diff --git a/src/module.c b/src/module.c
index 57e388881..d985eba16 100644
--- a/src/module.c
+++ b/src/module.c
@@ -140,6 +140,9 @@ struct RedisModuleCtx {
void **postponed_arrays; /* To set with RM_ReplySetArrayLength(). */
int postponed_arrays_count; /* Number of entries in postponed_arrays. */
void *blocked_privdata; /* Privdata set when unblocking a client. */
+ RedisModuleString *blocked_ready_key; /* Key ready when the reply callback
+ gets called for clients blocked
+ on keys. */
/* Used if there is the REDISMODULE_CTX_KEYS_POS_REQUEST flag set. */
int *keys_pos;
@@ -153,7 +156,7 @@ struct RedisModuleCtx {
};
typedef struct RedisModuleCtx RedisModuleCtx;
-#define REDISMODULE_CTX_INIT {(void*)(unsigned long)&RM_GetApi, NULL, NULL, NULL, NULL, 0, 0, 0, NULL, 0, NULL, NULL, 0, NULL, {0}}
+#define REDISMODULE_CTX_INIT {(void*)(unsigned long)&RM_GetApi, NULL, NULL, NULL, NULL, 0, 0, 0, NULL, 0, NULL, NULL, NULL, 0, NULL, {0}}
#define REDISMODULE_CTX_MULTI_EMITTED (1<<0)
#define REDISMODULE_CTX_AUTO_MEMORY (1<<1)
#define REDISMODULE_CTX_KEYS_POS_REQUEST (1<<2)
@@ -245,6 +248,8 @@ typedef struct RedisModuleBlockedClient {
client *reply_client; /* Fake client used to accumulate replies
in thread safe contexts. */
int dbid; /* Database number selected by the original client. */
+ int blocked_on_keys; /* If blocked via RM_BlockClientOnKeys(). */
+ int unblocked; /* Already on the moduleUnblocked list. */
} RedisModuleBlockedClient;
static pthread_mutex_t moduleUnblockedClientsMutex = PTHREAD_MUTEX_INITIALIZER;
@@ -4034,23 +4039,26 @@ void unblockClientFromModule(client *c) {
resetClient(c);
}
-/* Block a client in the context of a blocking command, returning an handle
- * which will be used, later, in order to unblock the client with a call to
- * RedisModule_UnblockClient(). The arguments specify callback functions
- * and a timeout after which the client is unblocked.
+/* Block a client in the context of a module: this function implements both
+ * RM_BlockClient() and RM_BlockClientOnKeys() depending on the fact the
+ * keys are passed or not.
*
- * The callbacks are called in the following contexts:
+ * When not blocking for keys, the keys, numkeys, and privdata parameters are
+ * not needed. The privdata in that case must be NULL, since later is
+ * RM_UnblockClient() that will provide some private data that the reply
+ * callback will receive.
*
- * reply_callback: called after a successful RedisModule_UnblockClient()
- * call in order to reply to the client and unblock it.
+ * Instead when blocking for keys, normally RM_UnblockClient() will not be
+ * called (because the client will unblock when the key is modified), so
+ * 'privdata' should be provided in that case, so that once the client is
+ * unlocked and the reply callback is called, it will receive its associated
+ * private data.
*
- * reply_timeout: called when the timeout is reached in order to send an
- * error to the client.
- *
- * free_privdata: called in order to free the private data that is passed
- * by RedisModule_UnblockClient() call.
+ * Even when blocking on keys, RM_UnblockClient() can be called however, but
+ * in that case the privdata argument is disregarded, because we pass the
+ * reply callback the privdata that is set here while blocking.
*/
-RedisModuleBlockedClient *RM_BlockClient(RedisModuleCtx *ctx, RedisModuleCmdFunc reply_callback, RedisModuleCmdFunc timeout_callback, void (*free_privdata)(RedisModuleCtx*,void*), long long timeout_ms) {
+RedisModuleBlockedClient *moduleBlockClient(RedisModuleCtx *ctx, RedisModuleCmdFunc reply_callback, RedisModuleCmdFunc timeout_callback, void (*free_privdata)(RedisModuleCtx*,void*), long long timeout_ms, RedisModuleString **keys, int numkeys, void *privdata) {
client *c = ctx->client;
int islua = c->flags & CLIENT_LUA;
int ismulti = c->flags & CLIENT_MULTI;
@@ -4063,17 +4071,20 @@ RedisModuleBlockedClient *RM_BlockClient(RedisModuleCtx *ctx, RedisModuleCmdFunc
* commands from Lua or MULTI. We actually create an already aborted
* (client set to NULL) blocked client handle, and actually reply with
* an error. */
+ mstime_t timeout = timeout_ms ? (mstime()+timeout_ms) : 0;
bc->client = (islua || ismulti) ? NULL : c;
bc->module = ctx->module;
bc->reply_callback = reply_callback;
bc->timeout_callback = timeout_callback;
bc->disconnect_callback = NULL; /* Set by RM_SetDisconnectCallback() */
bc->free_privdata = free_privdata;
- bc->privdata = NULL;
+ bc->privdata = privdata;
bc->reply_client = createClient(NULL);
bc->reply_client->flags |= CLIENT_MODULE;
bc->dbid = c->db->id;
- c->bpop.timeout = timeout_ms ? (mstime()+timeout_ms) : 0;
+ bc->blocked_on_keys = keys != NULL;
+ bc->unblocked = 0;
+ c->bpop.timeout = timeout;
if (islua || ismulti) {
c->bpop.module_blocked_handle = NULL;
@@ -4081,11 +4092,150 @@ RedisModuleBlockedClient *RM_BlockClient(RedisModuleCtx *ctx, RedisModuleCmdFunc
"Blocking module command called from Lua script" :
"Blocking module command called from transaction");
} else {
- blockClient(c,BLOCKED_MODULE);
+ if (keys) {
+ blockForKeys(c,BLOCKED_MODULE,keys,numkeys,timeout,NULL,NULL);
+ } else {
+ blockClient(c,BLOCKED_MODULE);
+ }
}
return bc;
}
+/* This function is called from module.c in order to check if a module
+ * blocked for BLOCKED_MODULE and subtype 'on keys' (bc->blocked_on_keys true)
+ * can really be unblocked, since the module was able to serve the client.
+ * If the callback returns REDISMODULE_OK, then the client can be unblocked,
+ * otherwise the client remains blocked and we'll retry again when one of
+ * the keys it blocked for becomes "ready" again. */
+int moduleTryServeClientBlockedOnKey(client *c, robj *key) {
+ int served = 0;
+ RedisModuleBlockedClient *bc = c->bpop.module_blocked_handle;
+ /* Protect against re-processing: don't serve clients that are already
+ * in the unblocking list for any reason (including RM_UnblockClient()
+ * explicit call). */
+ if (bc->unblocked) return REDISMODULE_ERR;
+ RedisModuleCtx ctx = REDISMODULE_CTX_INIT;
+ ctx.flags |= REDISMODULE_CTX_BLOCKED_REPLY;
+ ctx.blocked_ready_key = key;
+ ctx.blocked_privdata = bc->privdata;
+ ctx.module = bc->module;
+ ctx.client = bc->client;
+ ctx.blocked_client = bc;
+ if (bc->reply_callback(&ctx,(void**)c->argv,c->argc) == REDISMODULE_OK)
+ served = 1;
+ moduleFreeContext(&ctx);
+ return served;
+}
+
+/* Block a client in the context of a blocking command, returning an handle
+ * which will be used, later, in order to unblock the client with a call to
+ * RedisModule_UnblockClient(). The arguments specify callback functions
+ * and a timeout after which the client is unblocked.
+ *
+ * The callbacks are called in the following contexts:
+ *
+ * reply_callback: called after a successful RedisModule_UnblockClient()
+ * call in order to reply to the client and unblock it.
+ *
+ * reply_timeout: called when the timeout is reached in order to send an
+ * error to the client.
+ *
+ * free_privdata: called in order to free the private data that is passed
+ * by RedisModule_UnblockClient() call.
+ */
+RedisModuleBlockedClient *RM_BlockClient(RedisModuleCtx *ctx, RedisModuleCmdFunc reply_callback, RedisModuleCmdFunc timeout_callback, void (*free_privdata)(RedisModuleCtx*,void*), long long timeout_ms) {
+ return moduleBlockClient(ctx,reply_callback,timeout_callback,free_privdata,timeout_ms, NULL,0,NULL);
+}
+
+/* This call is similar to RedisModule_BlockClient(), however in this case we
+ * don't just block the client, but also ask Redis to unblock it automatically
+ * once certain keys become "ready", that is, contain more data.
+ *
+ * Basically this is similar to what a typical Redis command usually does,
+ * like BLPOP or ZPOPMAX: the client blocks if it cannot be served ASAP,
+ * and later when the key receives new data (a list push for instance), the
+ * client is unblocked and served.
+ *
+ * However in the case of this module API, when the client is unblocked?
+ *
+ * 1. If you block ok a key of a type that has blocking operations associated,
+ * like a list, a sorted set, a stream, and so forth, the client may be
+ * unblocked once the relevant key is targeted by an operation that normally
+ * unblocks the native blocking operations for that type. So if we block
+ * on a list key, an RPUSH command may unblock our client and so forth.
+ * 2. If you are implementing your native data type, or if you want to add new
+ * unblocking conditions in addition to "1", you can call the modules API
+ * RedisModule_SignalKeyAsReady().
+ *
+ * Anyway we can't be sure if the client should be unblocked just because the
+ * key is signaled as ready: for instance a successive operation may change the
+ * key, or a client in queue before this one can be served, modifying the key
+ * as well and making it empty again. So when a client is blocked with
+ * RedisModule_BlockClientOnKeys() the reply callback is not called after
+ * RM_UnblockCLient() is called, but every time a key is signaled as ready:
+ * if the reply callback can serve the client, it returns REDISMODULE_OK
+ * and the client is unblocked, otherwise it will return REDISMODULE_ERR
+ * and we'll try again later.
+ *
+ * The reply callback can access the key that was signaled as ready by
+ * calling the API RedisModule_GetBlockedClientReadyKey(), that returns
+ * just the string name of the key as a RedisModuleString object.
+ *
+ * Thanks to this system we can setup complex blocking scenarios, like
+ * unblocking a client only if a list contains at least 5 items or other
+ * more fancy logics.
+ *
+ * Note that another difference with RedisModule_BlockClient(), is that here
+ * we pass the private data directly when blocking the client: it will
+ * be accessible later in the reply callback. Normally when blocking with
+ * RedisModule_BlockClient() the private data to reply to the client is
+ * passed when calling RedisModule_UnblockClient() but here the unblocking
+ * is performed by Redis itself, so we need to have some private data before
+ * hand. The private data is used to store any information about the specific
+ * unblocking operation that you are implementing. Such information will be
+ * freed using the free_privdata callback provided by the user.
+ *
+ * However the reply callback will be able to access the argument vector of
+ * the command, so the private data is often not needed. */
+RedisModuleBlockedClient *RM_BlockClientOnKeys(RedisModuleCtx *ctx, RedisModuleCmdFunc reply_callback, RedisModuleCmdFunc timeout_callback, void (*free_privdata)(RedisModuleCtx*,void*), long long timeout_ms, RedisModuleString **keys, int numkeys, void *privdata) {
+ return moduleBlockClient(ctx,reply_callback,timeout_callback,free_privdata,timeout_ms, keys,numkeys,privdata);
+}
+
+/* This function is used in order to potentially unblock a client blocked
+ * on keys with RedisModule_BlockClientOnKeys(). When this function is called,
+ * all the clients blocked for this key will get their reply callback called,
+ * and if the callback returns REDISMODULE_OK the client will be unblocked. */
+void RM_SignalKeyAsReady(RedisModuleCtx *ctx, RedisModuleString *key) {
+ signalKeyAsReady(ctx->client->db, key);
+}
+
+/* Implements RM_UnblockClient() and moduleUnblockClient(). */
+int moduleUnblockClientByHandle(RedisModuleBlockedClient *bc, void *privdata) {
+ pthread_mutex_lock(&moduleUnblockedClientsMutex);
+ if (!bc->blocked_on_keys) bc->privdata = privdata;
+ bc->unblocked = 1;
+ listAddNodeTail(moduleUnblockedClients,bc);
+ if (write(server.module_blocked_pipe[1],"A",1) != 1) {
+ /* Ignore the error, this is best-effort. */
+ }
+ pthread_mutex_unlock(&moduleUnblockedClientsMutex);
+ return REDISMODULE_OK;
+}
+
+/* This API is used by the Redis core to unblock a client that was blocked
+ * by a module. */
+void moduleUnblockClient(client *c) {
+ RedisModuleBlockedClient *bc = c->bpop.module_blocked_handle;
+ moduleUnblockClientByHandle(bc,NULL);
+}
+
+/* Return true if the client 'c' was blocked by a module using
+ * RM_BlockClientOnKeys(). */
+int moduleClientIsBlockedOnKeys(client *c) {
+ RedisModuleBlockedClient *bc = c->bpop.module_blocked_handle;
+ return bc->blocked_on_keys;
+}
+
/* Unblock a client blocked by `RedisModule_BlockedClient`. This will trigger
* the reply callbacks to be called in order to reply to the client.
* The 'privdata' argument will be accessible by the reply callback, so
@@ -4096,15 +4246,25 @@ RedisModuleBlockedClient *RM_BlockClient(RedisModuleCtx *ctx, RedisModuleCmdFunc
* needs to be passed to the client, included but not limited some slow
* to compute reply or some reply obtained via networking.
*
- * Note: this function can be called from threads spawned by the module. */
+ * Note 1: this function can be called from threads spawned by the module.
+ *
+ * Note 2: when we unblock a client that is blocked for keys using
+ * the API RedisModule_BlockClientOnKeys(), the privdata argument here is
+ * not used, and the reply callback is called with the privdata pointer that
+ * was passed when blocking the client.
+ *
+ * Unblocking a client that was blocked for keys using this API will still
+ * require the client to get some reply, so the function will use the
+ * "timeout" handler in order to do so. */
int RM_UnblockClient(RedisModuleBlockedClient *bc, void *privdata) {
- pthread_mutex_lock(&moduleUnblockedClientsMutex);
- bc->privdata = privdata;
- listAddNodeTail(moduleUnblockedClients,bc);
- if (write(server.module_blocked_pipe[1],"A",1) != 1) {
- /* Ignore the error, this is best-effort. */
- }
- pthread_mutex_unlock(&moduleUnblockedClientsMutex);
+ if (bc->blocked_on_keys) {
+ /* In theory the user should always pass the timeout handler as an
+ * argument, but better to be safe than sorry. */
+ if (bc->timeout_callback == NULL) return REDISMODULE_ERR;
+ if (bc->unblocked) return REDISMODULE_OK;
+ if (bc->client) moduleBlockedClientTimedOut(bc->client);
+ }
+ moduleUnblockClientByHandle(bc,privdata);
return REDISMODULE_OK;
}
@@ -4164,16 +4324,19 @@ void moduleHandleBlockedClients(void) {
* touch the shared list. */
/* Call the reply callback if the client is valid and we have
- * any callback. */
- if (c && bc->reply_callback) {
+ * any callback. However the callback is not called if the client
+ * was blocked on keys (RM_BlockClientOnKeys()), because we already
+ * called such callback in moduleTryServeClientBlockedOnKey() when
+ * the key was signaled as ready. */
+ if (c && !bc->blocked_on_keys && bc->reply_callback) {
RedisModuleCtx ctx = REDISMODULE_CTX_INIT;
ctx.flags |= REDISMODULE_CTX_BLOCKED_REPLY;
ctx.blocked_privdata = bc->privdata;
+ ctx.blocked_ready_key = NULL;
ctx.module = bc->module;
ctx.client = bc->client;
ctx.blocked_client = bc;
bc->reply_callback(&ctx,(void**)c->argv,c->argc);
- moduleHandlePropagationAfterCommandCallback(&ctx);
moduleFreeContext(&ctx);
}
@@ -4261,6 +4424,12 @@ void *RM_GetBlockedClientPrivateData(RedisModuleCtx *ctx) {
return ctx->blocked_privdata;
}
+/* Get the key that is ready when the reply callback is called in the context
+ * of a client blocked by RedisModule_BlockClientOnKeys(). */
+RedisModuleString *RM_GetBlockedClientReadyKey(RedisModuleCtx *ctx) {
+ return ctx->blocked_ready_key;
+}
+
/* Get the blocked client associated with a given context.
* This is useful in the reply and timeout callbacks of blocked clients,
* before sometimes the module has the blocked client handle references
@@ -4315,7 +4484,7 @@ RedisModuleCtx *RM_GetThreadSafeContext(RedisModuleBlockedClient *bc) {
ctx->client = createClient(NULL);
if (bc) {
selectDb(ctx->client,bc->dbid);
- ctx->client->id = bc->client->id;
+ if (bc->client) ctx->client->id = bc->client->id;
}
return ctx;
}
@@ -6702,4 +6871,7 @@ void moduleRegisterCoreAPI(void) {
REGISTER_API(InfoAddFieldULongLong);
REGISTER_API(GetClientInfoById);
REGISTER_API(SubscribeToServerEvent);
+ REGISTER_API(BlockClientOnKeys);
+ REGISTER_API(SignalKeyAsReady);
+ REGISTER_API(GetBlockedClientReadyKey);
}
diff --git a/src/modules/hellotype.c b/src/modules/hellotype.c
index ba634c4a1..4f2d1d730 100644
--- a/src/modules/hellotype.c
+++ b/src/modules/hellotype.c
@@ -129,6 +129,7 @@ int HelloTypeInsert_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv,
/* Insert the new element. */
HelloTypeInsert(hto,value);
+ RedisModule_SignalKeyAsReady(ctx,argv[1]);
RedisModule_ReplyWithLongLong(ctx,hto->len);
RedisModule_ReplicateVerbatim(ctx);
@@ -190,6 +191,77 @@ int HelloTypeLen_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int
return REDISMODULE_OK;
}
+/* ====================== Example of a blocking command ==================== */
+
+/* Reply callback for blocking command HELLOTYPE.BRANGE, this will get
+ * called when the key we blocked for is ready: we need to check if we
+ * can really serve the client, and reply OK or ERR accordingly. */
+int HelloBlock_Reply(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
+ REDISMODULE_NOT_USED(argv);
+ REDISMODULE_NOT_USED(argc);
+
+ RedisModuleString *keyname = RedisModule_GetBlockedClientReadyKey(ctx);
+ RedisModuleKey *key = RedisModule_OpenKey(ctx,keyname,REDISMODULE_READ);
+ int type = RedisModule_KeyType(key);
+ if (type != REDISMODULE_KEYTYPE_MODULE ||
+ RedisModule_ModuleTypeGetType(key) != HelloType)
+ {
+ RedisModule_CloseKey(key);
+ return REDISMODULE_ERR;
+ }
+
+ /* In case the key is able to serve our blocked client, let's directly
+ * use our original command implementation to make this example simpler. */
+ RedisModule_CloseKey(key);
+ return HelloTypeRange_RedisCommand(ctx,argv,argc-1);
+}
+
+/* Timeout callback for blocking command HELLOTYPE.BRANGE */
+int HelloBlock_Timeout(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
+ REDISMODULE_NOT_USED(argv);
+ REDISMODULE_NOT_USED(argc);
+ return RedisModule_ReplyWithSimpleString(ctx,"Request timedout");
+}
+
+/* Private data freeing callback for HELLOTYPE.BRANGE command. */
+void HelloBlock_FreeData(RedisModuleCtx *ctx, void *privdata) {
+ REDISMODULE_NOT_USED(ctx);
+ RedisModule_Free(privdata);
+}
+
+/* HELLOTYPE.BRANGE key first count timeout -- This is a blocking verison of
+ * the RANGE operation, in order to show how to use the API
+ * RedisModule_BlockClientOnKeys(). */
+int HelloTypeBRange_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
+ if (argc != 5) return RedisModule_WrongArity(ctx);
+ RedisModule_AutoMemory(ctx); /* Use automatic memory management. */
+ RedisModuleKey *key = RedisModule_OpenKey(ctx,argv[1],
+ REDISMODULE_READ|REDISMODULE_WRITE);
+ int type = RedisModule_KeyType(key);
+ if (type != REDISMODULE_KEYTYPE_EMPTY &&
+ RedisModule_ModuleTypeGetType(key) != HelloType)
+ {
+ return RedisModule_ReplyWithError(ctx,REDISMODULE_ERRORMSG_WRONGTYPE);
+ }
+
+ /* Parse the timeout before even trying to serve the client synchronously,
+ * so that we always fail ASAP on syntax errors. */
+ long long timeout;
+ if (RedisModule_StringToLongLong(argv[4],&timeout) != REDISMODULE_OK) {
+ return RedisModule_ReplyWithError(ctx,
+ "ERR invalid timeout parameter");
+ }
+
+ /* Can we serve the reply synchronously? */
+ if (type != REDISMODULE_KEYTYPE_EMPTY) {
+ return HelloTypeRange_RedisCommand(ctx,argv,argc-1);
+ }
+
+ /* Otherwise let's block on the key. */
+ void *privdata = RedisModule_Alloc(100);
+ RedisModule_BlockClientOnKeys(ctx,HelloBlock_Reply,HelloBlock_Timeout,HelloBlock_FreeData,timeout,argv+1,1,privdata);
+ return REDISMODULE_OK;
+}
/* ========================== "hellotype" type methods ======================= */
@@ -282,5 +354,9 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
HelloTypeLen_RedisCommand,"readonly",1,1,1) == REDISMODULE_ERR)
return REDISMODULE_ERR;
+ if (RedisModule_CreateCommand(ctx,"hellotype.brange",
+ HelloTypeBRange_RedisCommand,"readonly",1,1,1) == REDISMODULE_ERR)
+ return REDISMODULE_ERR;
+
return REDISMODULE_OK;
}
diff --git a/src/redismodule.h b/src/redismodule.h
index 7ae1ad692..0049f394e 100644
--- a/src/redismodule.h
+++ b/src/redismodule.h
@@ -573,6 +573,9 @@ int REDISMODULE_API_FUNC(RedisModule_InfoAddFieldDouble)(RedisModuleInfoCtx *ctx
int REDISMODULE_API_FUNC(RedisModule_InfoAddFieldLongLong)(RedisModuleInfoCtx *ctx, char *field, long long value);
int REDISMODULE_API_FUNC(RedisModule_InfoAddFieldULongLong)(RedisModuleInfoCtx *ctx, char *field, unsigned long long value);
int REDISMODULE_API_FUNC(RedisModule_SubscribeToServerEvent)(RedisModuleCtx *ctx, RedisModuleEvent event, RedisModuleEventCallback callback);
+RedisModuleBlockedClient *REDISMODULE_API_FUNC(RedisModule_BlockClientOnKeys)(RedisModuleCtx *ctx, RedisModuleCmdFunc reply_callback, RedisModuleCmdFunc timeout_callback, void (*free_privdata)(RedisModuleCtx*,void*), long long timeout_ms, RedisModuleString **keys, int numkeys, void *privdata);
+void REDISMODULE_API_FUNC(RedisModule_SignalKeyAsReady)(RedisModuleCtx *ctx, RedisModuleString *key);
+RedisModuleString *REDISMODULE_API_FUNC(RedisModule_GetBlockedClientReadyKey)(RedisModuleCtx *ctx);
/* Experimental APIs */
#ifdef REDISMODULE_EXPERIMENTAL_API
@@ -773,6 +776,9 @@ static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int
REDISMODULE_GET_API(InfoAddFieldULongLong);
REDISMODULE_GET_API(GetClientInfoById);
REDISMODULE_GET_API(SubscribeToServerEvent);
+ REDISMODULE_GET_API(BlockClientOnKeys);
+ REDISMODULE_GET_API(SignalKeyAsReady);
+ REDISMODULE_GET_API(GetBlockedClientReadyKey);
#ifdef REDISMODULE_EXPERIMENTAL_API
REDISMODULE_GET_API(GetThreadSafeContext);
diff --git a/src/server.h b/src/server.h
index b52d57a05..fba24b195 100644
--- a/src/server.h
+++ b/src/server.h
@@ -1603,6 +1603,9 @@ int moduleAllDatatypesHandleErrors();
sds modulesCollectInfo(sds info, sds section, int for_crash_report, int sections);
void moduleFireServerEvent(uint64_t eid, int subid, void *data);
void processModuleLoadingProgressEvent(int is_aof);
+int moduleTryServeClientBlockedOnKey(client *c, robj *key);
+void moduleUnblockClient(client *c);
+int moduleClientIsBlockedOnKeys(client *c);
/* Utils */
long long ustime(void);