summaryrefslogtreecommitdiff
path: root/sql/threadpool_common.cc
diff options
context:
space:
mode:
Diffstat (limited to 'sql/threadpool_common.cc')
-rw-r--r--sql/threadpool_common.cc66
1 files changed, 54 insertions, 12 deletions
diff --git a/sql/threadpool_common.cc b/sql/threadpool_common.cc
index c27f42b3d62..1f65a85c233 100644
--- a/sql/threadpool_common.cc
+++ b/sql/threadpool_common.cc
@@ -23,6 +23,8 @@
#include <sql_audit.h>
#include <debug_sync.h>
#include <threadpool.h>
+#include <sql_class.h>
+#include <sql_parse.h>
#ifdef WITH_WSREP
#include "wsrep_trans_observer.h"
@@ -51,7 +53,7 @@ TP_STATISTICS tp_stats;
static void threadpool_remove_connection(THD *thd);
-static int threadpool_process_request(THD *thd);
+static dispatch_command_return threadpool_process_request(THD *thd);
static THD* threadpool_add_connection(CONNECT *connect, TP_connection *c);
extern bool do_command(THD*);
@@ -195,10 +197,30 @@ void tp_callback(TP_connection *c)
}
c->connect= 0;
}
- else if (threadpool_process_request(thd))
+ else
{
- /* QUIT or an error occurred. */
- goto error;
+retry:
+ switch(threadpool_process_request(thd))
+ {
+ case DISPATCH_COMMAND_WOULDBLOCK:
+ if (!thd->async_state.try_suspend())
+ {
+ /*
+ All async operations finished meanwhile, thus nobody is will wake up
+ this THD (this is done at the end of, and therefore do "resume" manually.
+ */
+ thd->async_state.m_state = thd_async_state::enum_async_state::RESUME;
+ goto retry;
+ }
+ worker_context.restore();
+ return;
+ case DISPATCH_COMMAND_ERROR:
+ /* QUIT or an error occurred. */
+ goto error;
+ case DISPATCH_COMMAND_SUCCESS:
+ break;
+ }
+ thd->async_state.m_state= thd_async_state::enum_async_state::NONE;
}
/* Set priority */
@@ -321,10 +343,13 @@ static void handle_wait_timeout(THD *thd)
/**
Process a single client request or a single batch.
*/
-static int threadpool_process_request(THD *thd)
+static dispatch_command_return threadpool_process_request(THD *thd)
{
- int retval= 0;
+ dispatch_command_return retval= DISPATCH_COMMAND_SUCCESS;
+
thread_attach(thd);
+ if(thd->async_state.m_state == thd_async_state::enum_async_state::RESUME)
+ goto resume;
if (thd->killed >= KILL_CONNECTION)
{
@@ -332,7 +357,7 @@ static int threadpool_process_request(THD *thd)
killed flag was set by timeout handler
or KILL command. Return error.
*/
- retval= 1;
+ retval= DISPATCH_COMMAND_ERROR;
if(thd->killed == KILL_WAIT_TIMEOUT)
handle_wait_timeout(thd);
goto end;
@@ -356,12 +381,20 @@ static int threadpool_process_request(THD *thd)
if (mysql_audit_release_required(thd))
mysql_audit_release(thd);
- if ((retval= do_command(thd)) != 0)
- goto end;
+resume:
+ retval= do_command(thd, false);
+ switch(retval)
+ {
+ case DISPATCH_COMMAND_WOULDBLOCK:
+ case DISPATCH_COMMAND_ERROR:
+ goto end;
+ case DISPATCH_COMMAND_SUCCESS:
+ break;
+ }
if (!thd_is_connection_alive(thd))
{
- retval= 1;
+ retval=DISPATCH_COMMAND_ERROR;
goto end;
}
@@ -369,7 +402,7 @@ static int threadpool_process_request(THD *thd)
vio= thd->net.vio;
if (!vio->has_data(vio))
- {
+ {
/* More info on this debug sync is in sql_parse.cc*/
DEBUG_SYNC(thd, "before_do_command_net_read");
goto end;
@@ -505,6 +538,14 @@ static void tp_post_kill_notification(THD *thd)
post_kill_notification(thd);
}
+static void tp_resume(THD* thd)
+{
+ DBUG_ASSERT(thd->async_state.m_state == thd_async_state::enum_async_state::SUSPEND);
+ thd->async_state.m_state = thd_async_state::enum_async_state::RESUME;
+ TP_connection* c = get_TP_connection(thd);
+ pool->resume(c);
+}
+
static scheduler_functions tp_scheduler_functions=
{
0, // max_threads
@@ -515,7 +556,8 @@ static scheduler_functions tp_scheduler_functions=
tp_wait_begin, // thd_wait_begin
tp_wait_end, // thd_wait_end
tp_post_kill_notification, // post kill notification
- tp_end // end
+ tp_end, // end
+ tp_resume
};
void pool_of_threads_scheduler(struct scheduler_functions *func,