summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorantirez <antirez@gmail.com>2020-05-29 18:34:37 +0200
committerantirez <antirez@gmail.com>2020-06-10 10:40:18 +0200
commit2aef66d37cce103565dc294e8d62a589afae19ac (patch)
treee74c361170a250b21c540373a076b6563bddfadc
parent80efd2e8b5aaea8457fa69a51265529448728514 (diff)
downloadredis-2aef66d37cce103565dc294e8d62a589afae19ac.tar.gz
Threaded core commands: call freedata methdo + threads count.
-rw-r--r--src/module.c11
-rw-r--r--src/server.c2
-rw-r--r--src/server.h1
3 files changed, 12 insertions, 2 deletions
diff --git a/src/module.c b/src/module.c
index cf0f598de..eb8028349 100644
--- a/src/module.c
+++ b/src/module.c
@@ -373,6 +373,7 @@ typedef struct RedisModuleUser {
* for things like blocking clients for threaded execution of slow commands. */
RedisModule *coremodule;
_Atomic unsigned long CoreModuleBlockedClients = 0;
+unsigned long CoreModuleThreadsMax = 50;
/* --------------------------------------------------------------------------
* Prototypes
@@ -7245,7 +7246,7 @@ void *threadedCoreCommandEnty(void *argptr) {
* the client is unblocked, the reply will be concatenated to the
* real client. */
tcpd->callback(tcpd->bc->reply_client,tcpd->objv,tcpd->objc);
- RM_UnblockClient(tcpd->bc,NULL);
+ moduleUnblockClientByHandle(tcpd->bc,tcpd);
return NULL;
}
@@ -7297,7 +7298,9 @@ void executeThreadedCommand(client *c, coreThreadedCommandCallback callback, rob
/* Try to spawn the thread that will actually execute the command. */
pthread_t tid;
- if (pthread_create(&tid,NULL,threadedCoreCommandEnty,tcpd) != 0) {
+ if (CoreModuleBlockedClients >= CoreModuleThreadsMax ||
+ pthread_create(&tid,NULL,threadedCoreCommandEnty,tcpd) != 0)
+ {
RM_AbortBlock(bc);
/* Execute the command synchronously if we can't spawn a thread.. */
callback(c,tcpd->objv,tcpd->objc);
@@ -7306,6 +7309,10 @@ void executeThreadedCommand(client *c, coreThreadedCommandCallback callback, rob
moduleFreeContext(&ctx);
}
+unsigned long runningThreadedCommandsCount(void) {
+ return CoreModuleBlockedClients;
+}
+
/* --------------------------------------------------------------------------
* Modules API internals
* -------------------------------------------------------------------------- */
diff --git a/src/server.c b/src/server.c
index d5ddefae7..f41ac3974 100644
--- a/src/server.c
+++ b/src/server.c
@@ -4068,11 +4068,13 @@ sds genRedisInfoString(const char *section) {
"client_recent_max_output_buffer:%zu\r\n"
"blocked_clients:%d\r\n"
"tracking_clients:%d\r\n"
+ "blocked_for_core_thread:%lu\r\n"
"clients_in_timeout_table:%llu\r\n",
listLength(server.clients)-listLength(server.slaves),
maxin, maxout,
server.blocked_clients,
server.tracking_clients,
+ runningThreadedCommandsCount(),
(unsigned long long) raxSize(server.clients_timeout_table));
}
diff --git a/src/server.h b/src/server.h
index 62ad92cae..16460b209 100644
--- a/src/server.h
+++ b/src/server.h
@@ -1584,6 +1584,7 @@ void moduleNotifyUserChanged(client *c);
/* Modules functionalities exported to core commands. */
typedef void (*coreThreadedCommandCallback)(client *c, robj **objv, int objc);
void executeThreadedCommand(client *c, coreThreadedCommandCallback callback, robj **objv, int objc, int freecount);
+unsigned long runningThreadedCommandsCount(void);
/* Utils */
long long ustime(void);