/* Copyright (C) 2007 MySQL AB This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; version 2 of the License. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ /* Implementation for the thread scheduler */ #ifdef USE_PRAGMA_INTERFACE #pragma implementation #endif #include #if MYSQL_VERSION_ID >= 60000 #include "sql_audit.h" #endif /* 'Dummy' functions to be used when we don't need any handling for a scheduler event */ static bool init_dummy(void) {return 0;} static void post_kill_dummy(THD *thd) {} static void end_dummy(void) {} static bool end_thread_dummy(THD *thd, bool cache_thread) { return 0; } /* Initialize default scheduler with dummy functions so that setup functions only need to declare those that are relvant for their usage */ scheduler_functions::scheduler_functions() :init(init_dummy), init_new_connection_thread(init_new_connection_handler_thread), add_connection(0), // Must be defined post_kill_notification(post_kill_dummy), end_thread(end_thread_dummy), end(end_dummy) {} /* End connection, in case when we are using 'no-threads' */ static bool no_threads_end(THD *thd, bool put_in_cache) { unlink_thd(thd); pthread_mutex_unlock(&LOCK_thread_count); return 1; // Abort handle_one_connection } /* Initailize scheduler for --thread-handling=no-threads */ void one_thread_scheduler(scheduler_functions *func) { func->max_threads= 1; max_connections= 1; func->max_connections= &max_connections; func->connection_count= &connection_count; #ifndef EMBEDDED_LIBRARY func->add_connection= handle_connection_in_main_thread; #endif func->init_new_connection_thread= init_dummy; func->end_thread= no_threads_end; } /* Initialize scheduler for --thread-handling=one-thread-per-connection */ #ifndef EMBEDDED_LIBRARY void one_thread_per_connection_scheduler(scheduler_functions *func, ulong *arg_max_connections, uint *arg_connection_count) { func->max_threads= *arg_max_connections + 1; func->max_connections= arg_max_connections; func->connection_count= arg_connection_count; func->add_connection= create_thread_to_handle_connection; func->end_thread= one_thread_per_connection_end; } #endif /* EMBEDDED_LIBRARY */ #if defined(HAVE_LIBEVENT) && HAVE_POOL_OF_THREADS == 1 #include "event.h" static struct event_base *base; static uint created_threads, killed_threads; static bool kill_pool_threads; static struct event thd_add_event; static struct event thd_kill_event; static pthread_mutex_t LOCK_thd_add; /* protects thds_need_adding */ static LIST *thds_need_adding; /* list of thds to add to libevent queue */ static int thd_add_pair[2]; /* pipe to signal add a connection to libevent*/ static int thd_kill_pair[2]; /* pipe to signal kill a connection in libevent */ /* LOCK_event_loop protects the non-thread safe libevent calls (event_add and event_del) and thds_need_processing and thds_waiting_for_io. */ static pthread_mutex_t LOCK_event_loop; static LIST *thds_need_processing; /* list of thds that needs some processing */ static LIST *thds_waiting_for_io; /* list of thds with added events */ pthread_handler_t libevent_thread_proc(void *arg); static void libevent_end(); static bool libevent_needs_immediate_processing(THD *thd); static void libevent_connection_close(THD *thd); static bool libevent_should_close_connection(THD* thd); static void libevent_thd_add(THD* thd); void libevent_io_callback(int Fd, short Operation, void *ctx); void libevent_add_thd_callback(int Fd, short Operation, void *ctx); void libevent_kill_thd_callback(int Fd, short Operation, void *ctx); /* Create a pipe and set to non-blocking. Returns TRUE if there is an error. */ static bool init_socketpair(int sock_pair[]) { sock_pair[0]= sock_pair[1]= -1; return (evutil_socketpair(AF_UNIX, SOCK_STREAM, 0, sock_pair) < 0 || evutil_make_socket_nonblocking(sock_pair[0]) == -1 || evutil_make_socket_nonblocking(sock_pair[1]) == -1); } static void close_socketpair(int sock_pair[]) { if (sock_pair[0] != -1) EVUTIL_CLOSESOCKET(sock_pair[0]); if (sock_pair[1] != -1) EVUTIL_CLOSESOCKET(sock_pair[1]); } /* thd_scheduler keeps the link between THD and events. It's embedded in the THD class. */ thd_scheduler::thd_scheduler() : logged_in(FALSE), io_event(NULL), thread_attached(FALSE) { #ifndef DBUG_OFF dbug_explain[0]= '\0'; set_explain= FALSE; #endif } thd_scheduler::~thd_scheduler() { my_free(io_event, MYF(MY_ALLOW_ZERO_PTR)); } bool thd_scheduler::init(THD *parent_thd) { io_event= (struct event*)my_malloc(sizeof(*io_event),MYF(MY_ZEROFILL|MY_WME)); if (!io_event) { sql_print_error("Memory allocation error in thd_scheduler::init\n"); return TRUE; } event_set(io_event, (int)parent_thd->net.vio->sd, EV_READ, libevent_io_callback, (void*)parent_thd); list.data= parent_thd; return FALSE; } /* Attach/associate the connection with the OS thread, for command processing. */ bool thd_scheduler::thread_attach() { DBUG_ASSERT(!thread_attached); THD* thd = (THD*)list.data; if (libevent_should_close_connection(thd) || setup_connection_thread_globals(thd)) { return TRUE; } my_errno= 0; thd->mysys_var->abort= 0; thread_attached= TRUE; #ifndef DBUG_OFF /* When we attach the thread for a connection for the first time, we know that there is no session value set yet. Thus the initial setting of set_explain to FALSE is OK. */ if (set_explain) DBUG_SET(dbug_explain); #endif return FALSE; } /* Detach/disassociate the connection with the OS thread. */ void thd_scheduler::thread_detach() { if (thread_attached) { THD* thd = (THD*)list.data; thd->reset_globals(); thread_attached= FALSE; #ifndef DBUG_OFF /* If during the session @@session.dbug was assigned, the dbug options/state has been pushed. Check if this is the case, to be able to restore the state when we attach this logical connection to a physical thread. */ if (_db_is_pushed_()) { set_explain= TRUE; if (DBUG_EXPLAIN(dbug_explain, sizeof(dbug_explain))) sql_print_error("thd_scheduler: DBUG_EXPLAIN buffer is too small"); } /* DBUG_POP() is a no-op in case there is no session state */ DBUG_POP(); #endif } } /** Create all threads for the thread pool NOTES After threads are created we wait until all threads has signaled that they have started before we return RETURN 0 ok 1 We got an error creating the thread pool In this case we will abort all created threads */ static bool libevent_init(void) { uint i; DBUG_ENTER("libevent_init"); base= (struct event_base *) event_init(); created_threads= 0; killed_threads= 0; kill_pool_threads= FALSE; pthread_mutex_init(&LOCK_event_loop, MY_MUTEX_INIT_FAST); pthread_mutex_init(&LOCK_thd_add, MY_MUTEX_INIT_FAST); /* set up sockets used to add new thds to the event pool */ if (init_socketpair(thd_add_pair)) { sql_print_error("init_socketpair(thd_add_spair) error in libevent_init"); DBUG_RETURN(1); } /* set up sockets used to kill thds in the event queue */ if (init_socketpair(thd_kill_pair)) { sql_print_error("init_socketpair(thd_kill_pair) error in libevent_init"); close_socketpair(thd_add_pair); DBUG_RETURN(1); } event_set(&thd_add_event, thd_add_pair[0], EV_READ|EV_PERSIST, libevent_add_thd_callback, NULL); event_set(&thd_kill_event, thd_kill_pair[0], EV_READ|EV_PERSIST, libevent_kill_thd_callback, NULL); if (event_add(&thd_add_event, NULL) || event_add(&thd_kill_event, NULL)) { sql_print_error("thd_add_event event_add error in libevent_init"); libevent_end(); DBUG_RETURN(1); } /* Set up the thread pool */ created_threads= killed_threads= 0; pthread_mutex_lock(&LOCK_thread_count); for (i= 0; i < thread_pool_size; i++) { pthread_t thread; int error; if ((error= pthread_create(&thread, &connection_attrib, libevent_thread_proc, 0))) { sql_print_error("Can't create completion port thread (error %d)", error); pthread_mutex_unlock(&LOCK_thread_count); libevent_end(); // Cleanup DBUG_RETURN(TRUE); } } /* Wait until all threads are created */ while (created_threads != thread_pool_size) pthread_cond_wait(&COND_thread_count,&LOCK_thread_count); pthread_mutex_unlock(&LOCK_thread_count); DBUG_PRINT("info", ("%u threads created", (uint) thread_pool_size)); DBUG_RETURN(FALSE); } /* This is called when data is ready on the socket. NOTES This is only called by the thread that owns LOCK_event_loop. We add the thd that got the data to thds_need_processing, and cause the libevent event_loop() to terminate. Then this same thread will return from event_loop and pick the thd value back up for processing. */ void libevent_io_callback(int, short, void *ctx) { safe_mutex_assert_owner(&LOCK_event_loop); THD *thd= (THD*)ctx; thds_waiting_for_io= list_delete(thds_waiting_for_io, &thd->event_scheduler.list); thds_need_processing= list_add(thds_need_processing, &thd->event_scheduler.list); } /* This is called when we have a thread we want to be killed. NOTES This is only called by the thread that owns LOCK_event_loop. */ void libevent_kill_thd_callback(int Fd, short, void*) { safe_mutex_assert_owner(&LOCK_event_loop); /* clear the pending events */ char c; while (recv(Fd, &c, sizeof(c), 0) == sizeof(c)) {} LIST* list= thds_waiting_for_io; while (list) { THD *thd= (THD*)list->data; list= list_rest(list); if ((int) thd->killed >= (int) KILL_CONNECTION) { /* Delete from libevent and add to the processing queue. */ event_del(thd->event_scheduler.io_event); thds_waiting_for_io= list_delete(thds_waiting_for_io, &thd->event_scheduler.list); thds_need_processing= list_add(thds_need_processing, &thd->event_scheduler.list); } } } /* This is used to add connections to the pool. This callback is invoked from the libevent event_loop() call whenever the thd_add_pair[1] has a byte written to it. NOTES This is only called by the thread that owns LOCK_event_loop. */ void libevent_add_thd_callback(int Fd, short, void *) { safe_mutex_assert_owner(&LOCK_event_loop); /* clear the pending events */ char c; while (recv(Fd, &c, sizeof(c), 0) == sizeof(c)) {} pthread_mutex_lock(&LOCK_thd_add); while (thds_need_adding) { /* pop the first thd off the list */ THD* thd= (THD*)thds_need_adding->data; thds_need_adding= list_delete(thds_need_adding, thds_need_adding); pthread_mutex_unlock(&LOCK_thd_add); if (!thd->event_scheduler.logged_in || libevent_should_close_connection(thd)) { /* Add thd to thds_need_processing list. If it needs closing we'll close it outside of event_loop(). */ thds_need_processing= list_add(thds_need_processing, &thd->event_scheduler.list); } else { /* Add to libevent */ if (event_add(thd->event_scheduler.io_event, NULL)) { sql_print_error("event_add error in libevent_add_thd_callback"); libevent_connection_close(thd); } else { thds_waiting_for_io= list_add(thds_waiting_for_io, &thd->event_scheduler.list); } } pthread_mutex_lock(&LOCK_thd_add); } pthread_mutex_unlock(&LOCK_thd_add); } /** Notify the thread pool about a new connection NOTES LOCK_thread_count is locked on entry. This function MUST unlock it! */ static void libevent_add_connection(THD *thd) { DBUG_ENTER("libevent_add_connection"); DBUG_PRINT("enter", ("thd: %p thread_id: %lu", thd, thd->thread_id)); if (thd->event_scheduler.init(thd)) { sql_print_error("Scheduler init error in libevent_add_new_connection"); pthread_mutex_unlock(&LOCK_thread_count); libevent_connection_close(thd); DBUG_VOID_RETURN; } threads.append(thd); libevent_thd_add(thd); pthread_mutex_unlock(&LOCK_thread_count); DBUG_VOID_RETURN; } /** @brief Signal a waiting connection it's time to die. @details This function will signal libevent the THD should be killed. Either the global LOCK_thd_count or the THD's LOCK_thd_data must be locked upon entry. @param[in] thd The connection to kill */ static void libevent_post_kill_notification(THD *) { /* Note, we just wake up libevent with an event that a THD should be killed, It will search its list of thds for thd->killed == KILL_CONNECTION to find the THDs it should kill. So we don't actually tell it which one and we don't actually use the THD being passed to us, but that's just a design detail that could change later. */ char c= 0; send(thd_kill_pair[1], &c, sizeof(c), 0); } /* Close and delete a connection. */ static void libevent_connection_close(THD *thd) { DBUG_ENTER("libevent_connection_close"); DBUG_PRINT("enter", ("thd: %p", thd)); thd->killed= KILL_CONNECTION; // Avoid error messages if (thd->net.vio->sd >= 0) // not already closed { end_connection(thd); close_connection(thd, 0, 1); } thd->event_scheduler.thread_detach(); unlink_thd(thd); /* locks LOCK_thread_count and deletes thd */ pthread_mutex_unlock(&LOCK_thread_count); DBUG_VOID_RETURN; } /* Returns true if we should close and delete a THD connection. */ static bool libevent_should_close_connection(THD* thd) { return (thd->net.error || thd->net.vio == 0 || (int) thd->killed >= (int) KILL_CONNECTION); } /* libevent_thread_proc is the outer loop of each thread in the thread pool. These procs only return/terminate on shutdown (kill_pool_threads == true). */ pthread_handler_t libevent_thread_proc(void *arg) { if (init_new_connection_handler_thread()) { my_thread_global_end(); sql_print_error("libevent_thread_proc: my_thread_init() failed"); exit(1); } DBUG_ENTER("libevent_thread_proc"); /* Signal libevent_init() when all threads has been created and are ready to receive events. */ (void) pthread_mutex_lock(&LOCK_thread_count); created_threads++; thread_created++; if (created_threads == thread_pool_size) (void) pthread_cond_signal(&COND_thread_count); (void) pthread_mutex_unlock(&LOCK_thread_count); for (;;) { THD *thd= NULL; (void) pthread_mutex_lock(&LOCK_event_loop); /* get thd(s) to process */ while (!thds_need_processing) { if (kill_pool_threads) { /* the flag that we should die has been set */ (void) pthread_mutex_unlock(&LOCK_event_loop); goto thread_exit; } event_loop(EVLOOP_ONCE); } /* pop the first thd off the list */ thd= (THD*)thds_need_processing->data; thds_need_processing= list_delete(thds_need_processing, thds_need_processing); (void) pthread_mutex_unlock(&LOCK_event_loop); /* now we process the connection (thd) */ /* set up the thd<->thread links. */ thd->thread_stack= (char*) &thd; if (thd->event_scheduler.thread_attach()) { libevent_connection_close(thd); continue; } /* is the connection logged in yet? */ if (!thd->event_scheduler.logged_in) { DBUG_PRINT("info", ("init new connection. sd: %d", thd->net.vio->sd)); lex_start(thd); if (login_connection(thd)) { /* Failed to log in */ libevent_connection_close(thd); continue; } else { /* login successful */ #if MYSQL_VERSION_ID >= 60000 MYSQL_CONNECTION_START(thd->thread_id, thd->security_ctx->priv_user, (char *) thd->security_ctx->host_or_ip); #endif thd->event_scheduler.logged_in= TRUE; prepare_new_connection_state(thd); if (!libevent_needs_immediate_processing(thd)) continue; /* New connection is now waiting for data in libevent*/ } } do { /* Process a query */ if (do_command(thd)) { libevent_connection_close(thd); break; } } while (libevent_needs_immediate_processing(thd)); } thread_exit: DBUG_PRINT("exit", ("ending thread")); (void) pthread_mutex_lock(&LOCK_thread_count); killed_threads++; pthread_cond_broadcast(&COND_thread_count); (void) pthread_mutex_unlock(&LOCK_thread_count); my_thread_end(); pthread_exit(0); DBUG_RETURN(0); /* purify: deadcode */ } /* Returns TRUE if the connection needs immediate processing and FALSE if instead it's queued for libevent processing or closed, */ static bool libevent_needs_immediate_processing(THD *thd) { if (libevent_should_close_connection(thd)) { libevent_connection_close(thd); return FALSE; } /* If more data in the socket buffer, return TRUE to process another command. Note: we cannot add for event processing because the whole request might already be buffered and we wouldn't receive an event. */ if (vio_pending(thd->net.vio) > 0) return TRUE; thd->event_scheduler.thread_detach(); libevent_thd_add(thd); return FALSE; } /* Adds a THD to queued for libevent processing. This call does not actually register the event with libevent. Instead, it places the THD onto a queue and signals libevent by writing a byte into thd_add_pair, which will cause our libevent_add_thd_callback to be invoked which will find the THD on the queue and add it to libevent. */ static void libevent_thd_add(THD* thd) { char c= 0; /* release any audit resources, this thd is going to sleep */ #if MYSQL_VERSION_ID >= 60000 mysql_audit_release(thd); #endif pthread_mutex_lock(&LOCK_thd_add); /* queue for libevent */ thds_need_adding= list_add(thds_need_adding, &thd->event_scheduler.list); /* notify libevent */ send(thd_add_pair[1], &c, sizeof(c), 0); pthread_mutex_unlock(&LOCK_thd_add); } /** Wait until all pool threads have been deleted for clean shutdown */ static void libevent_end() { DBUG_ENTER("libevent_end"); DBUG_PRINT("enter", ("created_threads: %d killed_threads: %u", created_threads, killed_threads)); /* check if initialized. This may not be the case if get an error at startup */ if (!base) DBUG_VOID_RETURN; (void) pthread_mutex_lock(&LOCK_thread_count); kill_pool_threads= TRUE; while (killed_threads != created_threads) { /* wake up the event loop */ char c= 0; send(thd_add_pair[1], &c, sizeof(c), 0); pthread_cond_wait(&COND_thread_count, &LOCK_thread_count); } (void) pthread_mutex_unlock(&LOCK_thread_count); event_del(&thd_add_event); close_socketpair(thd_add_pair); event_del(&thd_kill_event); close_socketpair(thd_kill_pair); event_base_free(base); base= 0; (void) pthread_mutex_destroy(&LOCK_event_loop); (void) pthread_mutex_destroy(&LOCK_thd_add); DBUG_VOID_RETURN; } void pool_of_threads_scheduler(scheduler_functions* func) { func->max_threads= thread_pool_size; func->max_connections= &max_connections; func->connection_count= &connection_count; func->init= libevent_init; func->end= libevent_end; func->post_kill_notification= libevent_post_kill_notification; func->add_connection= libevent_add_connection; } #endif