From 68bc8c75d24f360383524b97017b35b71963541e Mon Sep 17 00:00:00 2001 From: antirez Date: Fri, 5 Jun 2020 13:09:17 +0200 Subject: TCC: fix unblocking on the synchronous case. Moreover the sync mode was optimized to avoid creating a module context, blocking the client in the 'bc' handle and so forth. We already know this is going a synchronous execution (unlike in the case of failing to spawn a thread), so we can just call the callback. --- src/module.c | 38 ++++++++++++++++++++++++++------------ 1 file changed, 26 insertions(+), 12 deletions(-) diff --git a/src/module.c b/src/module.c index 0d189495d..e8d06baaf 100644 --- a/src/module.c +++ b/src/module.c @@ -372,7 +372,14 @@ typedef struct RedisModuleUser { /* The Redis core itself registeres a module in order to use module services * for things like blocking clients for threaded execution of slow commands. */ RedisModule *coremodule; +/* The CoreModuleBlockedClients counter is the number of clients currently + * blocked on a threaded commad. The CoreModuleAbortedClients instead is + * the count of the clients that were about to start a thread but had + * to unblock the blocked client handle ASAP, for instance because there was + * an error creating the thread. We need to take the aborted clients count + * to make sure to execute moduleHandleBlockedClients(). */ _Atomic unsigned long CoreModuleBlockedClients = 0; +_Atomic unsigned long CoreModuleAbortedClients = 0; unsigned long CoreModuleThreadsMax = 50; /* -------------------------------------------------------------------------- @@ -4612,7 +4619,8 @@ void RM_SetDisconnectCallback(RedisModuleBlockedClient *bc, RedisModuleDisconnec * When this happens the RedisModuleBlockedClient structure in the queue * will have the 'client' field set to NULL. */ void moduleHandleBlockedClients(void) { - if (moduleCount() == 0 && CoreModuleBlockedClients == 0) return; + if (moduleCount() == 0 && CoreModuleBlockedClients == 0 && + CoreModuleAbortedClients == 0) return; listNode *ln; RedisModuleBlockedClient *bc; @@ -4694,6 +4702,7 @@ void moduleHandleBlockedClients(void) { pthread_mutex_lock(&moduleUnblockedClientsMutex); } pthread_mutex_unlock(&moduleUnblockedClientsMutex); + CoreModuleAbortedClients = 0; } /* Called when our client timed out. After this function unblockClient() @@ -7270,6 +7279,19 @@ void *threadedCoreCommandEnty(void *argptr) { * The info pointed by 'options' must be allocated by the callback implementing * the threaded half of the command once it's done. */ void executeThreadedCommand(client *c, coreThreadedCommandCallback callback, void *options) { + /* We have a fast path in case we are not in a situation where + * threaded execution is allowed: we can just call the callback + * synchronously and return. A note about server.loading: this is + * in case a threaded command is loaded via the AOF file. */ + int islua = c->flags & CLIENT_LUA; + int ismulti = c->flags & CLIENT_MULTI; + if (islua || ismulti || server.loading || + CoreModuleBlockedClients >= CoreModuleThreadsMax) + { + callback(c,options); + return; + } + RedisModuleCtx ctx = REDISMODULE_CTX_INIT; ctx.module = coremodule; ctx.client = c; @@ -7303,19 +7325,11 @@ void executeThreadedCommand(client *c, coreThreadedCommandCallback callback, voi copy->argv[j] = createStringObject(c->argv[j]->ptr, sdslen(c->argv[j]->ptr)); - /* Try to spawn the thread that will actually execute the command. - * There are many conditions where we perfer to perform a synchronous - * execution of the command. For instance in all the situations we - * can't block such as Lua script, MULTI/EXEC, or when loading the - * AOF file. */ - int islua = c->flags & CLIENT_LUA; - int ismulti = c->flags & CLIENT_MULTI; + /* Try to spawn the thread that will actually execute the command. */ pthread_t tid; - if (islua || ismulti || server.loading || - CoreModuleBlockedClients >= CoreModuleThreadsMax || - pthread_create(&tid,NULL,threadedCoreCommandEnty,tcpd) != 0) - { + if (pthread_create(&tid,NULL,threadedCoreCommandEnty,tcpd) != 0) { RM_AbortBlock(bc); + CoreModuleAbortedClients++; /* Execute the command synchronously if we can't spawn a thread.. */ callback(c,tcpd->options); threadedCoreCommandFreePrivdata(&ctx,tcpd); -- cgit v1.2.1