summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorantirez <antirez@gmail.com>2020-06-05 17:20:59 +0200
committerantirez <antirez@gmail.com>2020-06-10 10:40:18 +0200
commit9bd8f02fe1ead3b3dca0a7b04706fd131bf5a192 (patch)
treed112c81267bf4608aa0353938b412727c87f30fe
parentc2e591de9ef83aaa059494e44c9ec2cbb4ef0434 (diff)
downloadredis-9bd8f02fe1ead3b3dca0a7b04706fd131bf5a192.tar.gz
TCC: simplify sync execution.
-rw-r--r--src/module.c34
1 files changed, 19 insertions, 15 deletions
diff --git a/src/module.c b/src/module.c
index e8d06baaf..df1fbec1e 100644
--- a/src/module.c
+++ b/src/module.c
@@ -372,14 +372,11 @@ typedef struct RedisModuleUser {
/* The Redis core itself registeres a module in order to use module services
* for things like blocking clients for threaded execution of slow commands. */
RedisModule *coremodule;
+
/* The CoreModuleBlockedClients counter is the number of clients currently
- * blocked on a threaded commad. The CoreModuleAbortedClients instead is
- * the count of the clients that were about to start a thread but had
- * to unblock the blocked client handle ASAP, for instance because there was
- * an error creating the thread. We need to take the aborted clients count
- * to make sure to execute moduleHandleBlockedClients(). */
+ * blocked on a threaded commad: this allows us to limit the maximum number
+ * of threaded executions at a given time. */
_Atomic unsigned long CoreModuleBlockedClients = 0;
-_Atomic unsigned long CoreModuleAbortedClients = 0;
unsigned long CoreModuleThreadsMax = 50;
/* --------------------------------------------------------------------------
@@ -4582,12 +4579,23 @@ int RM_UnblockClient(RedisModuleBlockedClient *bc, void *privdata) {
return REDISMODULE_OK;
}
+/* Low level API to abort a blocked client operation. After we call
+ * moduleBlockClient(), if later there aren't the condition for really
+ * going forward with the plan of blocking (for instance the pthread
+ * API returns a failure creating a thread), we may "undo" the operation
+ * and avoid any callback to be called, with the exception of the
+ * free_privdata callback (in case we specify a privdata that is
+ * not NULL). */
+int moduleAbortBlock(RedisModuleBlockedClient *bc, void *privdata) {
+ bc->reply_callback = NULL;
+ bc->disconnect_callback = NULL;
+ return RM_UnblockClient(bc,privdata);
+}
+
/* Abort a blocked client blocking operation: the client will be unblocked
* without firing any callback. */
int RM_AbortBlock(RedisModuleBlockedClient *bc) {
- bc->reply_callback = NULL;
- bc->disconnect_callback = NULL;
- return RM_UnblockClient(bc,NULL);
+ return moduleAbortBlock(bc,NULL);
}
/* Set a callback that will be called if a blocked client disconnects
@@ -4619,8 +4627,7 @@ void RM_SetDisconnectCallback(RedisModuleBlockedClient *bc, RedisModuleDisconnec
* When this happens the RedisModuleBlockedClient structure in the queue
* will have the 'client' field set to NULL. */
void moduleHandleBlockedClients(void) {
- if (moduleCount() == 0 && CoreModuleBlockedClients == 0 &&
- CoreModuleAbortedClients == 0) return;
+ if (moduleCount() == 0 && CoreModuleBlockedClients == 0) return;
listNode *ln;
RedisModuleBlockedClient *bc;
@@ -4702,7 +4709,6 @@ void moduleHandleBlockedClients(void) {
pthread_mutex_lock(&moduleUnblockedClientsMutex);
}
pthread_mutex_unlock(&moduleUnblockedClientsMutex);
- CoreModuleAbortedClients = 0;
}
/* Called when our client timed out. After this function unblockClient()
@@ -7328,11 +7334,9 @@ void executeThreadedCommand(client *c, coreThreadedCommandCallback callback, voi
/* Try to spawn the thread that will actually execute the command. */
pthread_t tid;
if (pthread_create(&tid,NULL,threadedCoreCommandEnty,tcpd) != 0) {
- RM_AbortBlock(bc);
- CoreModuleAbortedClients++;
/* Execute the command synchronously if we can't spawn a thread.. */
callback(c,tcpd->options);
- threadedCoreCommandFreePrivdata(&ctx,tcpd);
+ moduleAbortBlock(bc,tcpd);
}
moduleFreeContext(&ctx);
}