diff options
Diffstat (limited to 'src/module.c')
-rw-r--r-- | src/module.c | 235 |
1 files changed, 204 insertions, 31 deletions
diff --git a/src/module.c b/src/module.c index 6ce57f69c..cf980a38a 100644 --- a/src/module.c +++ b/src/module.c @@ -394,6 +394,7 @@ typedef struct RedisModuleServerInfoData { #define REDISMODULE_ARGV_CALL_REPLIES_AS_ERRORS (1<<8) #define REDISMODULE_ARGV_RESPECT_DENY_OOM (1<<9) #define REDISMODULE_ARGV_DRY_RUN (1<<10) +#define REDISMODULE_ARGV_ALLOW_BLOCK (1<<11) /* Determine whether Redis should signalModifiedKey implicitly. * In case 'ctx' has no 'module' member (and therefore no module->options), @@ -469,6 +470,15 @@ struct ModuleConfig { RedisModule *module; }; +typedef struct RedisModuleAsyncRMCallPromise{ + size_t ref_count; + void *private_data; + RedisModule *module; + RedisModuleOnUnblocked on_unblocked; + client *c; + RedisModuleCtx *ctx; +} RedisModuleAsyncRMCallPromise; + /* -------------------------------------------------------------------------- * Prototypes * -------------------------------------------------------------------------- */ @@ -492,7 +502,7 @@ static struct redisCommandArg *moduleCopyCommandArgs(RedisModuleCommandArg *args const RedisModuleCommandInfoVersion *version); static redisCommandArgType moduleConvertArgType(RedisModuleCommandArgType type, int *error); static int moduleConvertArgFlags(int flags); - +void moduleCreateContext(RedisModuleCtx *out_ctx, RedisModule *module, int ctx_flags); /* -------------------------------------------------------------------------- * ## Heap allocation raw functions * @@ -619,6 +629,16 @@ client *moduleAllocTempClient(user *user) { return c; } +static void freeRedisModuleAsyncRMCallPromise(RedisModuleAsyncRMCallPromise *promise) { + if (--promise->ref_count > 0) { + return; + } + /* When the promise is finally freed it can not have a client attached to it. + * Either releasing the client or RM_CallReplyPromiseAbort would have removed it. */ + serverAssert(!promise->c); + zfree(promise); +} + void moduleReleaseTempClient(client *c) { if (moduleTempClientCount == moduleTempClientCap) { moduleTempClientCap = moduleTempClientCap ? moduleTempClientCap*2 : 32; @@ -632,6 +652,12 @@ void moduleReleaseTempClient(client *c) { c->flags = CLIENT_MODULE; c->user = NULL; /* Root user */ c->cmd = c->lastcmd = c->realcmd = NULL; + if (c->bstate.async_rm_call_handle) { + RedisModuleAsyncRMCallPromise *promise = c->bstate.async_rm_call_handle; + promise->c = NULL; /* Remove the client from the promise so it will no longer be possible to abort it. */ + freeRedisModuleAsyncRMCallPromise(promise); + c->bstate.async_rm_call_handle = NULL; + } moduleTempClients[moduleTempClientCount++] = c; } @@ -771,7 +797,7 @@ void modulePostExecutionUnitOperations() { void moduleFreeContext(RedisModuleCtx *ctx) { /* See comment in moduleCreateContext */ if (!(ctx->flags & (REDISMODULE_CTX_THREAD_SAFE|REDISMODULE_CTX_COMMAND))) { - server.execution_nesting--; + exitExecutionUnit(); postExecutionUnitOperations(); } autoMemoryCollect(ctx); @@ -796,6 +822,42 @@ void moduleFreeContext(RedisModuleCtx *ctx) { freeClient(ctx->client); } +static CallReply *moduleParseReply(client *c, RedisModuleCtx *ctx) { + /* Convert the result of the Redis command into a module reply. */ + sds proto = sdsnewlen(c->buf,c->bufpos); + c->bufpos = 0; + while(listLength(c->reply)) { + clientReplyBlock *o = listNodeValue(listFirst(c->reply)); + + proto = sdscatlen(proto,o->buf,o->used); + listDelNode(c->reply,listFirst(c->reply)); + } + CallReply *reply = callReplyCreate(proto, c->deferred_reply_errors, ctx); + c->deferred_reply_errors = NULL; /* now the responsibility of the reply object. */ + return reply; +} + +void moduleCallCommandUnblockedHandler(client *c) { + RedisModuleCtx ctx; + RedisModuleAsyncRMCallPromise *promise = c->bstate.async_rm_call_handle; + serverAssert(promise); + RedisModule *module = promise->module; + if (!promise->on_unblocked) { + moduleReleaseTempClient(c); + return; /* module did not set any unblock callback. */ + } + moduleCreateContext(&ctx, module, REDISMODULE_CTX_TEMP_CLIENT); + selectDb(ctx.client, c->db->id); + + CallReply *reply = moduleParseReply(c, &ctx); + module->in_call++; + promise->on_unblocked(&ctx, reply, promise->private_data); + module->in_call--; + + moduleFreeContext(&ctx); + moduleReleaseTempClient(c); +} + /* Create a module ctx and keep track of the nesting level. * * Note: When creating ctx for threads (RM_GetThreadSafeContext and @@ -832,7 +894,7 @@ void moduleCreateContext(RedisModuleCtx *out_ctx, RedisModule *module, int ctx_f * 2. If we are running in a thread (execution_nesting will be dealt with * when locking/unlocking the GIL) */ if (!(ctx_flags & (REDISMODULE_CTX_THREAD_SAFE|REDISMODULE_CTX_COMMAND))) { - server.execution_nesting++; + enterExecutionUnit(1, 0); } } @@ -5632,9 +5694,20 @@ void RM_FreeCallReply(RedisModuleCallReply *reply) { /* This is a wrapper for the recursive free reply function. This is needed * in order to have the first level function to return on nested replies, * but only if called by the module API. */ - RedisModuleCtx *ctx = callReplyGetPrivateData(reply); + + RedisModuleCtx *ctx = NULL; + if(callReplyType(reply) == REDISMODULE_REPLY_PROMISE) { + RedisModuleAsyncRMCallPromise *promise = callReplyGetPrivateData(reply); + ctx = promise->ctx; + freeRedisModuleAsyncRMCallPromise(promise); + } else { + ctx = callReplyGetPrivateData(reply); + } + freeCallReply(reply); - autoMemoryFreed(ctx,REDISMODULE_AM_REPLY,reply); + if (ctx) { + autoMemoryFreed(ctx,REDISMODULE_AM_REPLY,reply); + } } /* Return the reply type as one of the following: @@ -5651,7 +5724,8 @@ void RM_FreeCallReply(RedisModuleCallReply *reply) { * - REDISMODULE_REPLY_DOUBLE * - REDISMODULE_REPLY_BIG_NUMBER * - REDISMODULE_REPLY_VERBATIM_STRING - * - REDISMODULE_REPLY_ATTRIBUTE */ + * - REDISMODULE_REPLY_ATTRIBUTE + * - REDISMODULE_REPLY_PROMISE */ int RM_CallReplyType(RedisModuleCallReply *reply) { return callReplyType(reply); } @@ -5734,6 +5808,39 @@ int RM_CallReplyAttributeElement(RedisModuleCallReply *reply, size_t idx, RedisM return REDISMODULE_ERR; } +/* Set unblock handler (callback and private data) on the given promise RedisModuleCallReply. + * The given reply must be of promise type (REDISMODULE_REPLY_PROMISE). */ +void RM_CallReplyPromiseSetUnblockHandler(RedisModuleCallReply *reply, RedisModuleOnUnblocked on_unblock, void *private_data) { + RedisModuleAsyncRMCallPromise *promise = callReplyGetPrivateData(reply); + promise->on_unblocked = on_unblock; + promise->private_data = private_data; +} + +/* Abort the execution of a given promise RedisModuleCallReply. + * return REDMODULE_OK in case the abort was done successfully and REDISMODULE_ERR + * if its not possible to abort the execution (execution already finished). + * In case the execution was aborted (REDMODULE_OK was returned), the private_data out parameter + * will be set with the value of the private data that was given on 'RM_CallReplyPromiseSetUnblockHandler' + * so the caller will be able to release the private data. + * + * If the execution was aborted successfully, it is promised that the unblock handler will not be called. + * That said, it is possible that the abort operation will successes but the operation will still continue. + * This can happened if, for example, a module implements some blocking command and does not respect the + * disconnect callback. For pure Redis commands this can not happened.*/ +int RM_CallReplyPromiseAbort(RedisModuleCallReply *reply, void **private_data) { + RedisModuleAsyncRMCallPromise *promise = callReplyGetPrivateData(reply); + if (!promise->c) return REDISMODULE_ERR; /* Promise can not be aborted, either already aborted or already finished. */ + if (!(promise->c->flags & CLIENT_BLOCKED)) return REDISMODULE_ERR; /* Client is not blocked anymore, can not abort it. */ + + /* Client is still blocked, remove it from any blocking state and release it. */ + if (private_data) *private_data = promise->private_data; + promise->private_data = NULL; + promise->on_unblocked = NULL; + unblockClient(promise->c, 0); + moduleReleaseTempClient(promise->c); + return REDISMODULE_OK; +} + /* Return the pointer and length of a string or error reply. */ const char *RM_CallReplyStringPtr(RedisModuleCallReply *reply, size_t *len) { size_t private_len; @@ -5781,6 +5888,7 @@ void RM_SetContextUser(RedisModuleCtx *ctx, const RedisModuleUser *user) { * "0" -> REDISMODULE_ARGV_RESP_AUTO * "C" -> REDISMODULE_ARGV_RUN_AS_USER * "M" -> REDISMODULE_ARGV_RESPECT_DENY_OOM + * "K" -> REDISMODULE_ARGV_ALLOW_BLOCK * * On error (format specifier error) NULL is returned and nothing is * allocated. On success the argument vector is returned. */ @@ -5855,6 +5963,8 @@ robj **moduleCreateArgvFromUserFormat(const char *cmdname, const char *fmt, int if (flags) (*flags) |= REDISMODULE_ARGV_CALL_REPLIES_AS_ERRORS; } else if (*p == 'D') { if (flags) (*flags) |= (REDISMODULE_ARGV_DRY_RUN | REDISMODULE_ARGV_CALL_REPLIES_AS_ERRORS); + } else if (*p == 'K') { + if (flags) (*flags) |= REDISMODULE_ARGV_ALLOW_BLOCK; } else { goto fmterr; } @@ -5919,6 +6029,34 @@ fmterr: * If everything succeeded, it will return with a NULL, otherwise it will * return with a CallReply object denoting the error, as if it was called with * the 'E' code. + * * 'K' -- Allow running blocking commands. If enabled and the command gets blocked, a + * special REDISMODULE_REPLY_PROMISE will be returned. This reply type + * indicates that the command was blocked and the reply will be given asynchronously. + * The module can use this reply object to set a handler which will be called when + * the command gets unblocked using RedisModule_CallReplyPromiseSetUnblockHandler. + * The handler must be set immediately after the command invocation (without releasing + * the Redis lock in between). If the handler is not set, the blocking command will + * still continue its execution but the reply will be ignored (fire and forget), + * notice that this is dangerous in case of role change, as explained below. + * The module can use RedisModule_CallReplyPromiseAbort to abort the command invocation + * if it was not yet finished (see RedisModule_CallReplyPromiseAbort documentation for more + * details). It is also the module's responsibility to abort the execution on role change, either by using + * server event (to get notified when the instance becomes a replica) or relying on the disconnect + * callback of the original client. Failing to do so can result in a write operation on a replica. + * Unlike other call replies, promise call reply **must** be freed while the Redis GIL is locked. + * Notice that on unblocking, the only promise is that the unblock handler will be called, + * If the blocking RM_Call caused the module to also block some real client (using RM_BlockClient), + * it is the module responsibility to unblock this client on the unblock handler. + * On the unblock handler it is only allowed to perform the following: + * * Calling additional Redis commands using RM_Call + * * Open keys using RM_OpenKey + * * Replicate data to the replica or AOF + * + * Specifically, it is not allowed to call any Redis module API which are client related such as: + * * RM_Reply* API's + * * RM_BlockClient + * * RM_GetCurrentUserName + * * * **...**: The actual arguments to the Redis command. * * On success a RedisModuleCallReply object is returned, otherwise @@ -5978,8 +6116,10 @@ RedisModuleCallReply *RM_Call(RedisModuleCtx *ctx, const char *cmdname, const ch c = moduleAllocTempClient(user); - /* We do not want to allow block, the module do not expect it */ - c->flags |= CLIENT_DENY_BLOCKING; + if (!(flags & REDISMODULE_ARGV_ALLOW_BLOCK)) { + /* We do not want to allow block, the module do not expect it */ + c->flags |= CLIENT_DENY_BLOCKING; + } c->db = ctx->client->db; c->argv = argv; /* We have to assign argv_len, which is equal to argc in that case (RM_Call) @@ -6212,24 +6352,39 @@ RedisModuleCallReply *RM_Call(RedisModuleCtx *ctx, const char *cmdname, const ch call(c,call_flags); server.replication_allowed = prev_replication_allowed; - serverAssert((c->flags & CLIENT_BLOCKED) == 0); - - /* Convert the result of the Redis command into a module reply. */ - sds proto = sdsnewlen(c->buf,c->bufpos); - c->bufpos = 0; - while(listLength(c->reply)) { - clientReplyBlock *o = listNodeValue(listFirst(c->reply)); - - proto = sdscatlen(proto,o->buf,o->used); - listDelNode(c->reply,listFirst(c->reply)); + if (c->flags & CLIENT_BLOCKED) { + serverAssert(flags & REDISMODULE_ARGV_ALLOW_BLOCK); + serverAssert(ctx->module); + RedisModuleAsyncRMCallPromise *promise = zmalloc(sizeof(RedisModuleAsyncRMCallPromise)); + *promise = (RedisModuleAsyncRMCallPromise) { + /* We start with ref_count value of 2 because this object is held + * by the promise CallReply and the fake client that was used to execute the command. */ + .ref_count = 2, + .module = ctx->module, + .on_unblocked = NULL, + .private_data = NULL, + .c = c, + .ctx = (ctx->flags & REDISMODULE_CTX_AUTO_MEMORY) ? ctx : NULL, + }; + reply = callReplyCreatePromise(promise); + c->bstate.async_rm_call_handle = promise; + if (!(call_flags & CMD_CALL_PROPAGATE_AOF)) { + /* No need for AOF propagation, set the relevant flags of the client */ + c->flags |= CLIENT_MODULE_PREVENT_AOF_PROP; + } + if (!(call_flags & CMD_CALL_PROPAGATE_REPL)) { + /* No need for replication propagation, set the relevant flags of the client */ + c->flags |= CLIENT_MODULE_PREVENT_REPL_PROP; + } + c = NULL; /* Make sure not to free the client */ + } else { + reply = moduleParseReply(c, (ctx->flags & REDISMODULE_CTX_AUTO_MEMORY) ? ctx : NULL); } - reply = callReplyCreate(proto, c->deferred_reply_errors, ctx); - c->deferred_reply_errors = NULL; /* now the responsibility of the reply object. */ cleanup: if (reply) autoMemoryAdd(ctx,REDISMODULE_AM_REPLY,reply); if (ctx->module) ctx->module->in_call--; - moduleReleaseTempClient(c); + if (c) moduleReleaseTempClient(c); return reply; } @@ -7710,6 +7865,16 @@ RedisModuleBlockedClient *RM_BlockClientOnAuth(RedisModuleCtx *ctx, RedisModuleA return bc; } +/* Get the private data that was previusely set on a blocked client */ +void *RM_BlockClientGetPrivateData(RedisModuleBlockedClient *blocked_client) { + return blocked_client->privdata; +} + +/* Set private data on a blocked client */ +void RM_BlockClientSetPrivateData(RedisModuleBlockedClient *blocked_client, void *private_data) { + blocked_client->privdata = private_data; +} + /* This call is similar to RedisModule_BlockClient(), however in this case we * don't just block the client, but also ask Redis to unblock it automatically * once certain keys become "ready", that is, contain more data. @@ -7959,7 +8124,7 @@ void moduleHandleBlockedClients(void) { * to NULL, because if we reached this point, the client was * properly unblocked by the module. */ bc->disconnect_callback = NULL; - unblockClient(c); + unblockClient(c, 1); /* Put the client in the list of clients that need to write * if there are pending replies here. This is needed since * during a non blocking command the client may receive output. */ @@ -8145,8 +8310,7 @@ void moduleGILAfterLock() { serverAssert(server.execution_nesting == 0); /* Bump up the nesting level to prevent immediate propagation * of possible RM_Call from th thread */ - server.execution_nesting++; - updateCachedTime(0); + enterExecutionUnit(1, 0); } /* Acquire the server lock before executing a thread safe API call. @@ -8184,7 +8348,7 @@ void moduleGILBeforeUnlock() { /* Restore nesting level and propagate pending commands * (because it's unclear when thread safe contexts are * released we have to propagate here). */ - server.execution_nesting--; + exitExecutionUnit(); postExecutionUnitOperations(); } @@ -8294,8 +8458,10 @@ int RM_SubscribeToKeyspaceEvents(RedisModuleCtx *ctx, int types, RedisModuleNoti void firePostExecutionUnitJobs() { /* Avoid propagation of commands. * In that way, postExecutionUnitOperations will prevent - * recursive calls to firePostExecutionUnitJobs. */ - server.execution_nesting++; + * recursive calls to firePostExecutionUnitJobs. + * This is a special case where we need to increase 'execution_nesting' + * but we do not want to update the cached time */ + enterExecutionUnit(0, 0); while (listLength(modulePostExecUnitJobs) > 0) { listNode *ln = listFirst(modulePostExecUnitJobs); RedisModulePostExecUnitJob *job = listNodeValue(ln); @@ -8311,7 +8477,7 @@ void firePostExecutionUnitJobs() { moduleFreeContext(&ctx); zfree(job); } - server.execution_nesting--; + exitExecutionUnit(); } /* When running inside a key space notification callback, it is dangerous and highly discouraged to perform any write @@ -8374,8 +8540,11 @@ void moduleNotifyKeyspaceEvent(int type, const char *event, robj *key, int dbid) * * In order to do that we increment the execution_nesting counter, thus * preventing postExecutionUnitOperations (from within moduleFreeContext) - * from propagating commands from CB. */ - server.execution_nesting++; + * from propagating commands from CB. + * + * This is a special case where we need to increase 'execution_nesting' + * but we do not want to update the cached time */ + enterExecutionUnit(0, 0); listIter li; listNode *ln; @@ -8406,7 +8575,7 @@ void moduleNotifyKeyspaceEvent(int type, const char *event, robj *key, int dbid) } } - server.execution_nesting--; + exitExecutionUnit(); } /* Unsubscribe any notification subscribers this module has upon unloading */ @@ -13087,6 +13256,8 @@ void moduleRegisterCoreAPI(void) { REGISTER_API(CallReplySetElement); REGISTER_API(CallReplyMapElement); REGISTER_API(CallReplyAttributeElement); + REGISTER_API(CallReplyPromiseSetUnblockHandler); + REGISTER_API(CallReplyPromiseAbort); REGISTER_API(CallReplyAttribute); REGISTER_API(CallReplyType); REGISTER_API(CallReplyLength); @@ -13201,6 +13372,8 @@ void moduleRegisterCoreAPI(void) { REGISTER_API(GetKeyNameFromDigest); REGISTER_API(GetDbIdFromDigest); REGISTER_API(BlockClient); + REGISTER_API(BlockClientGetPrivateData); + REGISTER_API(BlockClientSetPrivateData); REGISTER_API(BlockClientOnAuth); REGISTER_API(UnblockClient); REGISTER_API(IsBlockedReplyRequest); |