summaryrefslogtreecommitdiff
path: root/sql/threadpool_common.cc
diff options
context:
space:
mode:
Diffstat (limited to 'sql/threadpool_common.cc')
-rw-r--r--sql/threadpool_common.cc67
1 files changed, 55 insertions, 12 deletions
diff --git a/sql/threadpool_common.cc b/sql/threadpool_common.cc
index 168579d984b..07555ac21ed 100644
--- a/sql/threadpool_common.cc
+++ b/sql/threadpool_common.cc
@@ -23,6 +23,8 @@
#include <sql_audit.h>
#include <debug_sync.h>
#include <threadpool.h>
+#include <sql_class.h>
+#include <sql_parse.h>
#ifdef WITH_WSREP
#include "wsrep_trans_observer.h"
@@ -51,7 +53,7 @@ TP_STATISTICS tp_stats;
static void threadpool_remove_connection(THD *thd);
-static int threadpool_process_request(THD *thd);
+static dispatch_command_return threadpool_process_request(THD *thd);
static THD* threadpool_add_connection(CONNECT *connect, TP_connection *c);
extern bool do_command(THD*);
@@ -195,10 +197,30 @@ void tp_callback(TP_connection *c)
}
c->connect= 0;
}
- else if (threadpool_process_request(thd))
+ else
{
- /* QUIT or an error occurred. */
- goto error;
+retry:
+ switch(threadpool_process_request(thd))
+ {
+ case DISPATCH_COMMAND_WOULDBLOCK:
+ if (!thd->async_state.try_suspend())
+ {
+ /*
+ All async operations finished meanwhile, thus nobody is will wake up
+ this THD. Therefore, we'll resume "manually" here.
+ */
+ thd->async_state.m_state = thd_async_state::enum_async_state::RESUMED;
+ goto retry;
+ }
+ worker_context.restore();
+ return;
+ case DISPATCH_COMMAND_CLOSE_CONNECTION:
+ /* QUIT or an error occurred. */
+ goto error;
+ case DISPATCH_COMMAND_SUCCESS:
+ break;
+ }
+ thd->async_state.m_state= thd_async_state::enum_async_state::NONE;
}
/* Set priority */
@@ -331,10 +353,13 @@ static bool has_unread_data(THD* thd)
/**
Process a single client request or a single batch.
*/
-static int threadpool_process_request(THD *thd)
+static dispatch_command_return threadpool_process_request(THD *thd)
{
- int retval= 0;
+ dispatch_command_return retval= DISPATCH_COMMAND_SUCCESS;
+
thread_attach(thd);
+ if(thd->async_state.m_state == thd_async_state::enum_async_state::RESUMED)
+ goto resume;
if (thd->killed >= KILL_CONNECTION)
{
@@ -342,7 +367,7 @@ static int threadpool_process_request(THD *thd)
killed flag was set by timeout handler
or KILL command. Return error.
*/
- retval= 1;
+ retval= DISPATCH_COMMAND_CLOSE_CONNECTION;
if(thd->killed == KILL_WAIT_TIMEOUT)
handle_wait_timeout(thd);
goto end;
@@ -365,19 +390,27 @@ static int threadpool_process_request(THD *thd)
if (mysql_audit_release_required(thd))
mysql_audit_release(thd);
- if ((retval= do_command(thd)) != 0)
- goto end;
+resume:
+ retval= do_command(thd, false);
+ switch(retval)
+ {
+ case DISPATCH_COMMAND_WOULDBLOCK:
+ case DISPATCH_COMMAND_CLOSE_CONNECTION:
+ goto end;
+ case DISPATCH_COMMAND_SUCCESS:
+ break;
+ }
if (!thd_is_connection_alive(thd))
{
- retval= 1;
+ retval=DISPATCH_COMMAND_CLOSE_CONNECTION;
goto end;
}
set_thd_idle(thd);
if (!has_unread_data(thd))
- {
+ {
/* More info on this debug sync is in sql_parse.cc*/
DEBUG_SYNC(thd, "before_do_command_net_read");
goto end;
@@ -527,6 +560,15 @@ static void tp_post_kill_notification(THD *thd)
post_kill_notification(thd);
}
+/* Resume previously suspended THD */
+static void tp_resume(THD* thd)
+{
+ DBUG_ASSERT(thd->async_state.m_state == thd_async_state::enum_async_state::SUSPENDED);
+ thd->async_state.m_state = thd_async_state::enum_async_state::RESUMED;
+ TP_connection* c = get_TP_connection(thd);
+ pool->resume(c);
+}
+
static scheduler_functions tp_scheduler_functions=
{
0, // max_threads
@@ -537,7 +579,8 @@ static scheduler_functions tp_scheduler_functions=
tp_wait_begin, // thd_wait_begin
tp_wait_end, // thd_wait_end
tp_post_kill_notification, // post kill notification
- tp_end // end
+ tp_end, // end
+ tp_resume
};
void pool_of_threads_scheduler(struct scheduler_functions *func,