From 2aef66d37cce103565dc294e8d62a589afae19ac Mon Sep 17 00:00:00 2001 From: antirez Date: Fri, 29 May 2020 18:34:37 +0200 Subject: Threaded core commands: call freedata methdo + threads count. --- src/module.c | 11 +++++++++-- src/server.c | 2 ++ src/server.h | 1 + 3 files changed, 12 insertions(+), 2 deletions(-) diff --git a/src/module.c b/src/module.c index cf0f598de..eb8028349 100644 --- a/src/module.c +++ b/src/module.c @@ -373,6 +373,7 @@ typedef struct RedisModuleUser { * for things like blocking clients for threaded execution of slow commands. */ RedisModule *coremodule; _Atomic unsigned long CoreModuleBlockedClients = 0; +unsigned long CoreModuleThreadsMax = 50; /* -------------------------------------------------------------------------- * Prototypes @@ -7245,7 +7246,7 @@ void *threadedCoreCommandEnty(void *argptr) { * the client is unblocked, the reply will be concatenated to the * real client. */ tcpd->callback(tcpd->bc->reply_client,tcpd->objv,tcpd->objc); - RM_UnblockClient(tcpd->bc,NULL); + moduleUnblockClientByHandle(tcpd->bc,tcpd); return NULL; } @@ -7297,7 +7298,9 @@ void executeThreadedCommand(client *c, coreThreadedCommandCallback callback, rob /* Try to spawn the thread that will actually execute the command. */ pthread_t tid; - if (pthread_create(&tid,NULL,threadedCoreCommandEnty,tcpd) != 0) { + if (CoreModuleBlockedClients >= CoreModuleThreadsMax || + pthread_create(&tid,NULL,threadedCoreCommandEnty,tcpd) != 0) + { RM_AbortBlock(bc); /* Execute the command synchronously if we can't spawn a thread.. */ callback(c,tcpd->objv,tcpd->objc); @@ -7306,6 +7309,10 @@ void executeThreadedCommand(client *c, coreThreadedCommandCallback callback, rob moduleFreeContext(&ctx); } +unsigned long runningThreadedCommandsCount(void) { + return CoreModuleBlockedClients; +} + /* -------------------------------------------------------------------------- * Modules API internals * -------------------------------------------------------------------------- */ diff --git a/src/server.c b/src/server.c index d5ddefae7..f41ac3974 100644 --- a/src/server.c +++ b/src/server.c @@ -4068,11 +4068,13 @@ sds genRedisInfoString(const char *section) { "client_recent_max_output_buffer:%zu\r\n" "blocked_clients:%d\r\n" "tracking_clients:%d\r\n" + "blocked_for_core_thread:%lu\r\n" "clients_in_timeout_table:%llu\r\n", listLength(server.clients)-listLength(server.slaves), maxin, maxout, server.blocked_clients, server.tracking_clients, + runningThreadedCommandsCount(), (unsigned long long) raxSize(server.clients_timeout_table)); } diff --git a/src/server.h b/src/server.h index 62ad92cae..16460b209 100644 --- a/src/server.h +++ b/src/server.h @@ -1584,6 +1584,7 @@ void moduleNotifyUserChanged(client *c); /* Modules functionalities exported to core commands. */ typedef void (*coreThreadedCommandCallback)(client *c, robj **objv, int objc); void executeThreadedCommand(client *c, coreThreadedCommandCallback callback, robj **objv, int objc, int freecount); +unsigned long runningThreadedCommandsCount(void); /* Utils */ long long ustime(void); -- cgit v1.2.1