diff options
author | antirez <antirez@gmail.com> | 2020-05-20 17:27:42 +0200 |
---|---|---|
committer | antirez <antirez@gmail.com> | 2020-06-10 10:40:18 +0200 |
commit | 66ade8636edd189a1ce50fa50797fb35850b1231 (patch) | |
tree | fd687d0103f6902875d261467cc213882fcbad6a | |
parent | 2ebcd63d6a011dc46df26d27b860209a28efe727 (diff) | |
download | redis-66ade8636edd189a1ce50fa50797fb35850b1231.tar.gz |
Threaded core commands: genesis.
-rw-r--r-- | src/module.c | 109 | ||||
-rw-r--r-- | src/t_string.c | 5 |
2 files changed, 111 insertions, 3 deletions
diff --git a/src/module.c b/src/module.c index e3a338dad..1efade996 100644 --- a/src/module.c +++ b/src/module.c @@ -369,6 +369,9 @@ typedef struct RedisModuleUser { user *user; /* Reference to the real redis user */ } RedisModuleUser; +/* The Redis core itself registeres a module in order to use module services + * for things like blocking clients for threaded execution of slow commands. */ +RedisModule *coremodule; /* -------------------------------------------------------------------------- * Prototypes @@ -7198,6 +7201,106 @@ void processModuleLoadingProgressEvent(int is_aof) { } /* -------------------------------------------------------------------------- + * Module support for Redis core operations + * + * Here we define the interfaces that the core itself can use when + * using abstractions implemented by the modules interface, like the + * ability to run threaded operations. + * -------------------------------------------------------------------------- */ +typedef void (*coreThreadedCommandCallback)(client *c, robj **objv, int objc); + +/* This sturcture is used in order to pass state from the main thread to + * the thread that will execute the command, and then to the function + * that is responsible of freeing the private data. */ +typedef struct { + RedisModuleBlockedClient *bc; /* Blocked client handle. */ + coreThreadedCommandCallback callback; /* Threaded command callback. */ + robj **objv; /* Vector of Redis objects. */ + int objc; /* Number of objects. */ + int freecount; /* How many we need to free. */ +} tcprivdata; + +/* Free the private data allocated for a client blocked because of a + * threaded Redis core command execution. */ +void threadedCoreCommandFreePrivdata(RedisModuleCtx *ctx, void *privdata) { + UNUSED(ctx); + tcprivdata *tcpd = privdata; + for (int j = 0; j < tcpd->freecount; j++) + decrRefCount(tcpd->objv[j]); + zfree(tcpd); +} + +/* The entry point of the threaded execution. */ +void *threadedCoreCommandEnty(void *argptr) { + tcprivdata *tcpd = argptr; + /* We use the blocked command handle reply client as client to pass to the + * command implementation: this is a client that is not referenced + * at all in the Redis main thread, it just holds a copy of the + * original command execution argv/argc pair. This way the command can + * use Redis low level API to accumulate the reply there, and later when + * the client is unblocked, the reply will be concatenated to the + * real client. */ + tcpd->callback(tcpd->bc->reply_client,tcpd->objv,tcpd->objc); + RM_UnblockClient(tcpd->bc,NULL); + return NULL; +} + +/* This function is called by the Redis core when we want to execute the + * actual implementation of a long running command in a thread. The callback + * will have many limits: it cannot touch the keyspace, but can only work + * with the passed arguments (that are stored by copying them). + * + * This strategy works well when the input is small (fast to copy) compared + * to the total execution time of the command. In the future the ability to + * lock specific keys in the main thread will also be implemented, so that + * the threaded part of a command can also operate on key values without + * any race condition. + * + * The objv and objc parameters are used in order to pass objects to the + * thread that will execute the command. They must be either copies of + * objects so that the thread is the only owner, or Redis should have + * some other machanism in order to make sure that there are no race + * conditions. At the end of the execution of the command, the first + * 'freecount' objects of the objv array will be freed. */ +void executeThreadedCommand(client *c, coreThreadedCommandCallback callback, robj **objv, int objc, int freecount) { + RedisModuleCtx ctx = REDISMODULE_CTX_INIT; + ctx.module = coremodule; + ctx.client = c; + + /* We pass the arguments to the thread entry point using an array + * of void pointers. */ + tcprivdata *tcpd = zmalloc(sizeof(*tcpd)); + + /* Block the client. */ + RedisModuleBlockedClient *bc = RM_BlockClient(&ctx,NULL,NULL,threadedCoreCommandFreePrivdata,0); + tcpd->bc = bc; + tcpd->callback = callback; + tcpd->objv = objv; + tcpd->objc = objc; + tcpd->freecount = freecount; + bc->privdata = tcpd; + + /* We need the thread to work on a copy of the client argument + * vector. */ + client *copy = bc->reply_client; + copy->argc = c->argc; + copy->argv = zmalloc(sizeof(robj*) * c->argc); + for (int j = 0; j < c->argc; c++) + copy->argv[j] = createStringObject(c->argv[j]->ptr, + sdslen(c->argv[j]->ptr)); + + /* Try to spawn the thread that will actually execute the command. */ + pthread_t tid; + if (pthread_create(&tid,NULL,threadedCoreCommandEnty,tcpd) != 0) { + RM_AbortBlock(bc); + /* Execute the command synchronously if we can't spawn a thread.. */ + callback(c,tcpd->objv,tcpd->objc); + threadedCoreCommandFreePrivdata(&ctx,tcpd); + } + moduleFreeContext(&ctx); +} + +/* -------------------------------------------------------------------------- * Modules API internals * -------------------------------------------------------------------------- */ @@ -7267,6 +7370,12 @@ void moduleInitModulesSystem(void) { /* Our thread-safe contexts GIL must start with already locked: * it is just unlocked when it's safe. */ pthread_mutex_lock(&moduleGIL); + + /* Create the Redis core module. */ + RedisModuleCtx ctx = REDISMODULE_CTX_INIT; + RM_SetModuleAttribs(&ctx,"rediscore",1,1); + coremodule = ctx.module; + moduleFreeContext(&ctx); } /* Load all the modules in the server.loadmodule_queue list, which is diff --git a/src/t_string.c b/src/t_string.c index 5306069bf..6d27ef6aa 100644 --- a/src/t_string.c +++ b/src/t_string.c @@ -494,9 +494,8 @@ void stralgoCommand(client *c) { } } -/* STRALGO <algo> [IDX] [MINMATCHLEN <len>] [WITHMATCHLEN] - * STRINGS <string> <string> | KEYS <keya> <keyb> - */ +/* STRALGO LCS [IDX] [MINMATCHLEN <len>] [WITHMATCHLEN] + * STRINGS <string> <string> | KEYS <keya> <keyb> */ void stralgoLCS(client *c) { uint32_t i, j; long long minmatchlen = 0; |