summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorantirez <antirez@gmail.com>2020-06-05 13:09:17 +0200
committerantirez <antirez@gmail.com>2020-06-10 10:40:18 +0200
commit68bc8c75d24f360383524b97017b35b71963541e (patch)
treedf49bdb8159e7ee5baa73f139f65cdbfad21e3af
parent3efd8ac42d43114435728513677f4ba72dc04a81 (diff)
downloadredis-68bc8c75d24f360383524b97017b35b71963541e.tar.gz
TCC: fix unblocking on the synchronous case.
Moreover the sync mode was optimized to avoid creating a module context, blocking the client in the 'bc' handle and so forth. We already know this is going a synchronous execution (unlike in the case of failing to spawn a thread), so we can just call the callback.
-rw-r--r--src/module.c38
1 files changed, 26 insertions, 12 deletions
diff --git a/src/module.c b/src/module.c
index 0d189495d..e8d06baaf 100644
--- a/src/module.c
+++ b/src/module.c
@@ -372,7 +372,14 @@ typedef struct RedisModuleUser {
/* The Redis core itself registeres a module in order to use module services
* for things like blocking clients for threaded execution of slow commands. */
RedisModule *coremodule;
+/* The CoreModuleBlockedClients counter is the number of clients currently
+ * blocked on a threaded commad. The CoreModuleAbortedClients instead is
+ * the count of the clients that were about to start a thread but had
+ * to unblock the blocked client handle ASAP, for instance because there was
+ * an error creating the thread. We need to take the aborted clients count
+ * to make sure to execute moduleHandleBlockedClients(). */
_Atomic unsigned long CoreModuleBlockedClients = 0;
+_Atomic unsigned long CoreModuleAbortedClients = 0;
unsigned long CoreModuleThreadsMax = 50;
/* --------------------------------------------------------------------------
@@ -4612,7 +4619,8 @@ void RM_SetDisconnectCallback(RedisModuleBlockedClient *bc, RedisModuleDisconnec
* When this happens the RedisModuleBlockedClient structure in the queue
* will have the 'client' field set to NULL. */
void moduleHandleBlockedClients(void) {
- if (moduleCount() == 0 && CoreModuleBlockedClients == 0) return;
+ if (moduleCount() == 0 && CoreModuleBlockedClients == 0 &&
+ CoreModuleAbortedClients == 0) return;
listNode *ln;
RedisModuleBlockedClient *bc;
@@ -4694,6 +4702,7 @@ void moduleHandleBlockedClients(void) {
pthread_mutex_lock(&moduleUnblockedClientsMutex);
}
pthread_mutex_unlock(&moduleUnblockedClientsMutex);
+ CoreModuleAbortedClients = 0;
}
/* Called when our client timed out. After this function unblockClient()
@@ -7270,6 +7279,19 @@ void *threadedCoreCommandEnty(void *argptr) {
* The info pointed by 'options' must be allocated by the callback implementing
* the threaded half of the command once it's done. */
void executeThreadedCommand(client *c, coreThreadedCommandCallback callback, void *options) {
+ /* We have a fast path in case we are not in a situation where
+ * threaded execution is allowed: we can just call the callback
+ * synchronously and return. A note about server.loading: this is
+ * in case a threaded command is loaded via the AOF file. */
+ int islua = c->flags & CLIENT_LUA;
+ int ismulti = c->flags & CLIENT_MULTI;
+ if (islua || ismulti || server.loading ||
+ CoreModuleBlockedClients >= CoreModuleThreadsMax)
+ {
+ callback(c,options);
+ return;
+ }
+
RedisModuleCtx ctx = REDISMODULE_CTX_INIT;
ctx.module = coremodule;
ctx.client = c;
@@ -7303,19 +7325,11 @@ void executeThreadedCommand(client *c, coreThreadedCommandCallback callback, voi
copy->argv[j] = createStringObject(c->argv[j]->ptr,
sdslen(c->argv[j]->ptr));
- /* Try to spawn the thread that will actually execute the command.
- * There are many conditions where we perfer to perform a synchronous
- * execution of the command. For instance in all the situations we
- * can't block such as Lua script, MULTI/EXEC, or when loading the
- * AOF file. */
- int islua = c->flags & CLIENT_LUA;
- int ismulti = c->flags & CLIENT_MULTI;
+ /* Try to spawn the thread that will actually execute the command. */
pthread_t tid;
- if (islua || ismulti || server.loading ||
- CoreModuleBlockedClients >= CoreModuleThreadsMax ||
- pthread_create(&tid,NULL,threadedCoreCommandEnty,tcpd) != 0)
- {
+ if (pthread_create(&tid,NULL,threadedCoreCommandEnty,tcpd) != 0) {
RM_AbortBlock(bc);
+ CoreModuleAbortedClients++;
/* Execute the command synchronously if we can't spawn a thread.. */
callback(c,tcpd->options);
threadedCoreCommandFreePrivdata(&ctx,tcpd);