summaryrefslogtreecommitdiff
path: root/src/module.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/module.c')
-rw-r--r--src/module.c235
1 files changed, 204 insertions, 31 deletions
diff --git a/src/module.c b/src/module.c
index 6ce57f69c..cf980a38a 100644
--- a/src/module.c
+++ b/src/module.c
@@ -394,6 +394,7 @@ typedef struct RedisModuleServerInfoData {
#define REDISMODULE_ARGV_CALL_REPLIES_AS_ERRORS (1<<8)
#define REDISMODULE_ARGV_RESPECT_DENY_OOM (1<<9)
#define REDISMODULE_ARGV_DRY_RUN (1<<10)
+#define REDISMODULE_ARGV_ALLOW_BLOCK (1<<11)
/* Determine whether Redis should signalModifiedKey implicitly.
* In case 'ctx' has no 'module' member (and therefore no module->options),
@@ -469,6 +470,15 @@ struct ModuleConfig {
RedisModule *module;
};
+typedef struct RedisModuleAsyncRMCallPromise{
+ size_t ref_count;
+ void *private_data;
+ RedisModule *module;
+ RedisModuleOnUnblocked on_unblocked;
+ client *c;
+ RedisModuleCtx *ctx;
+} RedisModuleAsyncRMCallPromise;
+
/* --------------------------------------------------------------------------
* Prototypes
* -------------------------------------------------------------------------- */
@@ -492,7 +502,7 @@ static struct redisCommandArg *moduleCopyCommandArgs(RedisModuleCommandArg *args
const RedisModuleCommandInfoVersion *version);
static redisCommandArgType moduleConvertArgType(RedisModuleCommandArgType type, int *error);
static int moduleConvertArgFlags(int flags);
-
+void moduleCreateContext(RedisModuleCtx *out_ctx, RedisModule *module, int ctx_flags);
/* --------------------------------------------------------------------------
* ## Heap allocation raw functions
*
@@ -619,6 +629,16 @@ client *moduleAllocTempClient(user *user) {
return c;
}
+static void freeRedisModuleAsyncRMCallPromise(RedisModuleAsyncRMCallPromise *promise) {
+ if (--promise->ref_count > 0) {
+ return;
+ }
+ /* When the promise is finally freed it can not have a client attached to it.
+ * Either releasing the client or RM_CallReplyPromiseAbort would have removed it. */
+ serverAssert(!promise->c);
+ zfree(promise);
+}
+
void moduleReleaseTempClient(client *c) {
if (moduleTempClientCount == moduleTempClientCap) {
moduleTempClientCap = moduleTempClientCap ? moduleTempClientCap*2 : 32;
@@ -632,6 +652,12 @@ void moduleReleaseTempClient(client *c) {
c->flags = CLIENT_MODULE;
c->user = NULL; /* Root user */
c->cmd = c->lastcmd = c->realcmd = NULL;
+ if (c->bstate.async_rm_call_handle) {
+ RedisModuleAsyncRMCallPromise *promise = c->bstate.async_rm_call_handle;
+ promise->c = NULL; /* Remove the client from the promise so it will no longer be possible to abort it. */
+ freeRedisModuleAsyncRMCallPromise(promise);
+ c->bstate.async_rm_call_handle = NULL;
+ }
moduleTempClients[moduleTempClientCount++] = c;
}
@@ -771,7 +797,7 @@ void modulePostExecutionUnitOperations() {
void moduleFreeContext(RedisModuleCtx *ctx) {
/* See comment in moduleCreateContext */
if (!(ctx->flags & (REDISMODULE_CTX_THREAD_SAFE|REDISMODULE_CTX_COMMAND))) {
- server.execution_nesting--;
+ exitExecutionUnit();
postExecutionUnitOperations();
}
autoMemoryCollect(ctx);
@@ -796,6 +822,42 @@ void moduleFreeContext(RedisModuleCtx *ctx) {
freeClient(ctx->client);
}
+static CallReply *moduleParseReply(client *c, RedisModuleCtx *ctx) {
+ /* Convert the result of the Redis command into a module reply. */
+ sds proto = sdsnewlen(c->buf,c->bufpos);
+ c->bufpos = 0;
+ while(listLength(c->reply)) {
+ clientReplyBlock *o = listNodeValue(listFirst(c->reply));
+
+ proto = sdscatlen(proto,o->buf,o->used);
+ listDelNode(c->reply,listFirst(c->reply));
+ }
+ CallReply *reply = callReplyCreate(proto, c->deferred_reply_errors, ctx);
+ c->deferred_reply_errors = NULL; /* now the responsibility of the reply object. */
+ return reply;
+}
+
+void moduleCallCommandUnblockedHandler(client *c) {
+ RedisModuleCtx ctx;
+ RedisModuleAsyncRMCallPromise *promise = c->bstate.async_rm_call_handle;
+ serverAssert(promise);
+ RedisModule *module = promise->module;
+ if (!promise->on_unblocked) {
+ moduleReleaseTempClient(c);
+ return; /* module did not set any unblock callback. */
+ }
+ moduleCreateContext(&ctx, module, REDISMODULE_CTX_TEMP_CLIENT);
+ selectDb(ctx.client, c->db->id);
+
+ CallReply *reply = moduleParseReply(c, &ctx);
+ module->in_call++;
+ promise->on_unblocked(&ctx, reply, promise->private_data);
+ module->in_call--;
+
+ moduleFreeContext(&ctx);
+ moduleReleaseTempClient(c);
+}
+
/* Create a module ctx and keep track of the nesting level.
*
* Note: When creating ctx for threads (RM_GetThreadSafeContext and
@@ -832,7 +894,7 @@ void moduleCreateContext(RedisModuleCtx *out_ctx, RedisModule *module, int ctx_f
* 2. If we are running in a thread (execution_nesting will be dealt with
* when locking/unlocking the GIL) */
if (!(ctx_flags & (REDISMODULE_CTX_THREAD_SAFE|REDISMODULE_CTX_COMMAND))) {
- server.execution_nesting++;
+ enterExecutionUnit(1, 0);
}
}
@@ -5632,9 +5694,20 @@ void RM_FreeCallReply(RedisModuleCallReply *reply) {
/* This is a wrapper for the recursive free reply function. This is needed
* in order to have the first level function to return on nested replies,
* but only if called by the module API. */
- RedisModuleCtx *ctx = callReplyGetPrivateData(reply);
+
+ RedisModuleCtx *ctx = NULL;
+ if(callReplyType(reply) == REDISMODULE_REPLY_PROMISE) {
+ RedisModuleAsyncRMCallPromise *promise = callReplyGetPrivateData(reply);
+ ctx = promise->ctx;
+ freeRedisModuleAsyncRMCallPromise(promise);
+ } else {
+ ctx = callReplyGetPrivateData(reply);
+ }
+
freeCallReply(reply);
- autoMemoryFreed(ctx,REDISMODULE_AM_REPLY,reply);
+ if (ctx) {
+ autoMemoryFreed(ctx,REDISMODULE_AM_REPLY,reply);
+ }
}
/* Return the reply type as one of the following:
@@ -5651,7 +5724,8 @@ void RM_FreeCallReply(RedisModuleCallReply *reply) {
* - REDISMODULE_REPLY_DOUBLE
* - REDISMODULE_REPLY_BIG_NUMBER
* - REDISMODULE_REPLY_VERBATIM_STRING
- * - REDISMODULE_REPLY_ATTRIBUTE */
+ * - REDISMODULE_REPLY_ATTRIBUTE
+ * - REDISMODULE_REPLY_PROMISE */
int RM_CallReplyType(RedisModuleCallReply *reply) {
return callReplyType(reply);
}
@@ -5734,6 +5808,39 @@ int RM_CallReplyAttributeElement(RedisModuleCallReply *reply, size_t idx, RedisM
return REDISMODULE_ERR;
}
+/* Set unblock handler (callback and private data) on the given promise RedisModuleCallReply.
+ * The given reply must be of promise type (REDISMODULE_REPLY_PROMISE). */
+void RM_CallReplyPromiseSetUnblockHandler(RedisModuleCallReply *reply, RedisModuleOnUnblocked on_unblock, void *private_data) {
+ RedisModuleAsyncRMCallPromise *promise = callReplyGetPrivateData(reply);
+ promise->on_unblocked = on_unblock;
+ promise->private_data = private_data;
+}
+
+/* Abort the execution of a given promise RedisModuleCallReply.
+ * return REDMODULE_OK in case the abort was done successfully and REDISMODULE_ERR
+ * if its not possible to abort the execution (execution already finished).
+ * In case the execution was aborted (REDMODULE_OK was returned), the private_data out parameter
+ * will be set with the value of the private data that was given on 'RM_CallReplyPromiseSetUnblockHandler'
+ * so the caller will be able to release the private data.
+ *
+ * If the execution was aborted successfully, it is promised that the unblock handler will not be called.
+ * That said, it is possible that the abort operation will successes but the operation will still continue.
+ * This can happened if, for example, a module implements some blocking command and does not respect the
+ * disconnect callback. For pure Redis commands this can not happened.*/
+int RM_CallReplyPromiseAbort(RedisModuleCallReply *reply, void **private_data) {
+ RedisModuleAsyncRMCallPromise *promise = callReplyGetPrivateData(reply);
+ if (!promise->c) return REDISMODULE_ERR; /* Promise can not be aborted, either already aborted or already finished. */
+ if (!(promise->c->flags & CLIENT_BLOCKED)) return REDISMODULE_ERR; /* Client is not blocked anymore, can not abort it. */
+
+ /* Client is still blocked, remove it from any blocking state and release it. */
+ if (private_data) *private_data = promise->private_data;
+ promise->private_data = NULL;
+ promise->on_unblocked = NULL;
+ unblockClient(promise->c, 0);
+ moduleReleaseTempClient(promise->c);
+ return REDISMODULE_OK;
+}
+
/* Return the pointer and length of a string or error reply. */
const char *RM_CallReplyStringPtr(RedisModuleCallReply *reply, size_t *len) {
size_t private_len;
@@ -5781,6 +5888,7 @@ void RM_SetContextUser(RedisModuleCtx *ctx, const RedisModuleUser *user) {
* "0" -> REDISMODULE_ARGV_RESP_AUTO
* "C" -> REDISMODULE_ARGV_RUN_AS_USER
* "M" -> REDISMODULE_ARGV_RESPECT_DENY_OOM
+ * "K" -> REDISMODULE_ARGV_ALLOW_BLOCK
*
* On error (format specifier error) NULL is returned and nothing is
* allocated. On success the argument vector is returned. */
@@ -5855,6 +5963,8 @@ robj **moduleCreateArgvFromUserFormat(const char *cmdname, const char *fmt, int
if (flags) (*flags) |= REDISMODULE_ARGV_CALL_REPLIES_AS_ERRORS;
} else if (*p == 'D') {
if (flags) (*flags) |= (REDISMODULE_ARGV_DRY_RUN | REDISMODULE_ARGV_CALL_REPLIES_AS_ERRORS);
+ } else if (*p == 'K') {
+ if (flags) (*flags) |= REDISMODULE_ARGV_ALLOW_BLOCK;
} else {
goto fmterr;
}
@@ -5919,6 +6029,34 @@ fmterr:
* If everything succeeded, it will return with a NULL, otherwise it will
* return with a CallReply object denoting the error, as if it was called with
* the 'E' code.
+ * * 'K' -- Allow running blocking commands. If enabled and the command gets blocked, a
+ * special REDISMODULE_REPLY_PROMISE will be returned. This reply type
+ * indicates that the command was blocked and the reply will be given asynchronously.
+ * The module can use this reply object to set a handler which will be called when
+ * the command gets unblocked using RedisModule_CallReplyPromiseSetUnblockHandler.
+ * The handler must be set immediately after the command invocation (without releasing
+ * the Redis lock in between). If the handler is not set, the blocking command will
+ * still continue its execution but the reply will be ignored (fire and forget),
+ * notice that this is dangerous in case of role change, as explained below.
+ * The module can use RedisModule_CallReplyPromiseAbort to abort the command invocation
+ * if it was not yet finished (see RedisModule_CallReplyPromiseAbort documentation for more
+ * details). It is also the module's responsibility to abort the execution on role change, either by using
+ * server event (to get notified when the instance becomes a replica) or relying on the disconnect
+ * callback of the original client. Failing to do so can result in a write operation on a replica.
+ * Unlike other call replies, promise call reply **must** be freed while the Redis GIL is locked.
+ * Notice that on unblocking, the only promise is that the unblock handler will be called,
+ * If the blocking RM_Call caused the module to also block some real client (using RM_BlockClient),
+ * it is the module responsibility to unblock this client on the unblock handler.
+ * On the unblock handler it is only allowed to perform the following:
+ * * Calling additional Redis commands using RM_Call
+ * * Open keys using RM_OpenKey
+ * * Replicate data to the replica or AOF
+ *
+ * Specifically, it is not allowed to call any Redis module API which are client related such as:
+ * * RM_Reply* API's
+ * * RM_BlockClient
+ * * RM_GetCurrentUserName
+ *
* * **...**: The actual arguments to the Redis command.
*
* On success a RedisModuleCallReply object is returned, otherwise
@@ -5978,8 +6116,10 @@ RedisModuleCallReply *RM_Call(RedisModuleCtx *ctx, const char *cmdname, const ch
c = moduleAllocTempClient(user);
- /* We do not want to allow block, the module do not expect it */
- c->flags |= CLIENT_DENY_BLOCKING;
+ if (!(flags & REDISMODULE_ARGV_ALLOW_BLOCK)) {
+ /* We do not want to allow block, the module do not expect it */
+ c->flags |= CLIENT_DENY_BLOCKING;
+ }
c->db = ctx->client->db;
c->argv = argv;
/* We have to assign argv_len, which is equal to argc in that case (RM_Call)
@@ -6212,24 +6352,39 @@ RedisModuleCallReply *RM_Call(RedisModuleCtx *ctx, const char *cmdname, const ch
call(c,call_flags);
server.replication_allowed = prev_replication_allowed;
- serverAssert((c->flags & CLIENT_BLOCKED) == 0);
-
- /* Convert the result of the Redis command into a module reply. */
- sds proto = sdsnewlen(c->buf,c->bufpos);
- c->bufpos = 0;
- while(listLength(c->reply)) {
- clientReplyBlock *o = listNodeValue(listFirst(c->reply));
-
- proto = sdscatlen(proto,o->buf,o->used);
- listDelNode(c->reply,listFirst(c->reply));
+ if (c->flags & CLIENT_BLOCKED) {
+ serverAssert(flags & REDISMODULE_ARGV_ALLOW_BLOCK);
+ serverAssert(ctx->module);
+ RedisModuleAsyncRMCallPromise *promise = zmalloc(sizeof(RedisModuleAsyncRMCallPromise));
+ *promise = (RedisModuleAsyncRMCallPromise) {
+ /* We start with ref_count value of 2 because this object is held
+ * by the promise CallReply and the fake client that was used to execute the command. */
+ .ref_count = 2,
+ .module = ctx->module,
+ .on_unblocked = NULL,
+ .private_data = NULL,
+ .c = c,
+ .ctx = (ctx->flags & REDISMODULE_CTX_AUTO_MEMORY) ? ctx : NULL,
+ };
+ reply = callReplyCreatePromise(promise);
+ c->bstate.async_rm_call_handle = promise;
+ if (!(call_flags & CMD_CALL_PROPAGATE_AOF)) {
+ /* No need for AOF propagation, set the relevant flags of the client */
+ c->flags |= CLIENT_MODULE_PREVENT_AOF_PROP;
+ }
+ if (!(call_flags & CMD_CALL_PROPAGATE_REPL)) {
+ /* No need for replication propagation, set the relevant flags of the client */
+ c->flags |= CLIENT_MODULE_PREVENT_REPL_PROP;
+ }
+ c = NULL; /* Make sure not to free the client */
+ } else {
+ reply = moduleParseReply(c, (ctx->flags & REDISMODULE_CTX_AUTO_MEMORY) ? ctx : NULL);
}
- reply = callReplyCreate(proto, c->deferred_reply_errors, ctx);
- c->deferred_reply_errors = NULL; /* now the responsibility of the reply object. */
cleanup:
if (reply) autoMemoryAdd(ctx,REDISMODULE_AM_REPLY,reply);
if (ctx->module) ctx->module->in_call--;
- moduleReleaseTempClient(c);
+ if (c) moduleReleaseTempClient(c);
return reply;
}
@@ -7710,6 +7865,16 @@ RedisModuleBlockedClient *RM_BlockClientOnAuth(RedisModuleCtx *ctx, RedisModuleA
return bc;
}
+/* Get the private data that was previusely set on a blocked client */
+void *RM_BlockClientGetPrivateData(RedisModuleBlockedClient *blocked_client) {
+ return blocked_client->privdata;
+}
+
+/* Set private data on a blocked client */
+void RM_BlockClientSetPrivateData(RedisModuleBlockedClient *blocked_client, void *private_data) {
+ blocked_client->privdata = private_data;
+}
+
/* This call is similar to RedisModule_BlockClient(), however in this case we
* don't just block the client, but also ask Redis to unblock it automatically
* once certain keys become "ready", that is, contain more data.
@@ -7959,7 +8124,7 @@ void moduleHandleBlockedClients(void) {
* to NULL, because if we reached this point, the client was
* properly unblocked by the module. */
bc->disconnect_callback = NULL;
- unblockClient(c);
+ unblockClient(c, 1);
/* Put the client in the list of clients that need to write
* if there are pending replies here. This is needed since
* during a non blocking command the client may receive output. */
@@ -8145,8 +8310,7 @@ void moduleGILAfterLock() {
serverAssert(server.execution_nesting == 0);
/* Bump up the nesting level to prevent immediate propagation
* of possible RM_Call from th thread */
- server.execution_nesting++;
- updateCachedTime(0);
+ enterExecutionUnit(1, 0);
}
/* Acquire the server lock before executing a thread safe API call.
@@ -8184,7 +8348,7 @@ void moduleGILBeforeUnlock() {
/* Restore nesting level and propagate pending commands
* (because it's unclear when thread safe contexts are
* released we have to propagate here). */
- server.execution_nesting--;
+ exitExecutionUnit();
postExecutionUnitOperations();
}
@@ -8294,8 +8458,10 @@ int RM_SubscribeToKeyspaceEvents(RedisModuleCtx *ctx, int types, RedisModuleNoti
void firePostExecutionUnitJobs() {
/* Avoid propagation of commands.
* In that way, postExecutionUnitOperations will prevent
- * recursive calls to firePostExecutionUnitJobs. */
- server.execution_nesting++;
+ * recursive calls to firePostExecutionUnitJobs.
+ * This is a special case where we need to increase 'execution_nesting'
+ * but we do not want to update the cached time */
+ enterExecutionUnit(0, 0);
while (listLength(modulePostExecUnitJobs) > 0) {
listNode *ln = listFirst(modulePostExecUnitJobs);
RedisModulePostExecUnitJob *job = listNodeValue(ln);
@@ -8311,7 +8477,7 @@ void firePostExecutionUnitJobs() {
moduleFreeContext(&ctx);
zfree(job);
}
- server.execution_nesting--;
+ exitExecutionUnit();
}
/* When running inside a key space notification callback, it is dangerous and highly discouraged to perform any write
@@ -8374,8 +8540,11 @@ void moduleNotifyKeyspaceEvent(int type, const char *event, robj *key, int dbid)
*
* In order to do that we increment the execution_nesting counter, thus
* preventing postExecutionUnitOperations (from within moduleFreeContext)
- * from propagating commands from CB. */
- server.execution_nesting++;
+ * from propagating commands from CB.
+ *
+ * This is a special case where we need to increase 'execution_nesting'
+ * but we do not want to update the cached time */
+ enterExecutionUnit(0, 0);
listIter li;
listNode *ln;
@@ -8406,7 +8575,7 @@ void moduleNotifyKeyspaceEvent(int type, const char *event, robj *key, int dbid)
}
}
- server.execution_nesting--;
+ exitExecutionUnit();
}
/* Unsubscribe any notification subscribers this module has upon unloading */
@@ -13087,6 +13256,8 @@ void moduleRegisterCoreAPI(void) {
REGISTER_API(CallReplySetElement);
REGISTER_API(CallReplyMapElement);
REGISTER_API(CallReplyAttributeElement);
+ REGISTER_API(CallReplyPromiseSetUnblockHandler);
+ REGISTER_API(CallReplyPromiseAbort);
REGISTER_API(CallReplyAttribute);
REGISTER_API(CallReplyType);
REGISTER_API(CallReplyLength);
@@ -13201,6 +13372,8 @@ void moduleRegisterCoreAPI(void) {
REGISTER_API(GetKeyNameFromDigest);
REGISTER_API(GetDbIdFromDigest);
REGISTER_API(BlockClient);
+ REGISTER_API(BlockClientGetPrivateData);
+ REGISTER_API(BlockClientSetPrivateData);
REGISTER_API(BlockClientOnAuth);
REGISTER_API(UnblockClient);
REGISTER_API(IsBlockedReplyRequest);