author     Michael Widenius <monty@askmonty.org>       2009-03-13 00:27:35 +0200
committer  Michael Widenius <monty@askmonty.org>       2009-03-13 00:27:35 +0200
commit     4fe342500953e9cf337aae462fb512a7cec176f8 (patch)
tree       af9359248fb5e1174bbd409aa9444c34378bc52d /sql/scheduler.cc
parent     44e6c2f253a26251613529e144545f194e34d9df (diff)
download   mariadb-git-4fe342500953e9cf337aae462fb512a7cec176f8.tar.gz
Added "pool-of-threads" handling (with libevent)
This is a backport of code from MySQL 6.0, with cleanups and extensions.
The following new options are supported:
configure options:
--with-libevent ; Enable use of libevent, which is needed for pool of threads
mysqld options:
--thread-handling=pool-of-threads ; Use a pool of threads to handle queries
--thread-pool-size=# ; Number of threads created to handle all queries
--extra-port=# ; Extra TCP port that uses the old one-thread-per-connection method
--extra-max-connections=# ; Number of connections to accept on 'extra-port'
--test-ignore-wrong-options ; Ignore attempts to set an enum option to an invalid value (for mysql-test-run)
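For illustration, a minimal sketch of enabling the new scheduler; the build step comes from the configure option above, but the option values are illustrative assumptions, not defaults:

  shell> ./configure --with-libevent && make

  # my.cnf excerpt (example values)
  [mysqld]
  thread-handling=pool-of-threads
  thread-pool-size=16
  extra-port=3307
  extra-max-connections=10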
BUILD/SETUP.sh:
Added libevent (and thus pool-of-threads) to max builds
CMakeLists.txt:
Added libevent
Makefile.am:
Added libevent
config/ac-macros/libevent.m4:
Libevent code for configure
config/ac-macros/libevent_configure.m4:
Libevent code for configure
configure.in:
Added libevent
dbug/dbug.c:
Added _db_is_pushed(); needed for the pool-of-threads code
extra/Makefile.am:
Added libevent
extra/libevent:
Libevent initial code
extra/libevent/CMakeLists.txt:
Libevent initial code
extra/libevent/Makefile.am:
Libevent initial code
extra/libevent/README:
Libevent initial code
extra/libevent/WIN32-Code:
Libevent initial code
extra/libevent/WIN32-Code/config.h:
Libevent initial code
extra/libevent/WIN32-Code/misc.c:
Libevent initial code
extra/libevent/WIN32-Code/misc.h:
Libevent initial code
extra/libevent/WIN32-Code/tree.h:
Libevent initial code
extra/libevent/WIN32-Code/win32.c:
Libevent initial code
extra/libevent/buffer.c:
Libevent initial code
extra/libevent/compat:
Libevent initial code
extra/libevent/compat/sys:
Libevent initial code
extra/libevent/compat/sys/_time.h:
Libevent initial code
extra/libevent/compat/sys/queue.h:
Libevent initial code
extra/libevent/compat/sys/tree.h:
Libevent initial code
extra/libevent/devpoll.c:
Libevent initial code
extra/libevent/epoll.c:
Libevent initial code
extra/libevent/epoll_sub.c:
Libevent initial code
extra/libevent/evbuffer.c:
Libevent initial code
extra/libevent/evdns.c:
Libevent initial code
extra/libevent/evdns.h:
Libevent initial code
extra/libevent/event-config.h:
Libevent initial code
extra/libevent/event-internal.h:
Libevent initial code
extra/libevent/event.c:
Libevent initial code
extra/libevent/event.h:
Libevent initial code
extra/libevent/event_tagging.c:
Libevent initial code
extra/libevent/evhttp.h:
Libevent initial code
extra/libevent/evport.c:
Libevent initial code
extra/libevent/evrpc-internal.h:
Libevent initial code
extra/libevent/evrpc.c:
Libevent initial code
extra/libevent/evrpc.h:
Libevent initial code
extra/libevent/evsignal.h:
Libevent initial code
extra/libevent/evutil.c:
Libevent initial code
extra/libevent/evutil.h:
Libevent initial code
extra/libevent/http-internal.h:
Libevent initial code
extra/libevent/http.c:
Libevent initial code
extra/libevent/kqueue.c:
Libevent initial code
extra/libevent/log.c:
Libevent initial code
extra/libevent/log.h:
Libevent initial code
extra/libevent/min_heap.h:
Libevent initial code
extra/libevent/poll.c:
Libevent initial code
extra/libevent/select.c:
Libevent initial code
extra/libevent/signal.c:
Libevent initial code
extra/libevent/strlcpy-internal.h:
Libevent initial code
extra/libevent/strlcpy.c:
Libevent initial code
include/config-win.h:
Libevent support
include/my_dbug.h:
Added _db_is_pushed()
include/mysql.h.pp:
Update to handle new prototypes
include/typelib.h:
Split find_type_or_exit() into two functions
include/violite.h:
Added vio_is_pending()
libmysqld/Makefile.am:
Added libevent
mysql-test/include/have_pool_of_threads.inc:
Added test for pool-of-threads
mysql-test/mysql-test-run.pl:
Don't abort based on time and don't retry test cases when run under --gdb or --debug
mysql-test/r/crash_commit_before.result:
USE GLOBAL for debug variable
mysql-test/r/have_pool_of_threads.require:
Added test for pool-of-threads
mysql-test/r/pool_of_threads.result:
Added test for pool-of-threads
mysql-test/r/subselect_debug.result:
USE GLOBAL for debug variable
mysql-test/t/crash_commit_before.test:
USE GLOBAL for debug variable
mysql-test/t/merge-big.test:
USE GLOBAL for debug variable
mysql-test/t/pool_of_threads-master.opt:
Added test for pool-of-threads
mysql-test/t/pool_of_threads.test:
Added test for pool-of-threads
mysys/typelib.c:
Split find_type_or_exit() into find_type_with_warning() and find_type_or_exit() (see the sketch after this file list)
sql/Makefile.am:
Added libevent
sql/handler.cc:
Indentation fix.
Fixed a memory leak
Fixed a crash on exit when a handler plugin failed
sql/mysql_priv.h:
Added extra_max_connections and mysqld_extra_port
Added extern functions from sql_connect.cc
sql/mysqld.cc:
Added support for new mysqld options
Added code for 'extra-port' and 'extra-max-connections'
Split some functions into smaller pieces to be able to reuse code
Added code for test-ignore-wrong-options
sql/scheduler.cc:
Updated scheduler code from MySQL 6.0
sql/scheduler.h:
Updated scheduler code from MySQL 6.0
sql/set_var.cc:
Added support for changing "extra_max_connections"
sql/sql_class.cc:
Initialize thread scheduler options in THD
sql/sql_class.h:
Added extra_port and scheduler to 'THD'
sql/sql_connect.cc:
Use thd->scheduler to check the number of connections and to terminate connections
Made some local functions global (for scheduler.cc)
vio/viosocket.c:
Added 'vio_pending', needed for scheduler.cc
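
For reference, a minimal sketch of the typelib split mentioned under include/typelib.h and mysys/typelib.c above. The function bodies and the opt_ignore_wrong_options flag name are assumptions based on the commit message, not code copied from the patch:

  /* Hypothetical sketch: look up a value in a TYPELIB and warn on
     failure instead of exiting. */
  static int find_type_with_warning(const char *x, TYPELIB *typelib,
                                    const char *option)
  {
    int res= find_type((char *) x, typelib, 2);
    if (res <= 0)
      fprintf(stderr, "Unknown option to %s: %s\n", option, x);
    return res;
  }

  /* As before, but --test-ignore-wrong-options (assumed flag variable)
     suppresses the exit so mysql-test-run can pass bad enum values. */
  int find_type_or_exit(const char *x, TYPELIB *typelib, const char *option)
  {
    int res= find_type_with_warning(x, typelib, option);
    if (res <= 0 && !opt_ignore_wrong_options)
      exit(1);
    return res;
  }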
Diffstat (limited to 'sql/scheduler.cc')
-rw-r--r--  sql/scheduler.cc  678
1 file changed, 674 insertions(+), 4 deletions(-)
diff --git a/sql/scheduler.cc b/sql/scheduler.cc
index b05bdf4756f..be068859a6f 100644
--- a/sql/scheduler.cc
+++ b/sql/scheduler.cc
@@ -22,6 +22,9 @@
 #endif
 
 #include <mysql_priv.h>
+#if MYSQL_VERSION_ID >= 60000
+#include "sql_audit.h"
+#endif
 
 /*
   'Dummy' functions to be used when we don't need any handling for a scheduler
@@ -29,7 +32,7 @@
 */
 
 static bool init_dummy(void) {return 0;}
-static void post_kill_dummy(THD* thd) {}
+static void post_kill_dummy(THD *thd) {}
 static void end_dummy(void) {}
 static bool end_thread_dummy(THD *thd, bool cache_thread) { return 0; }
 
@@ -63,9 +66,12 @@ static bool no_threads_end(THD *thd, bool put_in_cache)
   Initailize scheduler for --thread-handling=no-threads
 */
 
-void one_thread_scheduler(scheduler_functions* func)
+void one_thread_scheduler(scheduler_functions *func)
 {
   func->max_threads= 1;
+  max_connections= 1;
+  func->max_connections= &max_connections;
+  func->connection_count= &connection_count;
 #ifndef EMBEDDED_LIBRARY
   func->add_connection= handle_connection_in_main_thread;
 #endif
@@ -79,10 +85,674 @@ void one_thread_scheduler(scheduler_functions* func)
 */
 
 #ifndef EMBEDDED_LIBRARY
-void one_thread_per_connection_scheduler(scheduler_functions* func)
+void one_thread_per_connection_scheduler(scheduler_functions *func,
+                                         ulong *arg_max_connections,
+                                         uint *arg_connection_count)
 {
-  func->max_threads= max_connections;
+  func->max_threads= *arg_max_connections + 1;
+  func->max_connections= arg_max_connections;
+  func->connection_count= arg_connection_count;
   func->add_connection= create_thread_to_handle_connection;
   func->end_thread= one_thread_per_connection_end;
 }
 #endif /* EMBEDDED_LIBRARY */
+
+
+#if defined(HAVE_LIBEVENT) && HAVE_POOL_OF_THREADS == 1
+
+#include "event.h"
+
+static struct event_base *base;
+
+static uint created_threads, killed_threads;
+static bool kill_pool_threads;
+
+static struct event thd_add_event;
+static struct event thd_kill_event;
+
+static pthread_mutex_t LOCK_thd_add;  /* protects thds_need_adding */
+static LIST *thds_need_adding;        /* list of thds to add to libevent queue */
+
+static int thd_add_pair[2];  /* pipe to signal add a connection to libevent */
+static int thd_kill_pair[2]; /* pipe to signal kill a connection in libevent */
+
+/*
+  LOCK_event_loop protects the non-thread safe libevent calls (event_add and
+  event_del) and thds_need_processing and thds_waiting_for_io.
+*/
+static pthread_mutex_t LOCK_event_loop;
+static LIST *thds_need_processing; /* list of thds that needs some processing */
+static LIST *thds_waiting_for_io;  /* list of thds with added events */
+
+pthread_handler_t libevent_thread_proc(void *arg);
+static void libevent_end();
+static bool libevent_needs_immediate_processing(THD *thd);
+static void libevent_connection_close(THD *thd);
+static bool libevent_should_close_connection(THD* thd);
+static void libevent_thd_add(THD* thd);
+void libevent_io_callback(int Fd, short Operation, void *ctx);
+void libevent_add_thd_callback(int Fd, short Operation, void *ctx);
+void libevent_kill_thd_callback(int Fd, short Operation, void *ctx);
+
+
+/*
+  Create a pipe and set to non-blocking.
+  Returns TRUE if there is an error.
+*/
+
+static bool init_socketpair(int sock_pair[])
+{
+  sock_pair[0]= sock_pair[1]= -1;
+  return (evutil_socketpair(AF_UNIX, SOCK_STREAM, 0, sock_pair) < 0 ||
+          evutil_make_socket_nonblocking(sock_pair[0]) == -1 ||
+          evutil_make_socket_nonblocking(sock_pair[1]) == -1);
+}
+
+static void close_socketpair(int sock_pair[])
+{
+  if (sock_pair[0] != -1)
+    EVUTIL_CLOSESOCKET(sock_pair[0]);
+  if (sock_pair[1] != -1)
+    EVUTIL_CLOSESOCKET(sock_pair[1]);
+}
+
+/*
+  thd_scheduler keeps the link between THD and events.
+  It's embedded in the THD class.
+*/
+
+thd_scheduler::thd_scheduler()
+  : logged_in(FALSE), io_event(NULL), thread_attached(FALSE)
+{
+#ifndef DBUG_OFF
+  dbug_explain[0]= '\0';
+  set_explain= FALSE;
+#endif
+}
+
+
+thd_scheduler::~thd_scheduler()
+{
+  my_free(io_event, MYF(MY_ALLOW_ZERO_PTR));
+}
+
+
+bool thd_scheduler::init(THD *parent_thd)
+{
+  io_event=
+    (struct event*)my_malloc(sizeof(*io_event),MYF(MY_ZEROFILL|MY_WME));
+
+  if (!io_event)
+  {
+    sql_print_error("Memory allocation error in thd_scheduler::init\n");
+    return TRUE;
+  }
+
+  event_set(io_event, (int)parent_thd->net.vio->sd, EV_READ,
+            libevent_io_callback, (void*)parent_thd);
+
+  list.data= parent_thd;
+
+  return FALSE;
+}
+
+
+/*
+  Attach/associate the connection with the OS thread, for command processing.
+*/
+
+bool thd_scheduler::thread_attach()
+{
+  DBUG_ASSERT(!thread_attached);
+  THD* thd = (THD*)list.data;
+  if (libevent_should_close_connection(thd) ||
+      setup_connection_thread_globals(thd))
+  {
+    return TRUE;
+  }
+  my_errno= 0;
+  thd->mysys_var->abort= 0;
+  thread_attached= TRUE;
+#ifndef DBUG_OFF
+  /*
+    When we attach the thread for a connection for the first time,
+    we know that there is no session value set yet. Thus
+    the initial setting of set_explain to FALSE is OK.
+  */
+  if (set_explain)
+    DBUG_SET(dbug_explain);
+#endif
+  return FALSE;
+}
+
+
+/*
+  Detach/disassociate the connection with the OS thread.
+*/
+
+void thd_scheduler::thread_detach()
+{
+  if (thread_attached)
+  {
+    THD* thd = (THD*)list.data;
+    pthread_mutex_lock(&thd->LOCK_delete);
+    thd->mysys_var= NULL;
+    pthread_mutex_unlock(&thd->LOCK_delete);
+    thread_attached= FALSE;
+#ifndef DBUG_OFF
+    /*
+      If during the session @@session.dbug was assigned, the
+      dbug options/state has been pushed. Check if this is the
+      case, to be able to restore the state when we attach this
+      logical connection to a physical thread.
+    */
+    if (_db_is_pushed_())
+    {
+      set_explain= TRUE;
+      if (DBUG_EXPLAIN(dbug_explain, sizeof(dbug_explain)))
+        sql_print_error("thd_scheduler: DBUG_EXPLAIN buffer is too small");
+    }
+    /* DBUG_POP() is a no-op in case there is no session state */
+    DBUG_POP();
+#endif
+  }
+}
+
+/**
+  Create all threads for the thread pool
+
+  NOTES
+    After threads are created we wait until all threads has signaled that
+    they have started before we return
+
+  RETURN
+    0  ok
+    1  We got an error creating the thread pool
+       In this case we will abort all created threads
+*/
+
+static bool libevent_init(void)
+{
+  uint i;
+  DBUG_ENTER("libevent_init");
+
+  base= (struct event_base *) event_init();
+
+  created_threads= 0;
+  killed_threads= 0;
+  kill_pool_threads= FALSE;
+
+  pthread_mutex_init(&LOCK_event_loop, NULL);
+  pthread_mutex_init(&LOCK_thd_add, NULL);
+
+  /* set up sockets used to add new thds to the event pool */
+  if (init_socketpair(thd_add_pair))
+  {
+    sql_print_error("init_socketpair(thd_add_spair) error in libevent_init");
+    DBUG_RETURN(1);
+  }
+  /* set up sockets used to kill thds in the event queue */
+  if (init_socketpair(thd_kill_pair))
+  {
+    sql_print_error("init_socketpair(thd_kill_pair) error in libevent_init");
+    close_socketpair(thd_add_pair);
+    DBUG_RETURN(1);
+  }
+  event_set(&thd_add_event, thd_add_pair[0], EV_READ|EV_PERSIST,
+            libevent_add_thd_callback, NULL);
+  event_set(&thd_kill_event, thd_kill_pair[0], EV_READ|EV_PERSIST,
+            libevent_kill_thd_callback, NULL);
+
+  if (event_add(&thd_add_event, NULL) || event_add(&thd_kill_event, NULL))
+  {
+    sql_print_error("thd_add_event event_add error in libevent_init");
+    libevent_end();
+    DBUG_RETURN(1);
+  }
+  /* Set up the thread pool */
+  created_threads= killed_threads= 0;
+  pthread_mutex_lock(&LOCK_thread_count);
+
+  for (i= 0; i < thread_pool_size; i++)
+  {
+    pthread_t thread;
+    int error;
+    if ((error= pthread_create(&thread, &connection_attrib,
+                               libevent_thread_proc, 0)))
+    {
+      sql_print_error("Can't create completion port thread (error %d)",
+                      error);
+      pthread_mutex_unlock(&LOCK_thread_count);
+      libevent_end();                           // Cleanup
+      DBUG_RETURN(TRUE);
+    }
+  }
+
+  /* Wait until all threads are created */
+  while (created_threads != thread_pool_size)
+    pthread_cond_wait(&COND_thread_count,&LOCK_thread_count);
+  pthread_mutex_unlock(&LOCK_thread_count);
+
+  DBUG_PRINT("info", ("%u threads created", (uint) thread_pool_size));
+  DBUG_RETURN(FALSE);
+}
+
+
+/*
+  This is called when data is ready on the socket.
+
+  NOTES
+    This is only called by the thread that owns LOCK_event_loop.
+
+    We add the thd that got the data to thds_need_processing, and
+    cause the libevent event_loop() to terminate. Then this same thread will
+    return from event_loop and pick the thd value back up for processing.
+*/
+
+void libevent_io_callback(int, short, void *ctx)
+{
+  safe_mutex_assert_owner(&LOCK_event_loop);
+  THD *thd= (THD*)ctx;
+  thds_waiting_for_io= list_delete(thds_waiting_for_io,
+                                   &thd->event_scheduler.list);
+  thds_need_processing= list_add(thds_need_processing,
+                                 &thd->event_scheduler.list);
+}
+
+/*
+  This is called when we have a thread we want to be killed.
+
+  NOTES
+    This is only called by the thread that owns LOCK_event_loop.
+*/
+
+void libevent_kill_thd_callback(int Fd, short, void*)
+{
+  safe_mutex_assert_owner(&LOCK_event_loop);
+
+  /* clear the pending events */
+  char c;
+  while (recv(Fd, &c, sizeof(c), 0) == sizeof(c))
+  {}
+
+  LIST* list= thds_waiting_for_io;
+  while (list)
+  {
+    THD *thd= (THD*)list->data;
+    list= list_rest(list);
+    if (thd->killed == THD::KILL_CONNECTION)
+    {
+      /*
+        Delete from libevent and add to the processing queue.
+      */
+      event_del(thd->event_scheduler.io_event);
+      thds_waiting_for_io= list_delete(thds_waiting_for_io,
+                                       &thd->event_scheduler.list);
+      thds_need_processing= list_add(thds_need_processing,
+                                     &thd->event_scheduler.list);
+    }
+  }
+}
+
+
+/*
+  This is used to add connections to the pool. This callback is invoked from
+  the libevent event_loop() call whenever the thd_add_pair[1] has a byte
+  written to it.
+
+  NOTES
+    This is only called by the thread that owns LOCK_event_loop.
+*/
+
+void libevent_add_thd_callback(int Fd, short, void *)
+{
+  safe_mutex_assert_owner(&LOCK_event_loop);
+
+  /* clear the pending events */
+  char c;
+  while (recv(Fd, &c, sizeof(c), 0) == sizeof(c))
+  {}
+
+  pthread_mutex_lock(&LOCK_thd_add);
+  while (thds_need_adding)
+  {
+    /* pop the first thd off the list */
+    THD* thd= (THD*)thds_need_adding->data;
+    thds_need_adding= list_delete(thds_need_adding, thds_need_adding);
+
+    pthread_mutex_unlock(&LOCK_thd_add);
+
+    if (!thd->event_scheduler.logged_in ||
+        libevent_should_close_connection(thd))
+    {
+      /*
+        Add thd to thds_need_processing list. If it needs closing we'll close
+        it outside of event_loop().
+      */
+      thds_need_processing= list_add(thds_need_processing,
+                                     &thd->event_scheduler.list);
+    }
+    else
+    {
+      /* Add to libevent */
+      if (event_add(thd->event_scheduler.io_event, NULL))
+      {
+        sql_print_error("event_add error in libevent_add_thd_callback");
+        libevent_connection_close(thd);
+      }
+      else
+      {
+        thds_waiting_for_io= list_add(thds_waiting_for_io,
+                                      &thd->event_scheduler.list);
+      }
+    }
+    pthread_mutex_lock(&LOCK_thd_add);
+  }
+  pthread_mutex_unlock(&LOCK_thd_add);
+}
+
+
+/**
+  Notify the thread pool about a new connection
+
+  NOTES
+    LOCK_thread_count is locked on entry. This function MUST unlock it!
+*/
+
+static void libevent_add_connection(THD *thd)
+{
+  DBUG_ENTER("libevent_add_connection");
+  DBUG_PRINT("enter", ("thd: %p  thread_id: %lu",
+                       thd, thd->thread_id));
+
+  if (thd->event_scheduler.init(thd))
+  {
+    sql_print_error("Scheduler init error in libevent_add_new_connection");
+    pthread_mutex_unlock(&LOCK_thread_count);
+    libevent_connection_close(thd);
+    DBUG_VOID_RETURN;
+  }
+  threads.append(thd);
+  libevent_thd_add(thd);
+
+  pthread_mutex_unlock(&LOCK_thread_count);
+  DBUG_VOID_RETURN;
+}
+
+
+/**
+  @brief Signal a waiting connection it's time to die.
+
+  @details This function will signal libevent the THD should be killed.
+    Either the global LOCK_thd_count or the THD's LOCK_delete must be locked
+    upon entry.
+
+  @param[in]  thd  The connection to kill
+*/
+
+static void libevent_post_kill_notification(THD *)
+{
+  /*
+    Note, we just wake up libevent with an event that a THD should be killed,
+    It will search its list of thds for thd->killed == KILL_CONNECTION to
+    find the THDs it should kill.
+
+    So we don't actually tell it which one and we don't actually use the
+    THD being passed to us, but that's just a design detail that could change
+    later.
+  */
+  char c= 0;
+  send(thd_kill_pair[1], &c, sizeof(c), 0);
+}
+
+
+/*
+  Close and delete a connection.
+*/
+
+static void libevent_connection_close(THD *thd)
+{
+  DBUG_ENTER("libevent_connection_close");
+  DBUG_PRINT("enter", ("thd: %p", thd));
+
+  thd->killed= THD::KILL_CONNECTION;          // Avoid error messages
+
+  if (thd->net.vio->sd >= 0)                  // not already closed
+  {
+    end_connection(thd);
+    close_connection(thd, 0, 1);
+  }
+  thd->event_scheduler.thread_detach();
+  unlink_thd(thd);   /* locks LOCK_thread_count and deletes thd */
+  pthread_mutex_unlock(&LOCK_thread_count);
+
+  DBUG_VOID_RETURN;
+}
+
+
+/*
+  Returns true if we should close and delete a THD connection.
+*/
+
+static bool libevent_should_close_connection(THD* thd)
+{
+  return thd->net.error ||
+         thd->net.vio == 0 ||
+         thd->killed == THD::KILL_CONNECTION;
+}
+
+
+/*
+  libevent_thread_proc is the outer loop of each thread in the thread pool.
+  These procs only return/terminate on shutdown (kill_pool_threads == true).
+*/
+
+pthread_handler_t libevent_thread_proc(void *arg)
+{
+  if (init_new_connection_handler_thread())
+  {
+    my_thread_global_end();
+    sql_print_error("libevent_thread_proc: my_thread_init() failed");
+    exit(1);
+  }
+  DBUG_ENTER("libevent_thread_proc");
+
+  /*
+    Signal libevent_init() when all threads has been created and are ready to
+    receive events.
+  */
+  (void) pthread_mutex_lock(&LOCK_thread_count);
+  created_threads++;
+  thread_created++;
+  if (created_threads == thread_pool_size)
+    (void) pthread_cond_signal(&COND_thread_count);
+  (void) pthread_mutex_unlock(&LOCK_thread_count);
+
+  for (;;)
+  {
+    THD *thd= NULL;
+    (void) pthread_mutex_lock(&LOCK_event_loop);
+
+    /* get thd(s) to process */
+    while (!thds_need_processing)
+    {
+      if (kill_pool_threads)
+      {
+        /* the flag that we should die has been set */
+        (void) pthread_mutex_unlock(&LOCK_event_loop);
+        goto thread_exit;
+      }
+      event_loop(EVLOOP_ONCE);
+    }
+
+    /* pop the first thd off the list */
+    thd= (THD*)thds_need_processing->data;
+    thds_need_processing= list_delete(thds_need_processing,
+                                      thds_need_processing);
+
+    (void) pthread_mutex_unlock(&LOCK_event_loop);
+
+    /* now we process the connection (thd) */
+
+    /* set up the thd<->thread links. */
+    thd->thread_stack= (char*) &thd;
+
+    if (thd->event_scheduler.thread_attach())
+    {
+      libevent_connection_close(thd);
+      continue;
+    }
+
+    /* is the connection logged in yet? */
+    if (!thd->event_scheduler.logged_in)
+    {
+      DBUG_PRINT("info", ("init new connection.  sd: %d",
+                          thd->net.vio->sd));
+      lex_start(thd);
+      if (login_connection(thd))
+      {
+        /* Failed to log in */
+        libevent_connection_close(thd);
+        continue;
+      }
+      else
+      {
+        /* login successful */
+#if MYSQL_VERSION_ID >= 60000
+        MYSQL_CONNECTION_START(thd->thread_id, thd->security_ctx->priv_user,
+                               (char *) thd->security_ctx->host_or_ip);
+#endif
+        thd->event_scheduler.logged_in= TRUE;
+        prepare_new_connection_state(thd);
+        if (!libevent_needs_immediate_processing(thd))
+          continue; /* New connection is now waiting for data in libevent */
+      }
+    }
+
+    do
+    {
+      /* Process a query */
+      if (do_command(thd))
+      {
+        libevent_connection_close(thd);
+        break;
+      }
+    } while (libevent_needs_immediate_processing(thd));
+  }
+
+thread_exit:
+  DBUG_PRINT("exit", ("ending thread"));
+  (void) pthread_mutex_lock(&LOCK_thread_count);
+  killed_threads++;
+  pthread_cond_broadcast(&COND_thread_count);
+  (void) pthread_mutex_unlock(&LOCK_thread_count);
+  my_thread_end();
+  pthread_exit(0);
+  DBUG_RETURN(0);                               /* purify: deadcode */
+}
+
+
+/*
+  Returns TRUE if the connection needs immediate processing and FALSE if
+  instead it's queued for libevent processing or closed,
+*/
+
+static bool libevent_needs_immediate_processing(THD *thd)
+{
+  if (libevent_should_close_connection(thd))
+  {
+    libevent_connection_close(thd);
+    return FALSE;
+  }
+  /*
+    If more data in the socket buffer, return TRUE to process another command.
+
+    Note: we cannot add for event processing because the whole request might
+    already be buffered and we wouldn't receive an event.
+  */
+  if (vio_pending(thd->net.vio) > 0)
+    return TRUE;
+
+  thd->event_scheduler.thread_detach();
+  libevent_thd_add(thd);
+  return FALSE;
+}
+
+
+/*
+  Adds a THD to queued for libevent processing.
+
+  This call does not actually register the event with libevent.
+  Instead, it places the THD onto a queue and signals libevent by writing
+  a byte into thd_add_pair, which will cause our libevent_add_thd_callback to
+  be invoked which will find the THD on the queue and add it to libevent.
+*/
+
+static void libevent_thd_add(THD* thd)
+{
+  char c= 0;
+  /* release any audit resources, this thd is going to sleep */
+#if MYSQL_VERSION_ID >= 60000
+  mysql_audit_release(thd);
+#endif
+  pthread_mutex_lock(&LOCK_thd_add);
+  /* queue for libevent */
+  thds_need_adding= list_add(thds_need_adding, &thd->event_scheduler.list);
+  /* notify libevent */
+  send(thd_add_pair[1], &c, sizeof(c), 0);
+  pthread_mutex_unlock(&LOCK_thd_add);
+}
+
+
+/**
+  Wait until all pool threads have been deleted for clean shutdown
+*/
+
+static void libevent_end()
+{
+  DBUG_ENTER("libevent_end");
+  DBUG_PRINT("enter", ("created_threads: %d  killed_threads: %u",
+                       created_threads, killed_threads));
+
+  /*
+    check if initialized. This may not be the case if get an error at
+    startup
+  */
+  if (!base)
+    DBUG_VOID_RETURN;
+
+  (void) pthread_mutex_lock(&LOCK_thread_count);
+
+  kill_pool_threads= TRUE;
+  while (killed_threads != created_threads)
+  {
+    /* wake up the event loop */
+    char c= 0;
+    send(thd_add_pair[1], &c, sizeof(c), 0);
+    pthread_cond_wait(&COND_thread_count, &LOCK_thread_count);
+  }
+  (void) pthread_mutex_unlock(&LOCK_thread_count);
+
+  event_del(&thd_add_event);
+  close_socketpair(thd_add_pair);
+  event_del(&thd_kill_event);
+  close_socketpair(thd_kill_pair);
+  event_base_free(base);
+  base= 0;
+
+  (void) pthread_mutex_destroy(&LOCK_event_loop);
+  (void) pthread_mutex_destroy(&LOCK_thd_add);
+  DBUG_VOID_RETURN;
+}
+
+
+void pool_of_threads_scheduler(scheduler_functions* func)
+{
+  func->max_threads= thread_pool_size;
+  func->max_connections= &max_connections;
+  func->connection_count= &connection_count;
+  func->init= libevent_init;
+  func->end= libevent_end;
+  func->post_kill_notification= libevent_post_kill_notification;
+  func->add_connection= libevent_add_connection;
+}
+
+#endif
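
For context, a condensed sketch of how the three schedulers in this patch get wired into mysqld at startup. Only the *_scheduler() functions and the scheduler_functions members come from the diff above; the SCHEDULER_* enum names and the surrounding startup code are assumptions:

  /* Hypothetical startup wiring (mysqld.cc); enum names are assumed. */
  static scheduler_functions thread_scheduler;

  static void init_thread_scheduler(ulong thread_handling)
  {
    if (thread_handling == SCHEDULER_NO_THREADS)
      one_thread_scheduler(&thread_scheduler);
  #if defined(HAVE_LIBEVENT) && HAVE_POOL_OF_THREADS == 1
    else if (thread_handling == SCHEDULER_POOL_OF_THREADS)
      pool_of_threads_scheduler(&thread_scheduler);   /* libevent pool */
  #endif
    else
      one_thread_per_connection_scheduler(&thread_scheduler,
                                          &max_connections,
                                          &connection_count);
  }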