summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorantirez <antirez@gmail.com>2020-05-20 17:27:42 +0200
committerantirez <antirez@gmail.com>2020-06-10 10:40:18 +0200
commit66ade8636edd189a1ce50fa50797fb35850b1231 (patch)
treefd687d0103f6902875d261467cc213882fcbad6a
parent2ebcd63d6a011dc46df26d27b860209a28efe727 (diff)
downloadredis-66ade8636edd189a1ce50fa50797fb35850b1231.tar.gz
Threaded core commands: genesis.
-rw-r--r--src/module.c109
-rw-r--r--src/t_string.c5
2 files changed, 111 insertions, 3 deletions
diff --git a/src/module.c b/src/module.c
index e3a338dad..1efade996 100644
--- a/src/module.c
+++ b/src/module.c
@@ -369,6 +369,9 @@ typedef struct RedisModuleUser {
user *user; /* Reference to the real redis user */
} RedisModuleUser;
+/* The Redis core itself registeres a module in order to use module services
+ * for things like blocking clients for threaded execution of slow commands. */
+RedisModule *coremodule;
/* --------------------------------------------------------------------------
* Prototypes
@@ -7198,6 +7201,106 @@ void processModuleLoadingProgressEvent(int is_aof) {
}
/* --------------------------------------------------------------------------
+ * Module support for Redis core operations
+ *
+ * Here we define the interfaces that the core itself can use when
+ * using abstractions implemented by the modules interface, like the
+ * ability to run threaded operations.
+ * -------------------------------------------------------------------------- */
+typedef void (*coreThreadedCommandCallback)(client *c, robj **objv, int objc);
+
+/* This sturcture is used in order to pass state from the main thread to
+ * the thread that will execute the command, and then to the function
+ * that is responsible of freeing the private data. */
+typedef struct {
+ RedisModuleBlockedClient *bc; /* Blocked client handle. */
+ coreThreadedCommandCallback callback; /* Threaded command callback. */
+ robj **objv; /* Vector of Redis objects. */
+ int objc; /* Number of objects. */
+ int freecount; /* How many we need to free. */
+} tcprivdata;
+
+/* Free the private data allocated for a client blocked because of a
+ * threaded Redis core command execution. */
+void threadedCoreCommandFreePrivdata(RedisModuleCtx *ctx, void *privdata) {
+ UNUSED(ctx);
+ tcprivdata *tcpd = privdata;
+ for (int j = 0; j < tcpd->freecount; j++)
+ decrRefCount(tcpd->objv[j]);
+ zfree(tcpd);
+}
+
+/* The entry point of the threaded execution. */
+void *threadedCoreCommandEnty(void *argptr) {
+ tcprivdata *tcpd = argptr;
+ /* We use the blocked command handle reply client as client to pass to the
+ * command implementation: this is a client that is not referenced
+ * at all in the Redis main thread, it just holds a copy of the
+ * original command execution argv/argc pair. This way the command can
+ * use Redis low level API to accumulate the reply there, and later when
+ * the client is unblocked, the reply will be concatenated to the
+ * real client. */
+ tcpd->callback(tcpd->bc->reply_client,tcpd->objv,tcpd->objc);
+ RM_UnblockClient(tcpd->bc,NULL);
+ return NULL;
+}
+
+/* This function is called by the Redis core when we want to execute the
+ * actual implementation of a long running command in a thread. The callback
+ * will have many limits: it cannot touch the keyspace, but can only work
+ * with the passed arguments (that are stored by copying them).
+ *
+ * This strategy works well when the input is small (fast to copy) compared
+ * to the total execution time of the command. In the future the ability to
+ * lock specific keys in the main thread will also be implemented, so that
+ * the threaded part of a command can also operate on key values without
+ * any race condition.
+ *
+ * The objv and objc parameters are used in order to pass objects to the
+ * thread that will execute the command. They must be either copies of
+ * objects so that the thread is the only owner, or Redis should have
+ * some other machanism in order to make sure that there are no race
+ * conditions. At the end of the execution of the command, the first
+ * 'freecount' objects of the objv array will be freed. */
+void executeThreadedCommand(client *c, coreThreadedCommandCallback callback, robj **objv, int objc, int freecount) {
+ RedisModuleCtx ctx = REDISMODULE_CTX_INIT;
+ ctx.module = coremodule;
+ ctx.client = c;
+
+ /* We pass the arguments to the thread entry point using an array
+ * of void pointers. */
+ tcprivdata *tcpd = zmalloc(sizeof(*tcpd));
+
+ /* Block the client. */
+ RedisModuleBlockedClient *bc = RM_BlockClient(&ctx,NULL,NULL,threadedCoreCommandFreePrivdata,0);
+ tcpd->bc = bc;
+ tcpd->callback = callback;
+ tcpd->objv = objv;
+ tcpd->objc = objc;
+ tcpd->freecount = freecount;
+ bc->privdata = tcpd;
+
+ /* We need the thread to work on a copy of the client argument
+ * vector. */
+ client *copy = bc->reply_client;
+ copy->argc = c->argc;
+ copy->argv = zmalloc(sizeof(robj*) * c->argc);
+ for (int j = 0; j < c->argc; c++)
+ copy->argv[j] = createStringObject(c->argv[j]->ptr,
+ sdslen(c->argv[j]->ptr));
+
+ /* Try to spawn the thread that will actually execute the command. */
+ pthread_t tid;
+ if (pthread_create(&tid,NULL,threadedCoreCommandEnty,tcpd) != 0) {
+ RM_AbortBlock(bc);
+ /* Execute the command synchronously if we can't spawn a thread.. */
+ callback(c,tcpd->objv,tcpd->objc);
+ threadedCoreCommandFreePrivdata(&ctx,tcpd);
+ }
+ moduleFreeContext(&ctx);
+}
+
+/* --------------------------------------------------------------------------
* Modules API internals
* -------------------------------------------------------------------------- */
@@ -7267,6 +7370,12 @@ void moduleInitModulesSystem(void) {
/* Our thread-safe contexts GIL must start with already locked:
* it is just unlocked when it's safe. */
pthread_mutex_lock(&moduleGIL);
+
+ /* Create the Redis core module. */
+ RedisModuleCtx ctx = REDISMODULE_CTX_INIT;
+ RM_SetModuleAttribs(&ctx,"rediscore",1,1);
+ coremodule = ctx.module;
+ moduleFreeContext(&ctx);
}
/* Load all the modules in the server.loadmodule_queue list, which is
diff --git a/src/t_string.c b/src/t_string.c
index 5306069bf..6d27ef6aa 100644
--- a/src/t_string.c
+++ b/src/t_string.c
@@ -494,9 +494,8 @@ void stralgoCommand(client *c) {
}
}
-/* STRALGO <algo> [IDX] [MINMATCHLEN <len>] [WITHMATCHLEN]
- * STRINGS <string> <string> | KEYS <keya> <keyb>
- */
+/* STRALGO LCS [IDX] [MINMATCHLEN <len>] [WITHMATCHLEN]
+ * STRINGS <string> <string> | KEYS <keya> <keyb> */
void stralgoLCS(client *c) {
uint32_t i, j;
long long minmatchlen = 0;