diff options
-rw-r--r-- | src/module.c | 34 |
1 files changed, 19 insertions, 15 deletions
diff --git a/src/module.c b/src/module.c index e8d06baaf..df1fbec1e 100644 --- a/src/module.c +++ b/src/module.c @@ -372,14 +372,11 @@ typedef struct RedisModuleUser { /* The Redis core itself registeres a module in order to use module services * for things like blocking clients for threaded execution of slow commands. */ RedisModule *coremodule; + /* The CoreModuleBlockedClients counter is the number of clients currently - * blocked on a threaded commad. The CoreModuleAbortedClients instead is - * the count of the clients that were about to start a thread but had - * to unblock the blocked client handle ASAP, for instance because there was - * an error creating the thread. We need to take the aborted clients count - * to make sure to execute moduleHandleBlockedClients(). */ + * blocked on a threaded commad: this allows us to limit the maximum number + * of threaded executions at a given time. */ _Atomic unsigned long CoreModuleBlockedClients = 0; -_Atomic unsigned long CoreModuleAbortedClients = 0; unsigned long CoreModuleThreadsMax = 50; /* -------------------------------------------------------------------------- @@ -4582,12 +4579,23 @@ int RM_UnblockClient(RedisModuleBlockedClient *bc, void *privdata) { return REDISMODULE_OK; } +/* Low level API to abort a blocked client operation. After we call + * moduleBlockClient(), if later there aren't the condition for really + * going forward with the plan of blocking (for instance the pthread + * API returns a failure creating a thread), we may "undo" the operation + * and avoid any callback to be called, with the exception of the + * free_privdata callback (in case we specify a privdata that is + * not NULL). */ +int moduleAbortBlock(RedisModuleBlockedClient *bc, void *privdata) { + bc->reply_callback = NULL; + bc->disconnect_callback = NULL; + return RM_UnblockClient(bc,privdata); +} + /* Abort a blocked client blocking operation: the client will be unblocked * without firing any callback. */ int RM_AbortBlock(RedisModuleBlockedClient *bc) { - bc->reply_callback = NULL; - bc->disconnect_callback = NULL; - return RM_UnblockClient(bc,NULL); + return moduleAbortBlock(bc,NULL); } /* Set a callback that will be called if a blocked client disconnects @@ -4619,8 +4627,7 @@ void RM_SetDisconnectCallback(RedisModuleBlockedClient *bc, RedisModuleDisconnec * When this happens the RedisModuleBlockedClient structure in the queue * will have the 'client' field set to NULL. */ void moduleHandleBlockedClients(void) { - if (moduleCount() == 0 && CoreModuleBlockedClients == 0 && - CoreModuleAbortedClients == 0) return; + if (moduleCount() == 0 && CoreModuleBlockedClients == 0) return; listNode *ln; RedisModuleBlockedClient *bc; @@ -4702,7 +4709,6 @@ void moduleHandleBlockedClients(void) { pthread_mutex_lock(&moduleUnblockedClientsMutex); } pthread_mutex_unlock(&moduleUnblockedClientsMutex); - CoreModuleAbortedClients = 0; } /* Called when our client timed out. After this function unblockClient() @@ -7328,11 +7334,9 @@ void executeThreadedCommand(client *c, coreThreadedCommandCallback callback, voi /* Try to spawn the thread that will actually execute the command. */ pthread_t tid; if (pthread_create(&tid,NULL,threadedCoreCommandEnty,tcpd) != 0) { - RM_AbortBlock(bc); - CoreModuleAbortedClients++; /* Execute the command synchronously if we can't spawn a thread.. */ callback(c,tcpd->options); - threadedCoreCommandFreePrivdata(&ctx,tcpd); + moduleAbortBlock(bc,tcpd); } moduleFreeContext(&ctx); } |