summaryrefslogtreecommitdiff
path: root/sql/scheduler.cc
diff options
context:
space:
mode:
authorMichael Widenius <monty@askmonty.org>2009-03-13 00:27:35 +0200
committerMichael Widenius <monty@askmonty.org>2009-03-13 00:27:35 +0200
commit4fe342500953e9cf337aae462fb512a7cec176f8 (patch)
treeaf9359248fb5e1174bbd409aa9444c34378bc52d /sql/scheduler.cc
parent44e6c2f253a26251613529e144545f194e34d9df (diff)
downloadmariadb-git-4fe342500953e9cf337aae462fb512a7cec176f8.tar.gz
Added "pool-of-threads" handling (with libevent)
This is a backport of code from MySQL 6.0 with cleanups and extensions The following new options are supported configure options: --with-libevent ; Enable use of libevent, which is needed for pool of threads mysqld options: --thread-handling=pool-of-threads ; Use a pool of threads to handle queries --thread-pool-size=# ; Define how many threads should be created to handle all queries --extra-port=# ; Extra tcp port that uses the old one-thread-per-connection method --extra-max-connections=# ; Number of connections to accept to 'extra-port' --test-ignore-wrong-options ; Ignore setting an enum value to a wrong option (for mysql-test-run) BUILD/SETUP.sh: Added libevents (and thus pool-of-threads) to max builds CMakeLists.txt: Added libevent Makefile.am: Added libevents config/ac-macros/libevent.m4: Libevent code for configure config/ac-macros/libevent_configure.m4: Libevent code for configure configure.in: Added libevents dbug/dbug.c: Added _db_is_pushed(); Needed for pool-of-threads code extra/Makefile.am: Added libevents extra/libevent: Libevent initial code extra/libevent/CMakeLists.txt: Libevent initial code extra/libevent/Makefile.am: Libevent initial code extra/libevent/README: Libevent initial code extra/libevent/WIN32-Code: Libevent initial code extra/libevent/WIN32-Code/config.h: Libevent initial code extra/libevent/WIN32-Code/misc.c: Libevent initial code extra/libevent/WIN32-Code/misc.h: Libevent initial code extra/libevent/WIN32-Code/tree.h: Libevent initial code extra/libevent/WIN32-Code/win32.c: Libevent initial code extra/libevent/buffer.c: Libevent initial code extra/libevent/compat: Libevent initial code extra/libevent/compat/sys: Libevent initial code extra/libevent/compat/sys/_time.h: Libevent initial code extra/libevent/compat/sys/queue.h: Libevent initial code extra/libevent/compat/sys/tree.h: Libevent initial code extra/libevent/devpoll.c: Libevent initial code extra/libevent/epoll.c: Libevent initial code extra/libevent/epoll_sub.c: Libevent initial code extra/libevent/evbuffer.c: Libevent initial code extra/libevent/evdns.c: Libevent initial code extra/libevent/evdns.h: Libevent initial code extra/libevent/event-config.h: Libevent initial code extra/libevent/event-internal.h: Libevent initial code extra/libevent/event.c: Libevent initial code extra/libevent/event.h: Libevent initial code extra/libevent/event_tagging.c: Libevent initial code extra/libevent/evhttp.h: Libevent initial code extra/libevent/evport.c: Libevent initial code extra/libevent/evrpc-internal.h: Libevent initial code extra/libevent/evrpc.c: Libevent initial code extra/libevent/evrpc.h: Libevent initial code extra/libevent/evsignal.h: Libevent initial code extra/libevent/evutil.c: Libevent initial code extra/libevent/evutil.h: Libevent initial code extra/libevent/http-internal.h: Libevent initial code extra/libevent/http.c: Libevent initial code extra/libevent/kqueue.c: Libevent initial code extra/libevent/log.c: Libevent initial code extra/libevent/log.h: Libevent initial code extra/libevent/min_heap.h: Libevent initial code extra/libevent/poll.c: Libevent initial code extra/libevent/select.c: Libevent initial code extra/libevent/signal.c: Libevent initial code extra/libevent/strlcpy-internal.h: Libevent initial code extra/libevent/strlcpy.c: Libevent initial code include/config-win.h: Libevent support include/my_dbug.h: ADded _db_is_pushed include/mysql.h.pp: Update to handle new prototypes include/typelib.h: Split find_type_or_exit() into two functions include/violite.h: Added vio_is_pending() libmysqld/Makefile.am: Added libevent mysql-test/include/have_pool_of_threads.inc: Added test for pool-of-threads mysql-test/mysql-test-run.pl: Don't abort based on time and don't retry test cases when run under --gdb or --debug mysql-test/r/crash_commit_before.result: USE GLOBAL for debug variable mysql-test/r/have_pool_of_threads.require: Added test for pool-of-threads mysql-test/r/pool_of_threads.result: Added test for pool-of-threads mysql-test/r/subselect_debug.result: USE GLOBAL for debug variable mysql-test/t/crash_commit_before.test: USE GLOBAL for debug variable mysql-test/t/merge-big.test: USE GLOBAL for debug variable mysql-test/t/pool_of_threads-master.opt: Added test for pool-of-threads mysql-test/t/pool_of_threads.test: Added test for pool-of-threads mysys/typelib.c: Split find_type_or_exit() into find_type_with_warning() sql/Makefile.am: Added libevent sql/handler.cc: Indentation fix. Fixed memory loss bug Fixed crash on exit when handler plugin failed sql/mysql_priv.h: Added extra_max_connections and mysqld_extra_port Added extern functions from sql_connect.cc sql/mysqld.cc: Added support for new mysqld options Added code for 'extra-port' and 'extra-max-connections' Split some functions into smaller pieces to be able to reuse code Added code for test-ignore-wrong-options sql/scheduler.cc: Updated schduler code from MySQL 6.0 sql/scheduler.h: Updated schduler code from MySQL 6.0 sql/set_var.cc: Added support for changing "extra_max_connections" sql/sql_class.cc: Iniitalize thread schduler options in THD sql/sql_class.h: Added to extra_port and scheduler to 'THD' sql/sql_connect.cc: Use thd->schduler to check number of connections and terminate connection Made some local functions global (for scheduler.cc) vio/viosocket.c: Added 'vio_pending', needed for scheduler..c
Diffstat (limited to 'sql/scheduler.cc')
-rw-r--r--sql/scheduler.cc678
1 files changed, 674 insertions, 4 deletions
diff --git a/sql/scheduler.cc b/sql/scheduler.cc
index b05bdf4756f..be068859a6f 100644
--- a/sql/scheduler.cc
+++ b/sql/scheduler.cc
@@ -22,6 +22,9 @@
#endif
#include <mysql_priv.h>
+#if MYSQL_VERSION_ID >= 60000
+#include "sql_audit.h"
+#endif
/*
'Dummy' functions to be used when we don't need any handling for a scheduler
@@ -29,7 +32,7 @@
*/
static bool init_dummy(void) {return 0;}
-static void post_kill_dummy(THD* thd) {}
+static void post_kill_dummy(THD *thd) {}
static void end_dummy(void) {}
static bool end_thread_dummy(THD *thd, bool cache_thread) { return 0; }
@@ -63,9 +66,12 @@ static bool no_threads_end(THD *thd, bool put_in_cache)
Initailize scheduler for --thread-handling=no-threads
*/
-void one_thread_scheduler(scheduler_functions* func)
+void one_thread_scheduler(scheduler_functions *func)
{
func->max_threads= 1;
+ max_connections= 1;
+ func->max_connections= &max_connections;
+ func->connection_count= &connection_count;
#ifndef EMBEDDED_LIBRARY
func->add_connection= handle_connection_in_main_thread;
#endif
@@ -79,10 +85,674 @@ void one_thread_scheduler(scheduler_functions* func)
*/
#ifndef EMBEDDED_LIBRARY
-void one_thread_per_connection_scheduler(scheduler_functions* func)
+void one_thread_per_connection_scheduler(scheduler_functions *func,
+ ulong *arg_max_connections,
+ uint *arg_connection_count)
{
- func->max_threads= max_connections;
+ func->max_threads= *arg_max_connections + 1;
+ func->max_connections= arg_max_connections;
+ func->connection_count= arg_connection_count;
func->add_connection= create_thread_to_handle_connection;
func->end_thread= one_thread_per_connection_end;
}
#endif /* EMBEDDED_LIBRARY */
+
+
+#if defined(HAVE_LIBEVENT) && HAVE_POOL_OF_THREADS == 1
+
+#include "event.h"
+
+static struct event_base *base;
+
+static uint created_threads, killed_threads;
+static bool kill_pool_threads;
+
+static struct event thd_add_event;
+static struct event thd_kill_event;
+
+static pthread_mutex_t LOCK_thd_add; /* protects thds_need_adding */
+static LIST *thds_need_adding; /* list of thds to add to libevent queue */
+
+static int thd_add_pair[2]; /* pipe to signal add a connection to libevent*/
+static int thd_kill_pair[2]; /* pipe to signal kill a connection in libevent */
+
+/*
+ LOCK_event_loop protects the non-thread safe libevent calls (event_add and
+ event_del) and thds_need_processing and thds_waiting_for_io.
+*/
+static pthread_mutex_t LOCK_event_loop;
+static LIST *thds_need_processing; /* list of thds that needs some processing */
+static LIST *thds_waiting_for_io; /* list of thds with added events */
+
+pthread_handler_t libevent_thread_proc(void *arg);
+static void libevent_end();
+static bool libevent_needs_immediate_processing(THD *thd);
+static void libevent_connection_close(THD *thd);
+static bool libevent_should_close_connection(THD* thd);
+static void libevent_thd_add(THD* thd);
+void libevent_io_callback(int Fd, short Operation, void *ctx);
+void libevent_add_thd_callback(int Fd, short Operation, void *ctx);
+void libevent_kill_thd_callback(int Fd, short Operation, void *ctx);
+
+
+/*
+ Create a pipe and set to non-blocking.
+ Returns TRUE if there is an error.
+*/
+
+static bool init_socketpair(int sock_pair[])
+{
+ sock_pair[0]= sock_pair[1]= -1;
+ return (evutil_socketpair(AF_UNIX, SOCK_STREAM, 0, sock_pair) < 0 ||
+ evutil_make_socket_nonblocking(sock_pair[0]) == -1 ||
+ evutil_make_socket_nonblocking(sock_pair[1]) == -1);
+}
+
+static void close_socketpair(int sock_pair[])
+{
+ if (sock_pair[0] != -1)
+ EVUTIL_CLOSESOCKET(sock_pair[0]);
+ if (sock_pair[1] != -1)
+ EVUTIL_CLOSESOCKET(sock_pair[1]);
+}
+
+/*
+ thd_scheduler keeps the link between THD and events.
+ It's embedded in the THD class.
+*/
+
+thd_scheduler::thd_scheduler()
+ : logged_in(FALSE), io_event(NULL), thread_attached(FALSE)
+{
+#ifndef DBUG_OFF
+ dbug_explain[0]= '\0';
+ set_explain= FALSE;
+#endif
+}
+
+
+thd_scheduler::~thd_scheduler()
+{
+ my_free(io_event, MYF(MY_ALLOW_ZERO_PTR));
+}
+
+
+bool thd_scheduler::init(THD *parent_thd)
+{
+ io_event=
+ (struct event*)my_malloc(sizeof(*io_event),MYF(MY_ZEROFILL|MY_WME));
+
+ if (!io_event)
+ {
+ sql_print_error("Memory allocation error in thd_scheduler::init\n");
+ return TRUE;
+ }
+
+ event_set(io_event, (int)parent_thd->net.vio->sd, EV_READ,
+ libevent_io_callback, (void*)parent_thd);
+
+ list.data= parent_thd;
+
+ return FALSE;
+}
+
+
+/*
+ Attach/associate the connection with the OS thread, for command processing.
+*/
+
+bool thd_scheduler::thread_attach()
+{
+ DBUG_ASSERT(!thread_attached);
+ THD* thd = (THD*)list.data;
+ if (libevent_should_close_connection(thd) ||
+ setup_connection_thread_globals(thd))
+ {
+ return TRUE;
+ }
+ my_errno= 0;
+ thd->mysys_var->abort= 0;
+ thread_attached= TRUE;
+#ifndef DBUG_OFF
+ /*
+ When we attach the thread for a connection for the first time,
+ we know that there is no session value set yet. Thus
+ the initial setting of set_explain to FALSE is OK.
+ */
+ if (set_explain)
+ DBUG_SET(dbug_explain);
+#endif
+ return FALSE;
+}
+
+
+/*
+ Detach/disassociate the connection with the OS thread.
+*/
+
+void thd_scheduler::thread_detach()
+{
+ if (thread_attached)
+ {
+ THD* thd = (THD*)list.data;
+ pthread_mutex_lock(&thd->LOCK_delete);
+ thd->mysys_var= NULL;
+ pthread_mutex_unlock(&thd->LOCK_delete);
+ thread_attached= FALSE;
+#ifndef DBUG_OFF
+ /*
+ If during the session @@session.dbug was assigned, the
+ dbug options/state has been pushed. Check if this is the
+ case, to be able to restore the state when we attach this
+ logical connection to a physical thread.
+ */
+ if (_db_is_pushed_())
+ {
+ set_explain= TRUE;
+ if (DBUG_EXPLAIN(dbug_explain, sizeof(dbug_explain)))
+ sql_print_error("thd_scheduler: DBUG_EXPLAIN buffer is too small");
+ }
+ /* DBUG_POP() is a no-op in case there is no session state */
+ DBUG_POP();
+#endif
+ }
+}
+
+/**
+ Create all threads for the thread pool
+
+ NOTES
+ After threads are created we wait until all threads has signaled that
+ they have started before we return
+
+ RETURN
+ 0 ok
+ 1 We got an error creating the thread pool
+ In this case we will abort all created threads
+*/
+
+static bool libevent_init(void)
+{
+ uint i;
+ DBUG_ENTER("libevent_init");
+
+ base= (struct event_base *) event_init();
+
+ created_threads= 0;
+ killed_threads= 0;
+ kill_pool_threads= FALSE;
+
+ pthread_mutex_init(&LOCK_event_loop, NULL);
+ pthread_mutex_init(&LOCK_thd_add, NULL);
+
+ /* set up sockets used to add new thds to the event pool */
+ if (init_socketpair(thd_add_pair))
+ {
+ sql_print_error("init_socketpair(thd_add_spair) error in libevent_init");
+ DBUG_RETURN(1);
+ }
+ /* set up sockets used to kill thds in the event queue */
+ if (init_socketpair(thd_kill_pair))
+ {
+ sql_print_error("init_socketpair(thd_kill_pair) error in libevent_init");
+ close_socketpair(thd_add_pair);
+ DBUG_RETURN(1);
+ }
+ event_set(&thd_add_event, thd_add_pair[0], EV_READ|EV_PERSIST,
+ libevent_add_thd_callback, NULL);
+ event_set(&thd_kill_event, thd_kill_pair[0], EV_READ|EV_PERSIST,
+ libevent_kill_thd_callback, NULL);
+
+ if (event_add(&thd_add_event, NULL) || event_add(&thd_kill_event, NULL))
+ {
+ sql_print_error("thd_add_event event_add error in libevent_init");
+ libevent_end();
+ DBUG_RETURN(1);
+ }
+ /* Set up the thread pool */
+ created_threads= killed_threads= 0;
+ pthread_mutex_lock(&LOCK_thread_count);
+
+ for (i= 0; i < thread_pool_size; i++)
+ {
+ pthread_t thread;
+ int error;
+ if ((error= pthread_create(&thread, &connection_attrib,
+ libevent_thread_proc, 0)))
+ {
+ sql_print_error("Can't create completion port thread (error %d)",
+ error);
+ pthread_mutex_unlock(&LOCK_thread_count);
+ libevent_end(); // Cleanup
+ DBUG_RETURN(TRUE);
+ }
+ }
+
+ /* Wait until all threads are created */
+ while (created_threads != thread_pool_size)
+ pthread_cond_wait(&COND_thread_count,&LOCK_thread_count);
+ pthread_mutex_unlock(&LOCK_thread_count);
+
+ DBUG_PRINT("info", ("%u threads created", (uint) thread_pool_size));
+ DBUG_RETURN(FALSE);
+}
+
+
+/*
+ This is called when data is ready on the socket.
+
+ NOTES
+ This is only called by the thread that owns LOCK_event_loop.
+
+ We add the thd that got the data to thds_need_processing, and
+ cause the libevent event_loop() to terminate. Then this same thread will
+ return from event_loop and pick the thd value back up for processing.
+*/
+
+void libevent_io_callback(int, short, void *ctx)
+{
+ safe_mutex_assert_owner(&LOCK_event_loop);
+ THD *thd= (THD*)ctx;
+ thds_waiting_for_io= list_delete(thds_waiting_for_io, &thd->event_scheduler.list);
+ thds_need_processing= list_add(thds_need_processing, &thd->event_scheduler.list);
+}
+
+/*
+ This is called when we have a thread we want to be killed.
+
+ NOTES
+ This is only called by the thread that owns LOCK_event_loop.
+*/
+
+void libevent_kill_thd_callback(int Fd, short, void*)
+{
+ safe_mutex_assert_owner(&LOCK_event_loop);
+
+ /* clear the pending events */
+ char c;
+ while (recv(Fd, &c, sizeof(c), 0) == sizeof(c))
+ {}
+
+ LIST* list= thds_waiting_for_io;
+ while (list)
+ {
+ THD *thd= (THD*)list->data;
+ list= list_rest(list);
+ if (thd->killed == THD::KILL_CONNECTION)
+ {
+ /*
+ Delete from libevent and add to the processing queue.
+ */
+ event_del(thd->event_scheduler.io_event);
+ thds_waiting_for_io= list_delete(thds_waiting_for_io,
+ &thd->event_scheduler.list);
+ thds_need_processing= list_add(thds_need_processing,
+ &thd->event_scheduler.list);
+ }
+ }
+}
+
+
+/*
+ This is used to add connections to the pool. This callback is invoked from
+ the libevent event_loop() call whenever the thd_add_pair[1] has a byte
+ written to it.
+
+ NOTES
+ This is only called by the thread that owns LOCK_event_loop.
+*/
+
+void libevent_add_thd_callback(int Fd, short, void *)
+{
+ safe_mutex_assert_owner(&LOCK_event_loop);
+
+ /* clear the pending events */
+ char c;
+ while (recv(Fd, &c, sizeof(c), 0) == sizeof(c))
+ {}
+
+ pthread_mutex_lock(&LOCK_thd_add);
+ while (thds_need_adding)
+ {
+ /* pop the first thd off the list */
+ THD* thd= (THD*)thds_need_adding->data;
+ thds_need_adding= list_delete(thds_need_adding, thds_need_adding);
+
+ pthread_mutex_unlock(&LOCK_thd_add);
+
+ if (!thd->event_scheduler.logged_in || libevent_should_close_connection(thd))
+ {
+ /*
+ Add thd to thds_need_processing list. If it needs closing we'll close
+ it outside of event_loop().
+ */
+ thds_need_processing= list_add(thds_need_processing,
+ &thd->event_scheduler.list);
+ }
+ else
+ {
+ /* Add to libevent */
+ if (event_add(thd->event_scheduler.io_event, NULL))
+ {
+ sql_print_error("event_add error in libevent_add_thd_callback");
+ libevent_connection_close(thd);
+ }
+ else
+ {
+ thds_waiting_for_io= list_add(thds_waiting_for_io,
+ &thd->event_scheduler.list);
+ }
+ }
+ pthread_mutex_lock(&LOCK_thd_add);
+ }
+ pthread_mutex_unlock(&LOCK_thd_add);
+}
+
+
+/**
+ Notify the thread pool about a new connection
+
+ NOTES
+ LOCK_thread_count is locked on entry. This function MUST unlock it!
+*/
+
+static void libevent_add_connection(THD *thd)
+{
+ DBUG_ENTER("libevent_add_connection");
+ DBUG_PRINT("enter", ("thd: %p thread_id: %lu",
+ thd, thd->thread_id));
+
+ if (thd->event_scheduler.init(thd))
+ {
+ sql_print_error("Scheduler init error in libevent_add_new_connection");
+ pthread_mutex_unlock(&LOCK_thread_count);
+ libevent_connection_close(thd);
+ DBUG_VOID_RETURN;
+ }
+ threads.append(thd);
+ libevent_thd_add(thd);
+
+ pthread_mutex_unlock(&LOCK_thread_count);
+ DBUG_VOID_RETURN;
+}
+
+
+/**
+ @brief Signal a waiting connection it's time to die.
+
+ @details This function will signal libevent the THD should be killed.
+ Either the global LOCK_thd_count or the THD's LOCK_delete must be locked
+ upon entry.
+
+ @param[in] thd The connection to kill
+*/
+
+static void libevent_post_kill_notification(THD *)
+{
+ /*
+ Note, we just wake up libevent with an event that a THD should be killed,
+ It will search its list of thds for thd->killed == KILL_CONNECTION to
+ find the THDs it should kill.
+
+ So we don't actually tell it which one and we don't actually use the
+ THD being passed to us, but that's just a design detail that could change
+ later.
+ */
+ char c= 0;
+ send(thd_kill_pair[1], &c, sizeof(c), 0);
+}
+
+
+/*
+ Close and delete a connection.
+*/
+
+static void libevent_connection_close(THD *thd)
+{
+ DBUG_ENTER("libevent_connection_close");
+ DBUG_PRINT("enter", ("thd: %p", thd));
+
+ thd->killed= THD::KILL_CONNECTION; // Avoid error messages
+
+ if (thd->net.vio->sd >= 0) // not already closed
+ {
+ end_connection(thd);
+ close_connection(thd, 0, 1);
+ }
+ thd->event_scheduler.thread_detach();
+ unlink_thd(thd); /* locks LOCK_thread_count and deletes thd */
+ pthread_mutex_unlock(&LOCK_thread_count);
+
+ DBUG_VOID_RETURN;
+}
+
+
+/*
+ Returns true if we should close and delete a THD connection.
+*/
+
+static bool libevent_should_close_connection(THD* thd)
+{
+ return thd->net.error ||
+ thd->net.vio == 0 ||
+ thd->killed == THD::KILL_CONNECTION;
+}
+
+
+/*
+ libevent_thread_proc is the outer loop of each thread in the thread pool.
+ These procs only return/terminate on shutdown (kill_pool_threads == true).
+*/
+
+pthread_handler_t libevent_thread_proc(void *arg)
+{
+ if (init_new_connection_handler_thread())
+ {
+ my_thread_global_end();
+ sql_print_error("libevent_thread_proc: my_thread_init() failed");
+ exit(1);
+ }
+ DBUG_ENTER("libevent_thread_proc");
+
+ /*
+ Signal libevent_init() when all threads has been created and are ready to
+ receive events.
+ */
+ (void) pthread_mutex_lock(&LOCK_thread_count);
+ created_threads++;
+ thread_created++;
+ if (created_threads == thread_pool_size)
+ (void) pthread_cond_signal(&COND_thread_count);
+ (void) pthread_mutex_unlock(&LOCK_thread_count);
+
+ for (;;)
+ {
+ THD *thd= NULL;
+ (void) pthread_mutex_lock(&LOCK_event_loop);
+
+ /* get thd(s) to process */
+ while (!thds_need_processing)
+ {
+ if (kill_pool_threads)
+ {
+ /* the flag that we should die has been set */
+ (void) pthread_mutex_unlock(&LOCK_event_loop);
+ goto thread_exit;
+ }
+ event_loop(EVLOOP_ONCE);
+ }
+
+ /* pop the first thd off the list */
+ thd= (THD*)thds_need_processing->data;
+ thds_need_processing= list_delete(thds_need_processing,
+ thds_need_processing);
+
+ (void) pthread_mutex_unlock(&LOCK_event_loop);
+
+ /* now we process the connection (thd) */
+
+ /* set up the thd<->thread links. */
+ thd->thread_stack= (char*) &thd;
+
+ if (thd->event_scheduler.thread_attach())
+ {
+ libevent_connection_close(thd);
+ continue;
+ }
+
+ /* is the connection logged in yet? */
+ if (!thd->event_scheduler.logged_in)
+ {
+ DBUG_PRINT("info", ("init new connection. sd: %d",
+ thd->net.vio->sd));
+ lex_start(thd);
+ if (login_connection(thd))
+ {
+ /* Failed to log in */
+ libevent_connection_close(thd);
+ continue;
+ }
+ else
+ {
+ /* login successful */
+#if MYSQL_VERSION_ID >= 60000
+ MYSQL_CONNECTION_START(thd->thread_id, thd->security_ctx->priv_user,
+ (char *) thd->security_ctx->host_or_ip);
+#endif
+ thd->event_scheduler.logged_in= TRUE;
+ prepare_new_connection_state(thd);
+ if (!libevent_needs_immediate_processing(thd))
+ continue; /* New connection is now waiting for data in libevent*/
+ }
+ }
+
+ do
+ {
+ /* Process a query */
+ if (do_command(thd))
+ {
+ libevent_connection_close(thd);
+ break;
+ }
+ } while (libevent_needs_immediate_processing(thd));
+ }
+
+thread_exit:
+ DBUG_PRINT("exit", ("ending thread"));
+ (void) pthread_mutex_lock(&LOCK_thread_count);
+ killed_threads++;
+ pthread_cond_broadcast(&COND_thread_count);
+ (void) pthread_mutex_unlock(&LOCK_thread_count);
+ my_thread_end();
+ pthread_exit(0);
+ DBUG_RETURN(0); /* purify: deadcode */
+}
+
+
+/*
+ Returns TRUE if the connection needs immediate processing and FALSE if
+ instead it's queued for libevent processing or closed,
+*/
+
+static bool libevent_needs_immediate_processing(THD *thd)
+{
+ if (libevent_should_close_connection(thd))
+ {
+ libevent_connection_close(thd);
+ return FALSE;
+ }
+ /*
+ If more data in the socket buffer, return TRUE to process another command.
+
+ Note: we cannot add for event processing because the whole request might
+ already be buffered and we wouldn't receive an event.
+ */
+ if (vio_pending(thd->net.vio) > 0)
+ return TRUE;
+
+ thd->event_scheduler.thread_detach();
+ libevent_thd_add(thd);
+ return FALSE;
+}
+
+
+/*
+ Adds a THD to queued for libevent processing.
+
+ This call does not actually register the event with libevent.
+ Instead, it places the THD onto a queue and signals libevent by writing
+ a byte into thd_add_pair, which will cause our libevent_add_thd_callback to
+ be invoked which will find the THD on the queue and add it to libevent.
+*/
+
+static void libevent_thd_add(THD* thd)
+{
+ char c= 0;
+ /* release any audit resources, this thd is going to sleep */
+#if MYSQL_VERSION_ID >= 60000
+ mysql_audit_release(thd);
+#endif
+ pthread_mutex_lock(&LOCK_thd_add);
+ /* queue for libevent */
+ thds_need_adding= list_add(thds_need_adding, &thd->event_scheduler.list);
+ /* notify libevent */
+ send(thd_add_pair[1], &c, sizeof(c), 0);
+ pthread_mutex_unlock(&LOCK_thd_add);
+}
+
+
+/**
+ Wait until all pool threads have been deleted for clean shutdown
+*/
+
+static void libevent_end()
+{
+ DBUG_ENTER("libevent_end");
+ DBUG_PRINT("enter", ("created_threads: %d killed_threads: %u",
+ created_threads, killed_threads));
+
+ /*
+ check if initialized. This may not be the case if get an error at
+ startup
+ */
+ if (!base)
+ DBUG_VOID_RETURN;
+
+ (void) pthread_mutex_lock(&LOCK_thread_count);
+
+
+ kill_pool_threads= TRUE;
+ while (killed_threads != created_threads)
+ {
+ /* wake up the event loop */
+ char c= 0;
+ send(thd_add_pair[1], &c, sizeof(c), 0);
+ pthread_cond_wait(&COND_thread_count, &LOCK_thread_count);
+ }
+ (void) pthread_mutex_unlock(&LOCK_thread_count);
+
+ event_del(&thd_add_event);
+ close_socketpair(thd_add_pair);
+ event_del(&thd_kill_event);
+ close_socketpair(thd_kill_pair);
+ event_base_free(base);
+ base= 0;
+
+ (void) pthread_mutex_destroy(&LOCK_event_loop);
+ (void) pthread_mutex_destroy(&LOCK_thd_add);
+ DBUG_VOID_RETURN;
+}
+
+
+void pool_of_threads_scheduler(scheduler_functions* func)
+{
+ func->max_threads= thread_pool_size;
+ func->max_connections= &max_connections;
+ func->connection_count= &connection_count;
+ func->init= libevent_init;
+ func->end= libevent_end;
+ func->post_kill_notification= libevent_post_kill_notification;
+ func->add_connection= libevent_add_connection;
+}
+
+#endif