summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorantirez <antirez@gmail.com>2019-10-31 11:35:05 +0100
committerantirez <antirez@gmail.com>2019-10-31 11:35:07 +0100
commit91f4bdc9f9d80ba7431f093cb58b6c49f4021d0b (patch)
tree4bdaf5c41b30572af3829c745b9dbf11ac0dcbbe
parent4534960b293d41fd5193c1d8a51a19453c2aabf6 (diff)
downloadredis-91f4bdc9f9d80ba7431f093cb58b6c49f4021d0b.tar.gz
Modules: block on keys: use a better interface.
Using the is_key_ready() callback plus the reply callback later, creates different issues AFAIK: 1. More complex API. 2. We need to call the reply callback() ASAP if the is_key_ready() interface returned success, however the internals do not work in that way, so when the reply callback is called the setup could be different. To fix that, there is to break the current design that handles the unblocked clients asyncrhonously, and run the list ASAP.
-rw-r--r--src/blocked.c3
-rw-r--r--src/module.c74
-rw-r--r--src/modules/hellotype.c26
-rw-r--r--src/redismodule.h4
-rw-r--r--src/server.h3
5 files changed, 65 insertions, 45 deletions
diff --git a/src/blocked.c b/src/blocked.c
index 2b91c1b44..fb58f850b 100644
--- a/src/blocked.c
+++ b/src/blocked.c
@@ -174,6 +174,7 @@ void unblockClient(client *c) {
} else if (c->btype == BLOCKED_WAIT) {
unblockClientWaitingReplicas(c);
} else if (c->btype == BLOCKED_MODULE) {
+ if (moduleClientIsBlockedOnKeys(c)) unblockClientWaitingData(c);
unblockClientFromModule(c);
} else {
serverPanic("Unknown btype in unblockClient().");
@@ -449,7 +450,7 @@ void serveClientsBlockedOnKeyByModule(readyList *rl) {
listNode *clientnode = listFirst(clients);
client *receiver = clientnode->value;
- if (!moduleIsKeyReady(receiver, rl->key)) continue;
+ if (!moduleTryServeClientBlockedOnKey(receiver, rl->key)) continue;
moduleUnblockClient(receiver);
}
diff --git a/src/module.c b/src/module.c
index a11d3a306..8837ae017 100644
--- a/src/module.c
+++ b/src/module.c
@@ -140,6 +140,9 @@ struct RedisModuleCtx {
void **postponed_arrays; /* To set with RM_ReplySetArrayLength(). */
int postponed_arrays_count; /* Number of entries in postponed_arrays. */
void *blocked_privdata; /* Privdata set when unblocking a client. */
+ RedisModuleString *blocked_ready_key; /* Key ready when the reply callback
+ gets called for clients blocked
+ on keys. */
/* Used if there is the REDISMODULE_CTX_KEYS_POS_REQUEST flag set. */
int *keys_pos;
@@ -153,7 +156,7 @@ struct RedisModuleCtx {
};
typedef struct RedisModuleCtx RedisModuleCtx;
-#define REDISMODULE_CTX_INIT {(void*)(unsigned long)&RM_GetApi, NULL, NULL, NULL, NULL, 0, 0, 0, NULL, 0, NULL, NULL, 0, NULL, {0}}
+#define REDISMODULE_CTX_INIT {(void*)(unsigned long)&RM_GetApi, NULL, NULL, NULL, NULL, 0, 0, 0, NULL, 0, NULL, NULL, NULL, 0, NULL, {0}}
#define REDISMODULE_CTX_MULTI_EMITTED (1<<0)
#define REDISMODULE_CTX_AUTO_MEMORY (1<<1)
#define REDISMODULE_CTX_KEYS_POS_REQUEST (1<<2)
@@ -246,14 +249,6 @@ typedef struct RedisModuleBlockedClient {
in thread safe contexts. */
int dbid; /* Database number selected by the original client. */
int blocked_on_keys; /* If blocked via RM_BlockClientOnKeys(). */
- int (*is_key_ready)(RedisModuleCtx*, RedisModuleString *keyname,
- void *privdata); /* When blocking on keys, even if the
- key is signaled as ready, maybe it
- was modified afterward before the
- client unblocks. So we always
- need a callback that tells us if
- the key is ready in order to serve
- the next blocked client. */
} RedisModuleBlockedClient;
static pthread_mutex_t moduleUnblockedClientsMutex = PTHREAD_MUTEX_INITIALIZER;
@@ -4002,10 +3997,10 @@ void unblockClientFromModule(client *c) {
* RM_BlockClient() and RM_BlockClientOnKeys() depending on the fact the
* keys are passed or not.
*
- * When not blocking for keys, the keys, numkeys, is_key_ready callback
- * and privdata parameters are not needed. The privdata in that case must
- * be NULL, since later is RM_UnblockClient() that will provide some private
- * data that the reply callback will receive.
+ * When not blocking for keys, the keys, numkeys, and privdata parameters are
+ * not needed. The privdata in that case must be NULL, since later is
+ * RM_UnblockClient() that will provide some private data that the reply
+ * callback will receive.
*
* Instead when blocking for keys, normally RM_UnblockClient() will not be
* called (because the client will unblock when the key is modified), so
@@ -4017,7 +4012,7 @@ void unblockClientFromModule(client *c) {
* in that case the privdata argument is disregarded, because we pass the
* reply callback the privdata that is set here while blocking.
*/
-RedisModuleBlockedClient *moduleBlockClient(RedisModuleCtx *ctx, RedisModuleCmdFunc reply_callback, RedisModuleCmdFunc timeout_callback, void (*free_privdata)(RedisModuleCtx*,void*), long long timeout_ms, RedisModuleString **keys, int numkeys, int (*is_key_ready)(RedisModuleCtx*, RedisModuleString *key, void *privdata), void *privdata) {
+RedisModuleBlockedClient *moduleBlockClient(RedisModuleCtx *ctx, RedisModuleCmdFunc reply_callback, RedisModuleCmdFunc timeout_callback, void (*free_privdata)(RedisModuleCtx*,void*), long long timeout_ms, RedisModuleString **keys, int numkeys, void *privdata) {
client *c = ctx->client;
int islua = c->flags & CLIENT_LUA;
int ismulti = c->flags & CLIENT_MULTI;
@@ -4041,7 +4036,6 @@ RedisModuleBlockedClient *moduleBlockClient(RedisModuleCtx *ctx, RedisModuleCmdF
bc->reply_client = createClient(NULL);
bc->reply_client->flags |= CLIENT_MODULE;
bc->dbid = c->db->id;
- bc->is_key_ready = is_key_ready;
bc->blocked_on_keys = keys != NULL;
c->bpop.timeout = timeout;
@@ -4062,20 +4056,24 @@ RedisModuleBlockedClient *moduleBlockClient(RedisModuleCtx *ctx, RedisModuleCmdF
/* This function is called from module.c in order to check if a module
* blocked for BLOCKED_MODULE and subtype 'on keys' (bc->blocked_on_keys true)
- * can really be unblocked since the key is ready. */
-int moduleIsKeyReady(client *c, robj *key) {
+ * can really be unblocked, since the module was able to serve the client.
+ * If the callback returns REDISMODULE_OK, then the client can be unblocked,
+ * otherwise the client remains blocked and we'll retry again when one of
+ * the keys it blocked for becomes "ready" again. */
+int moduleTryServeClientBlockedOnKey(client *c, robj *key) {
+ int served = 0;
RedisModuleBlockedClient *bc = c->bpop.module_blocked_handle;
RedisModuleCtx ctx = REDISMODULE_CTX_INIT;
+ ctx.flags |= REDISMODULE_CTX_BLOCKED_REPLY;
+ ctx.blocked_ready_key = key;
+ ctx.blocked_privdata = bc->privdata;
ctx.module = bc->module;
ctx.client = bc->client;
- ctx.blocked_privdata = bc->privdata; /* In case the callback uses the
- API to get the pointer to the
- privdata, even if we provide it
- as argument. */
- selectDb(ctx.client, bc->dbid);
- int ready = bc->is_key_ready(&ctx, key, bc->privdata);
+ ctx.blocked_client = bc;
+ if (bc->reply_callback(&ctx,(void**)c->argv,c->argc) == REDISMODULE_OK)
+ served = 1;
moduleFreeContext(&ctx);
- return ready;
+ return served;
}
/* Block a client in the context of a blocking command, returning an handle
@@ -4095,7 +4093,7 @@ int moduleIsKeyReady(client *c, robj *key) {
* by RedisModule_UnblockClient() call.
*/
RedisModuleBlockedClient *RM_BlockClient(RedisModuleCtx *ctx, RedisModuleCmdFunc reply_callback, RedisModuleCmdFunc timeout_callback, void (*free_privdata)(RedisModuleCtx*,void*), long long timeout_ms) {
- return moduleBlockClient(ctx,reply_callback,timeout_callback,free_privdata,timeout_ms, NULL,0,NULL,NULL);
+ return moduleBlockClient(ctx,reply_callback,timeout_callback,free_privdata,timeout_ms, NULL,0,NULL);
}
/* This call is similar to RedisModule_BlockClient(), however in this case we
@@ -4142,8 +4140,8 @@ RedisModuleBlockedClient *RM_BlockClient(RedisModuleCtx *ctx, RedisModuleCmdFunc
* information about the specific unblocking operation that you are
* implementing. Such information will be freed using the free_privdata
* callback provided by the user. */
-RedisModuleBlockedClient *RM_BlockClientOnKeys(RedisModuleCtx *ctx, RedisModuleCmdFunc reply_callback, RedisModuleCmdFunc timeout_callback, void (*free_privdata)(RedisModuleCtx*,void*), long long timeout_ms, RedisModuleString **keys, int numkeys, int (*is_key_ready)(RedisModuleCtx*, RedisModuleString *keyname, void *privdata), void *privdata) {
- return moduleBlockClient(ctx,reply_callback,timeout_callback,free_privdata,timeout_ms, keys,numkeys,is_key_ready,privdata);
+RedisModuleBlockedClient *RM_BlockClientOnKeys(RedisModuleCtx *ctx, RedisModuleCmdFunc reply_callback, RedisModuleCmdFunc timeout_callback, void (*free_privdata)(RedisModuleCtx*,void*), long long timeout_ms, RedisModuleString **keys, int numkeys, void *privdata) {
+ return moduleBlockClient(ctx,reply_callback,timeout_callback,free_privdata,timeout_ms, keys,numkeys,privdata);
}
/* This function is used in order to potentially unblock a client blocked
@@ -4174,6 +4172,13 @@ void moduleUnblockClient(client *c) {
moduleUnblockClientByHandle(bc,NULL);
}
+/* Return true if the client 'c' was blocked by a module using
+ * RM_BlockClientOnKeys(). */
+int moduleClientIsBlockedOnKeys(client *c) {
+ RedisModuleBlockedClient *bc = c->bpop.module_blocked_handle;
+ return bc->blocked_on_keys;
+}
+
/* Unblock a client blocked by `RedisModule_BlockedClient`. This will trigger
* the reply callbacks to be called in order to reply to the client.
* The 'privdata' argument will be accessible by the reply callback, so
@@ -4253,11 +4258,15 @@ void moduleHandleBlockedClients(void) {
* touch the shared list. */
/* Call the reply callback if the client is valid and we have
- * any callback. */
- if (c && bc->reply_callback) {
+ * any callback. However the callback is not called if the client
+ * was blocked on keys (RM_BlockClientOnKeys()), because we already
+ * called such callback in moduleTryServeClientBlockedOnKey() when
+ * the key was signaled as ready. */
+ if (c && !bc->blocked_on_keys && bc->reply_callback) {
RedisModuleCtx ctx = REDISMODULE_CTX_INIT;
ctx.flags |= REDISMODULE_CTX_BLOCKED_REPLY;
ctx.blocked_privdata = bc->privdata;
+ ctx.blocked_ready_key = NULL;
ctx.module = bc->module;
ctx.client = bc->client;
ctx.blocked_client = bc;
@@ -4349,6 +4358,12 @@ void *RM_GetBlockedClientPrivateData(RedisModuleCtx *ctx) {
return ctx->blocked_privdata;
}
+/* Get the key that is ready when the reply callback is called in the context
+ * of a client blocked by RedisModule_BlockClientOnKeys(). */
+RedisModuleString *RM_GetBlockedClientReadyKey(RedisModuleCtx *ctx) {
+ return ctx->blocked_ready_key;
+}
+
/* Get the blocked client associated with a given context.
* This is useful in the reply and timeout callbacks of blocked clients,
* before sometimes the module has the blocked client handle references
@@ -6682,4 +6697,5 @@ void moduleRegisterCoreAPI(void) {
REGISTER_API(SubscribeToServerEvent);
REGISTER_API(BlockClientOnKeys);
REGISTER_API(SignalKeyAsReady);
+ REGISTER_API(GetBlockedClientReadyKey);
}
diff --git a/src/modules/hellotype.c b/src/modules/hellotype.c
index 084408798..dafbadbe5 100644
--- a/src/modules/hellotype.c
+++ b/src/modules/hellotype.c
@@ -193,26 +193,26 @@ int HelloTypeLen_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int
/* ====================== Example of a blocking command ==================== */
-/* Is_key_ready callback for blocking command HELLOTYPE.BRANGE */
-int HelloBlock_IsKeyReady(RedisModuleCtx *ctx, RedisModuleString *keyname, void *privdata) {
- REDISMODULE_NOT_USED(privdata);
+/* Reply callback for blocking command HELLOTYPE.BRANGE, this will get
+ * called when the key we blocked for is ready: we need to check if we
+ * can really serve the client, and reply OK or ERR accordingly. */
+int HelloBlock_Reply(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
+ REDISMODULE_NOT_USED(argv);
+ REDISMODULE_NOT_USED(argc);
- RedisModule_AutoMemory(ctx); /* Use automatic memory management. */
+ RedisModuleString *keyname = RedisModule_GetBlockedClientReadyKey(ctx);
RedisModuleKey *key = RedisModule_OpenKey(ctx,keyname,REDISMODULE_READ);
int type = RedisModule_KeyType(key);
if (type != REDISMODULE_KEYTYPE_MODULE ||
RedisModule_ModuleTypeGetType(key) != HelloType)
{
- return 0;
- } else {
- return 1;
+ RedisModule_CloseKey(key);
+ return REDISMODULE_ERR;
}
-}
-/* Reply callback for blocking command HELLOTYPE.BRANGE */
-int HelloBlock_Reply(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
- REDISMODULE_NOT_USED(argv);
- REDISMODULE_NOT_USED(argc);
+ /* In case the key is able to serve our blocked client, let's directly
+ * use our original command implementation to make this example simpler. */
+ RedisModule_CloseKey(key);
return HelloTypeRange_RedisCommand(ctx,argv,argc-1);
}
@@ -251,7 +251,7 @@ int HelloTypeBRange_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv,
}
void *privdata = RedisModule_Alloc(100);
- RedisModule_BlockClientOnKeys(ctx,HelloBlock_Reply,HelloBlock_Timeout,HelloBlock_FreeData,timeout,argv+1,1,HelloBlock_IsKeyReady,privdata);
+ RedisModule_BlockClientOnKeys(ctx,HelloBlock_Reply,HelloBlock_Timeout,HelloBlock_FreeData,timeout,argv+1,1,privdata);
return REDISMODULE_OK;
}
diff --git a/src/redismodule.h b/src/redismodule.h
index 5b4c31b19..1b284770b 100644
--- a/src/redismodule.h
+++ b/src/redismodule.h
@@ -492,8 +492,9 @@ int REDISMODULE_API_FUNC(RedisModule_InfoAddFieldDouble)(RedisModuleInfoCtx *ctx
int REDISMODULE_API_FUNC(RedisModule_InfoAddFieldLongLong)(RedisModuleInfoCtx *ctx, char *field, long long value);
int REDISMODULE_API_FUNC(RedisModule_InfoAddFieldULongLong)(RedisModuleInfoCtx *ctx, char *field, unsigned long long value);
int REDISMODULE_API_FUNC(RedisModule_SubscribeToServerEvent)(RedisModuleCtx *ctx, RedisModuleEvent event, RedisModuleEventCallback callback);
-RedisModuleBlockedClient *REDISMODULE_API_FUNC(RedisModule_BlockClientOnKeys)(RedisModuleCtx *ctx, RedisModuleCmdFunc reply_callback, RedisModuleCmdFunc timeout_callback, void (*free_privdata)(RedisModuleCtx*,void*), long long timeout_ms, RedisModuleString **keys, int numkeys, int (*is_key_ready)(RedisModuleCtx*, RedisModuleString *keyname, void *privdata), void *privdata);
+RedisModuleBlockedClient *REDISMODULE_API_FUNC(RedisModule_BlockClientOnKeys)(RedisModuleCtx *ctx, RedisModuleCmdFunc reply_callback, RedisModuleCmdFunc timeout_callback, void (*free_privdata)(RedisModuleCtx*,void*), long long timeout_ms, RedisModuleString **keys, int numkeys, void *privdata);
void REDISMODULE_API_FUNC(RedisModule_SignalKeyAsReady)(RedisModuleCtx *ctx, RedisModuleString *key);
+RedisModuleString *REDISMODULE_API_FUNC(RedisModule_GetBlockedClientReadyKey)(RedisModuleCtx *ctx);
/* Experimental APIs */
#ifdef REDISMODULE_EXPERIMENTAL_API
@@ -692,6 +693,7 @@ static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int
REDISMODULE_GET_API(SubscribeToServerEvent);
REDISMODULE_GET_API(BlockClientOnKeys);
REDISMODULE_GET_API(SignalKeyAsReady);
+ REDISMODULE_GET_API(GetBlockedClientReadyKey);
#ifdef REDISMODULE_EXPERIMENTAL_API
REDISMODULE_GET_API(GetThreadSafeContext);
diff --git a/src/server.h b/src/server.h
index 3a2bb1c7b..f724f7d64 100644
--- a/src/server.h
+++ b/src/server.h
@@ -1602,8 +1602,9 @@ ssize_t rdbSaveModulesAux(rio *rdb, int when);
int moduleAllDatatypesHandleErrors();
sds modulesCollectInfo(sds info, sds section, int for_crash_report, int sections);
void moduleFireServerEvent(uint64_t eid, int subid, void *data);
-int moduleIsKeyReady(client *c, robj *key);
+int moduleTryServeClientBlockedOnKey(client *c, robj *key);
void moduleUnblockClient(client *c);
+int moduleClientIsBlockedOnKeys(client *c);
/* Utils */
long long ustime(void);