diff options
author | Michael Widenius <monty@askmonty.org> | 2009-03-13 00:27:35 +0200 |
---|---|---|
committer | Michael Widenius <monty@askmonty.org> | 2009-03-13 00:27:35 +0200 |
commit | 4fe342500953e9cf337aae462fb512a7cec176f8 (patch) | |
tree | af9359248fb5e1174bbd409aa9444c34378bc52d /sql | |
parent | 44e6c2f253a26251613529e144545f194e34d9df (diff) | |
download | mariadb-git-4fe342500953e9cf337aae462fb512a7cec176f8.tar.gz |
Added "pool-of-threads" handling (with libevent)
This is a backport of code from MySQL 6.0 with cleanups and extensions
The following new options are supported
configure options:
--with-libevent ; Enable use of libevent, which is needed for pool of threads
mysqld options:
--thread-handling=pool-of-threads ; Use a pool of threads to handle queries
--thread-pool-size=# ; Define how many threads should be created to handle all queries
--extra-port=# ; Extra tcp port that uses the old one-thread-per-connection method
--extra-max-connections=# ; Number of connections to accept to 'extra-port'
--test-ignore-wrong-options ; Ignore setting an enum value to a wrong option (for mysql-test-run)
BUILD/SETUP.sh:
Added libevents (and thus pool-of-threads) to max builds
CMakeLists.txt:
Added libevent
Makefile.am:
Added libevents
config/ac-macros/libevent.m4:
Libevent code for configure
config/ac-macros/libevent_configure.m4:
Libevent code for configure
configure.in:
Added libevents
dbug/dbug.c:
Added _db_is_pushed(); Needed for pool-of-threads code
extra/Makefile.am:
Added libevents
extra/libevent:
Libevent initial code
extra/libevent/CMakeLists.txt:
Libevent initial code
extra/libevent/Makefile.am:
Libevent initial code
extra/libevent/README:
Libevent initial code
extra/libevent/WIN32-Code:
Libevent initial code
extra/libevent/WIN32-Code/config.h:
Libevent initial code
extra/libevent/WIN32-Code/misc.c:
Libevent initial code
extra/libevent/WIN32-Code/misc.h:
Libevent initial code
extra/libevent/WIN32-Code/tree.h:
Libevent initial code
extra/libevent/WIN32-Code/win32.c:
Libevent initial code
extra/libevent/buffer.c:
Libevent initial code
extra/libevent/compat:
Libevent initial code
extra/libevent/compat/sys:
Libevent initial code
extra/libevent/compat/sys/_time.h:
Libevent initial code
extra/libevent/compat/sys/queue.h:
Libevent initial code
extra/libevent/compat/sys/tree.h:
Libevent initial code
extra/libevent/devpoll.c:
Libevent initial code
extra/libevent/epoll.c:
Libevent initial code
extra/libevent/epoll_sub.c:
Libevent initial code
extra/libevent/evbuffer.c:
Libevent initial code
extra/libevent/evdns.c:
Libevent initial code
extra/libevent/evdns.h:
Libevent initial code
extra/libevent/event-config.h:
Libevent initial code
extra/libevent/event-internal.h:
Libevent initial code
extra/libevent/event.c:
Libevent initial code
extra/libevent/event.h:
Libevent initial code
extra/libevent/event_tagging.c:
Libevent initial code
extra/libevent/evhttp.h:
Libevent initial code
extra/libevent/evport.c:
Libevent initial code
extra/libevent/evrpc-internal.h:
Libevent initial code
extra/libevent/evrpc.c:
Libevent initial code
extra/libevent/evrpc.h:
Libevent initial code
extra/libevent/evsignal.h:
Libevent initial code
extra/libevent/evutil.c:
Libevent initial code
extra/libevent/evutil.h:
Libevent initial code
extra/libevent/http-internal.h:
Libevent initial code
extra/libevent/http.c:
Libevent initial code
extra/libevent/kqueue.c:
Libevent initial code
extra/libevent/log.c:
Libevent initial code
extra/libevent/log.h:
Libevent initial code
extra/libevent/min_heap.h:
Libevent initial code
extra/libevent/poll.c:
Libevent initial code
extra/libevent/select.c:
Libevent initial code
extra/libevent/signal.c:
Libevent initial code
extra/libevent/strlcpy-internal.h:
Libevent initial code
extra/libevent/strlcpy.c:
Libevent initial code
include/config-win.h:
Libevent support
include/my_dbug.h:
ADded _db_is_pushed
include/mysql.h.pp:
Update to handle new prototypes
include/typelib.h:
Split find_type_or_exit() into two functions
include/violite.h:
Added vio_is_pending()
libmysqld/Makefile.am:
Added libevent
mysql-test/include/have_pool_of_threads.inc:
Added test for pool-of-threads
mysql-test/mysql-test-run.pl:
Don't abort based on time and don't retry test cases when run under --gdb or --debug
mysql-test/r/crash_commit_before.result:
USE GLOBAL for debug variable
mysql-test/r/have_pool_of_threads.require:
Added test for pool-of-threads
mysql-test/r/pool_of_threads.result:
Added test for pool-of-threads
mysql-test/r/subselect_debug.result:
USE GLOBAL for debug variable
mysql-test/t/crash_commit_before.test:
USE GLOBAL for debug variable
mysql-test/t/merge-big.test:
USE GLOBAL for debug variable
mysql-test/t/pool_of_threads-master.opt:
Added test for pool-of-threads
mysql-test/t/pool_of_threads.test:
Added test for pool-of-threads
mysys/typelib.c:
Split find_type_or_exit() into find_type_with_warning()
sql/Makefile.am:
Added libevent
sql/handler.cc:
Indentation fix.
Fixed memory loss bug
Fixed crash on exit when handler plugin failed
sql/mysql_priv.h:
Added extra_max_connections and mysqld_extra_port
Added extern functions from sql_connect.cc
sql/mysqld.cc:
Added support for new mysqld options
Added code for 'extra-port' and 'extra-max-connections'
Split some functions into smaller pieces to be able to reuse code
Added code for test-ignore-wrong-options
sql/scheduler.cc:
Updated schduler code from MySQL 6.0
sql/scheduler.h:
Updated schduler code from MySQL 6.0
sql/set_var.cc:
Added support for changing "extra_max_connections"
sql/sql_class.cc:
Iniitalize thread schduler options in THD
sql/sql_class.h:
Added to extra_port and scheduler to 'THD'
sql/sql_connect.cc:
Use thd->schduler to check number of connections and terminate connection
Made some local functions global (for scheduler.cc)
vio/viosocket.c:
Added 'vio_pending', needed for scheduler..c
Diffstat (limited to 'sql')
-rw-r--r-- | sql/Makefile.am | 5 | ||||
-rw-r--r-- | sql/handler.cc | 8 | ||||
-rw-r--r-- | sql/mysql_priv.h | 9 | ||||
-rw-r--r-- | sql/mysqld.cc | 438 | ||||
-rw-r--r-- | sql/scheduler.cc | 678 | ||||
-rw-r--r-- | sql/scheduler.h | 38 | ||||
-rw-r--r-- | sql/set_var.cc | 9 | ||||
-rw-r--r-- | sql/sql_class.cc | 2 | ||||
-rw-r--r-- | sql/sql_class.h | 7 | ||||
-rw-r--r-- | sql/sql_connect.cc | 27 |
10 files changed, 1029 insertions, 192 deletions
diff --git a/sql/Makefile.am b/sql/Makefile.am index e477a6123ec..a3559b38ce4 100644 --- a/sql/Makefile.am +++ b/sql/Makefile.am @@ -22,7 +22,8 @@ MYSQLLIBdir= $(pkglibdir) pkgplugindir = $(pkglibdir)/plugin INCLUDES = @ZLIB_INCLUDES@ \ -I$(top_builddir)/include -I$(top_srcdir)/include \ - -I$(top_srcdir)/regex -I$(srcdir) $(openssl_includes) + -I$(top_srcdir)/regex -I$(srcdir) $(openssl_includes) \ + $(libevent_includes) WRAPLIBS= @WRAPLIBS@ SUBDIRS = share libexec_PROGRAMS = mysqld @@ -41,7 +42,7 @@ mysqld_DEPENDENCIES= @mysql_plugin_libs@ $(SUPPORTING_LIBS) libndb.la LDADD = $(SUPPORTING_LIBS) @ZLIB_LIBS@ @NDB_SCI_LIBS@ mysqld_LDADD = libndb.la \ @MYSQLD_EXTRA_LDFLAGS@ \ - @pstack_libs@ \ + @pstack_libs@ $(libevent_libs) \ @mysql_plugin_libs@ \ $(LDADD) $(CXXLDFLAGS) $(WRAPLIBS) @LIBDL@ \ $(yassl_libs) $(openssl_libs) @MYSQLD_EXTRA_LIBS@ diff --git a/sql/handler.cc b/sql/handler.cc index f2ac55159e7..2d5278faf7f 100644 --- a/sql/handler.cc +++ b/sql/handler.cc @@ -377,8 +377,7 @@ int ha_finalize_handlerton(st_plugin_int *plugin) if (!hton) goto end; - switch (hton->state) - { + switch (hton->state) { case SHOW_OPTION_NO: case SHOW_OPTION_DISABLED: break; @@ -435,6 +434,9 @@ int ha_initialize_handlerton(st_plugin_int *plugin) { sql_print_error("Plugin '%s' init function returned error.", plugin->name.str); + /* Free data, so that we don't refer to it in ha_finalize_handlerton */ + my_free(hton, MYF(0)); + plugin->data= 0; goto err; } } @@ -463,6 +465,8 @@ int ha_initialize_handlerton(st_plugin_int *plugin) if (idx == (int) DB_TYPE_DEFAULT) { sql_print_warning("Too many storage engines!"); + my_free(hton, MYF(0)); + plugin->data= 0; DBUG_RETURN(1); } if (hton->db_type != DB_TYPE_UNKNOWN) diff --git a/sql/mysql_priv.h b/sql/mysql_priv.h index 42131340d6d..7753e98da41 100644 --- a/sql/mysql_priv.h +++ b/sql/mysql_priv.h @@ -1023,6 +1023,9 @@ void time_out_user_resource_limits(THD *thd, USER_CONN *uc); void decrease_user_connections(USER_CONN *uc); void thd_init_client_charset(THD *thd, uint cs_number); bool setup_connection_thread_globals(THD *thd); +bool login_connection(THD *thd); +void end_connection(THD *thd); +void prepare_new_connection_state(THD* thd); int mysql_create_db(THD *thd, char *db, HA_CREATE_INFO *create, bool silent); bool mysql_alter_db(THD *thd, const char *db, HA_CREATE_INFO *create); @@ -1912,6 +1915,7 @@ extern ulong query_cache_size, query_cache_min_res_unit; extern ulong slow_launch_threads, slow_launch_time; extern ulong table_cache_size, table_def_size; extern ulong max_connections,max_connect_errors, connect_timeout; +extern ulong extra_max_connections; extern ulong slave_net_timeout, slave_trans_retries; extern uint max_user_connections; extern ulong what_to_log,flush_time; @@ -1933,7 +1937,7 @@ extern ulong opt_tc_log_size, tc_log_max_pages_used, tc_log_page_size; extern ulong tc_log_page_waits; extern my_bool relay_log_purge, opt_innodb_safe_binlog, opt_innodb; extern uint test_flags,select_errors,ha_open_options; -extern uint protocol_version, mysqld_port, dropping_tables; +extern uint protocol_version, mysqld_port, mysqld_extra_port, dropping_tables; extern uint delay_key_write_options; #endif /* MYSQL_SERVER */ #if defined MYSQL_SERVER || defined INNODB_COMPATIBILITY_HOOKS @@ -1957,7 +1961,8 @@ extern bool opt_disable_networking, opt_skip_show_db; extern my_bool opt_character_set_client_handshake; extern bool volatile abort_loop, shutdown_in_progress; extern uint volatile thread_count, thread_running, global_read_lock; -extern uint connection_count; +extern ulong thread_created; +extern uint connection_count, extra_connection_count; extern my_bool opt_sql_bin_update, opt_safe_user_create, opt_no_mix_types; extern my_bool opt_safe_show_db, opt_local_infile, opt_myisam_use_mmap; extern my_bool opt_slave_compressed_protocol, use_temp_pool; diff --git a/sql/mysqld.cc b/sql/mysqld.cc index 4df499bf978..8f477039011 100644 --- a/sql/mysqld.cc +++ b/sql/mysqld.cc @@ -357,8 +357,9 @@ static bool volatile select_thread_in_use, signal_thread_in_use; static bool volatile ready_to_exit; static my_bool opt_debugging= 0, opt_external_locking= 0, opt_console= 0; static my_bool opt_short_log_format= 0; +static my_bool opt_ignore_wrong_options= 0; static uint kill_cached_threads, wake_thread; -static ulong thread_created; +ulong thread_created; static ulong max_used_connections; static ulong my_bind_addr; /**< the address we bind to */ static volatile ulong cached_thread_count= 0; @@ -469,6 +470,7 @@ const char *opt_binlog_format= binlog_format_names[opt_binlog_format_id]; static bool calling_initgroups= FALSE; /**< Used in SIGSEGV handler. */ #endif uint mysqld_port, test_flags, select_errors, dropping_tables, ha_open_options; +uint mysqld_extra_port; uint mysqld_port_timeout; uint delay_key_write_options, protocol_version; uint lower_case_table_names; @@ -495,6 +497,7 @@ ulong delayed_insert_errors,flush_time; ulong specialflag=0; ulong binlog_cache_use= 0, binlog_cache_disk_use= 0; ulong max_connections, max_connect_errors; +ulong extra_max_connections; uint max_user_connections= 0; /** Limit of the total number of prepared statements in the server. @@ -648,7 +651,7 @@ static int defaults_argc; static char **defaults_argv; static char *opt_bin_logname; -static my_socket unix_sock,ip_sock; +static my_socket unix_sock, base_ip_sock, extra_ip_sock; struct my_rnd_struct sql_rand; ///< used by sql_class.cc:THD::THD() #ifndef EMBEDDED_LIBRARY @@ -708,7 +711,7 @@ my_bool opt_enable_shared_memory; HANDLE smem_event_connect_request= 0; #endif -scheduler_functions thread_scheduler; +scheduler_functions thread_scheduler, extra_thread_scheduler; #define SSL_VARS_NOT_STATIC #include "sslopt-vars.h" @@ -735,7 +738,7 @@ struct st_VioSSLFd *ssl_acceptor_fd; Number of currently active user connections. The variable is protected by LOCK_connection_count. */ -uint connection_count= 0; +uint connection_count= 0, extra_connection_count= 0; /* Function declarations */ @@ -831,11 +834,17 @@ static void close_connections(void) DBUG_PRINT("quit",("Closing sockets")); if (!opt_disable_networking ) { - if (ip_sock != INVALID_SOCKET) + if (base_ip_sock != INVALID_SOCKET) { - (void) shutdown(ip_sock, SHUT_RDWR); - (void) closesocket(ip_sock); - ip_sock= INVALID_SOCKET; + (void) shutdown(base_ip_sock, SHUT_RDWR); + (void) closesocket(base_ip_sock); + base_ip_sock= INVALID_SOCKET; + } + if (extra_ip_sock != INVALID_SOCKET) + { + (void) shutdown(extra_ip_sock, SHUT_RDWR); + (void) closesocket(extra_ip_sock); + extra_ip_sock= INVALID_SOCKET; } } #ifdef __NT__ @@ -969,42 +978,39 @@ static void close_connections(void) } -static void close_server_sock() +static void close_socket(my_socket sock, const char *info) { -#ifdef HAVE_CLOSE_SERVER_SOCK - DBUG_ENTER("close_server_sock"); - my_socket tmp_sock; - tmp_sock=ip_sock; - if (tmp_sock != INVALID_SOCKET) + DBUG_ENTER("close_socket"); + + if (sock != INVALID_SOCKET) { - ip_sock=INVALID_SOCKET; - DBUG_PRINT("info",("calling shutdown on TCP/IP socket")); - VOID(shutdown(tmp_sock, SHUT_RDWR)); + DBUG_PRINT("info", ("calling shutdown on %s socket", info)); + (void) shutdown(sock, SHUT_RDWR); #if defined(__NETWARE__) /* The following code is disabled for normal systems as it causes MySQL to hang on AIX 4.3 during shutdown */ - DBUG_PRINT("info",("calling closesocket on TCP/IP socket")); - VOID(closesocket(tmp_sock)); + DBUG_PRINT("info", ("calling closesocket on %s socket", info)); + (void) closesocket(tmp_sock); #endif } - tmp_sock=unix_sock; - if (tmp_sock != INVALID_SOCKET) - { - unix_sock=INVALID_SOCKET; - DBUG_PRINT("info",("calling shutdown on unix socket")); - VOID(shutdown(tmp_sock, SHUT_RDWR)); -#if defined(__NETWARE__) - /* - The following code is disabled for normal systems as it may cause MySQL - to hang on AIX 4.3 during shutdown - */ - DBUG_PRINT("info",("calling closesocket on unix/IP socket")); - VOID(closesocket(tmp_sock)); -#endif + DBUG_VOID_RETURN; +} + + +static void close_server_sock() +{ +#ifdef HAVE_CLOSE_SERVER_SOCK + DBUG_ENTER("close_server_sock"); + + close_socket(base_ip_sock, "TCP/IP"); + close_socket(extra_ip_sock, "TCP/IP"); + close_socket(unix_sock, "unix/IP"); + + if (unix_sock != INVALID_SOCKET) VOID(unlink(mysqld_unix_port)); - } + base_ip_sock= extra_ip_sock= unix_sock= INVALID_SOCKET; DBUG_VOID_RETURN; #endif } @@ -1592,17 +1598,86 @@ static void set_root(const char *path) #endif } -static void network_init(void) +/** + Activate usage of a tcp port +*/ + +static my_socket activate_tcp_port(uint port) { - struct sockaddr_in IPaddr; -#ifdef HAVE_SYS_UN_H - struct sockaddr_un UNIXaddr; -#endif + struct sockaddr_in IPaddr; + my_socket ip_sock; int arg=1; int ret; uint waited; uint this_wait; uint retry; + DBUG_ENTER("activate_tcp_port"); + DBUG_PRINT("enter",("port: %u", port)); + + ip_sock = socket(AF_INET, SOCK_STREAM, 0); + if (ip_sock == INVALID_SOCKET) + { + DBUG_PRINT("error",("Got error: %d from socket()",socket_errno)); + sql_perror(ER(ER_IPSOCK_ERROR)); /* purecov: tested */ + unireg_abort(1); /* purecov: tested */ + } + bzero((char*) &IPaddr, sizeof(IPaddr)); + IPaddr.sin_family = AF_INET; + IPaddr.sin_addr.s_addr = my_bind_addr; + IPaddr.sin_port = (unsigned short) htons((unsigned short) port); + +#ifndef __WIN__ + /* + We should not use SO_REUSEADDR on windows as this would enable a + user to open two mysqld servers with the same TCP/IP port. + */ + (void) setsockopt(ip_sock,SOL_SOCKET,SO_REUSEADDR,(char*)&arg,sizeof(arg)); +#endif /* __WIN__ */ + /* + Sometimes the port is not released fast enough when stopping and + restarting the server. This happens quite often with the test suite + on busy Linux systems. Retry to bind the address at these intervals: + Sleep intervals: 1, 2, 4, 6, 9, 13, 17, 22, ... + Retry at second: 1, 3, 7, 13, 22, 35, 52, 74, ... + Limit the sequence by mysqld_port_timeout (set --port-open-timeout=#). + */ + + for (waited= 0, retry= 1; ; retry++, waited+= this_wait) + { + if (((ret= bind(ip_sock, my_reinterpret_cast(struct sockaddr *) (&IPaddr), + sizeof(IPaddr))) >= 0) || + (socket_errno != SOCKET_EADDRINUSE) || + (waited >= mysqld_port_timeout)) + break; + sql_print_information("Retrying bind on TCP/IP port %u", port); + this_wait= retry * retry / 3 + 1; + sleep(this_wait); + } + if (ret < 0) + { + DBUG_PRINT("error",("Got error: %d from bind",socket_errno)); + sql_perror("Can't start server: Bind on TCP/IP port"); + sql_print_error("Do you already have another mysqld server running on " + "port: %u ?", port); + unireg_abort(1); + } + if (listen(ip_sock,(int) back_log) < 0) + { + sql_perror("Can't start server: listen() on TCP/IP port"); + sql_print_error("listen() on TCP/IP failed with error %d", + socket_errno); + unireg_abort(1); + } + DBUG_RETURN(ip_sock); +} + + +static void network_init(void) +{ +#ifdef HAVE_SYS_UN_H + struct sockaddr_un UNIXaddr; +#endif + int arg=1; DBUG_ENTER("network_init"); LINT_INIT(ret); @@ -1611,61 +1686,12 @@ static void network_init(void) set_ports(); - if (mysqld_port != 0 && !opt_disable_networking && !opt_bootstrap) + if (!opt_disable_networking && !opt_bootstrap) { - DBUG_PRINT("general",("IP Socket is %d",mysqld_port)); - ip_sock = socket(AF_INET, SOCK_STREAM, 0); - if (ip_sock == INVALID_SOCKET) - { - DBUG_PRINT("error",("Got error: %d from socket()",socket_errno)); - sql_perror(ER(ER_IPSOCK_ERROR)); /* purecov: tested */ - unireg_abort(1); /* purecov: tested */ - } - bzero((char*) &IPaddr, sizeof(IPaddr)); - IPaddr.sin_family = AF_INET; - IPaddr.sin_addr.s_addr = my_bind_addr; - IPaddr.sin_port = (unsigned short) htons((unsigned short) mysqld_port); - -#ifndef __WIN__ - /* - We should not use SO_REUSEADDR on windows as this would enable a - user to open two mysqld servers with the same TCP/IP port. - */ - (void) setsockopt(ip_sock,SOL_SOCKET,SO_REUSEADDR,(char*)&arg,sizeof(arg)); -#endif /* __WIN__ */ - /* - Sometimes the port is not released fast enough when stopping and - restarting the server. This happens quite often with the test suite - on busy Linux systems. Retry to bind the address at these intervals: - Sleep intervals: 1, 2, 4, 6, 9, 13, 17, 22, ... - Retry at second: 1, 3, 7, 13, 22, 35, 52, 74, ... - Limit the sequence by mysqld_port_timeout (set --port-open-timeout=#). - */ - for (waited= 0, retry= 1; ; retry++, waited+= this_wait) - { - if (((ret= bind(ip_sock, my_reinterpret_cast(struct sockaddr *) (&IPaddr), - sizeof(IPaddr))) >= 0) || - (socket_errno != SOCKET_EADDRINUSE) || - (waited >= mysqld_port_timeout)) - break; - sql_print_information("Retrying bind on TCP/IP port %u", mysqld_port); - this_wait= retry * retry / 3 + 1; - sleep(this_wait); - } - if (ret < 0) - { - DBUG_PRINT("error",("Got error: %d from bind",socket_errno)); - sql_perror("Can't start server: Bind on TCP/IP port"); - sql_print_error("Do you already have another mysqld server running on port: %d ?",mysqld_port); - unireg_abort(1); - } - if (listen(ip_sock,(int) back_log) < 0) - { - sql_perror("Can't start server: listen() on TCP/IP port"); - sql_print_error("listen() on TCP/IP failed with error %d", - socket_errno); - unireg_abort(1); - } + if (mysqld_port) + base_ip_sock= activate_tcp_port(mysqld_port); + if (mysqld_extra_port) + extra_ip_sock= activate_tcp_port(mysqld_extra_port); } #ifdef __NT__ @@ -1829,7 +1855,7 @@ void unlink_thd(THD *thd) thd->cleanup(); pthread_mutex_lock(&LOCK_connection_count); - --connection_count; + (*thd->scheduler->connection_count)--; pthread_mutex_unlock(&LOCK_connection_count); (void) pthread_mutex_lock(&LOCK_thread_count); @@ -2438,15 +2464,17 @@ and this may fail.\n\n"); (ulong) dflt_key_cache->key_cache_mem_size); fprintf(stderr, "read_buffer_size=%ld\n", (long) global_system_variables.read_buff_size); fprintf(stderr, "max_used_connections=%lu\n", max_used_connections); - fprintf(stderr, "max_threads=%u\n", thread_scheduler.max_threads); + fprintf(stderr, "max_threads=%u\n", thread_scheduler.max_threads + + (uint) extra_max_connections); fprintf(stderr, "threads_connected=%u\n", thread_count); fprintf(stderr, "It is possible that mysqld could use up to \n\ key_buffer_size + (read_buffer_size + sort_buffer_size)*max_threads = %lu K\n\ bytes of memory\n", ((ulong) dflt_key_cache->key_cache_mem_size + (global_system_variables.read_buff_size + global_system_variables.sortbuff_size) * - thread_scheduler.max_threads + - max_connections * sizeof(THD)) / 1024); + (thread_scheduler.max_threads + extra_max_connections) + + (max_connections + extra_max_connections)* sizeof(THD)) + / 1024); fprintf(stderr, "Hope that's ok; if not, decrease some variables in the equation.\n\n"); #if defined(HAVE_LINUXTHREADS) @@ -2692,7 +2720,7 @@ pthread_handler_t signal_hand(void *arg __attribute__((unused))) This should actually be '+ max_number_of_slaves' instead of +10, but the +10 should be quite safe. */ - init_thr_alarm(thread_scheduler.max_threads + + init_thr_alarm(thread_scheduler.max_threads + extra_max_connections + global_system_variables.max_insert_delayed_threads + 10); if (test_flags & TEST_SIGINT) { @@ -3005,10 +3033,8 @@ sizeof(load_default_groups)/sizeof(load_default_groups[0]); The default value is taken from either opt_date_time_formats[] or the ISO format (ANSI SQL) - @retval - 0 ok - @retval - 1 error + @retval 0 ok + @retval 1 error */ static bool init_global_datetime_format(timestamp_type format_type, @@ -3303,7 +3329,8 @@ static int init_common_variables(const char *conf_file_name, int argc, uint files, wanted_files, max_open_files; /* MyISAM requires two file handles per table. */ - wanted_files= 10+max_connections+table_cache_size*2; + wanted_files= (10 + max_connections + extra_max_connections + + table_cache_size*2); /* We are trying to allocate no less than max_connections*5 file handles (i.e. we are trying to set the limit so that they will @@ -3314,7 +3341,8 @@ static int init_common_variables(const char *conf_file_name, int argc, can't get max_connections*5 but still got no less than was requested (value of wanted_files). */ - max_open_files= max(max(wanted_files, max_connections*5), + max_open_files= max(max(wanted_files, + (max_connections + extra_max_connections)*5), open_files_limit); files= my_set_max_open_files(max_open_files); @@ -4585,10 +4613,8 @@ static char *add_quoted_string(char *to, const char *from, char *to_end) @param file_path Path to this program @param startup_option Startup option to mysqld - @retval - 0 option handled - @retval - 1 Could not handle option + @retval 0 option handled + @retval 1 Could not handle option */ static bool @@ -4851,7 +4877,7 @@ void create_thread_to_handle_connection(THD *thd) (void) pthread_mutex_unlock(&LOCK_thread_count); pthread_mutex_lock(&LOCK_connection_count); - --connection_count; + (*thd->scheduler->connection_count)--; pthread_mutex_unlock(&LOCK_connection_count); statistic_increment(aborted_connects,&LOCK_status); @@ -4900,7 +4926,8 @@ static void create_new_thread(THD *thd) pthread_mutex_lock(&LOCK_connection_count); - if (connection_count >= max_connections + 1 || abort_loop) + if (*thd->scheduler->connection_count >= + *thd->scheduler->max_connections + 1|| abort_loop) { pthread_mutex_unlock(&LOCK_connection_count); @@ -4910,10 +4937,10 @@ static void create_new_thread(THD *thd) DBUG_VOID_RETURN; } - ++connection_count; + ++*thd->scheduler->connection_count; - if (connection_count > max_used_connections) - max_used_connections= connection_count; + if (connection_count + extra_connection_count > max_used_connections) + max_used_connections= connection_count + extra_connection_count; pthread_mutex_unlock(&LOCK_connection_count); @@ -4930,7 +4957,7 @@ static void create_new_thread(THD *thd) thread_count++; - thread_scheduler.add_connection(thd); + thd->scheduler->add_connection(thd); DBUG_VOID_RETURN; } @@ -4945,7 +4972,8 @@ inline void kill_broken_server() #if !defined(__NETWARE__) unix_sock == INVALID_SOCKET || #endif - (!opt_disable_networking && ip_sock == INVALID_SOCKET)) + (!opt_disable_networking && + (base_ip_sock == INVALID_SOCKET || extra_ip_sock != INVALID_SOCKET))) { select_thread_in_use = 0; /* The following call will never return */ @@ -4964,26 +4992,38 @@ pthread_handler_t handle_connections_sockets(void *arg __attribute__((unused))) { my_socket sock,new_sock; uint error_count=0; - uint max_used_connection= (uint) (max(ip_sock,unix_sock)+1); + uint max_used_connection; fd_set readFDs,clientFDs; THD *thd; struct sockaddr_in cAddr; - int ip_flags=0,socket_flags=0,flags; + int base_ip_flags=0, extra_ip_flags= 0, socket_flags=0, flags; st_vio *vio_tmp; DBUG_ENTER("handle_connections_sockets"); + max_used_connection= (uint) (max(base_ip_sock, unix_sock)); + max_used_connection= (uint) (max(extra_ip_sock, (int) max_used_connection)); + max_used_connection++; + LINT_INIT(new_sock); (void) my_pthread_getprio(pthread_self()); // For debugging FD_ZERO(&clientFDs); - if (ip_sock != INVALID_SOCKET) + if (base_ip_sock != INVALID_SOCKET) { - FD_SET(ip_sock,&clientFDs); + FD_SET(base_ip_sock, &clientFDs); #ifdef HAVE_FCNTL - ip_flags = fcntl(ip_sock, F_GETFL, 0); + base_ip_flags = fcntl(base_ip_sock, F_GETFL, 0); #endif } + if (extra_ip_sock != INVALID_SOCKET) + { + FD_SET(extra_ip_sock, &clientFDs); +#ifdef HAVE_FCNTL + extra_ip_flags = fcntl(extra_ip_sock, F_GETFL, 0); +#endif + } + #ifdef HAVE_SYS_UN_H FD_SET(unix_sock,&clientFDs); #ifdef HAVE_FCNTL @@ -5021,14 +5061,22 @@ pthread_handler_t handle_connections_sockets(void *arg __attribute__((unused))) #ifdef HAVE_SYS_UN_H if (FD_ISSET(unix_sock,&readFDs)) { - sock = unix_sock; + sock= unix_sock; flags= socket_flags; } else #endif { - sock = ip_sock; - flags= ip_flags; + if (FD_ISSET(base_ip_sock,&readFDs)) + { + sock= base_ip_sock; + flags= base_ip_flags; + } + else + { + sock= extra_ip_sock; + flags= extra_ip_flags; + } } #if !defined(NO_FCNTL_NONBLOCK) @@ -5081,7 +5129,7 @@ pthread_handler_t handle_connections_sockets(void *arg __attribute__((unused))) #ifdef HAVE_LIBWRAP { - if (sock == ip_sock) + if (sock == base_ip_sock || sock == extra_ip_sock) { struct request_info req; signal(SIGCHLD, SIG_DFL); @@ -5161,6 +5209,11 @@ pthread_handler_t handle_connections_sockets(void *arg __attribute__((unused))) if (sock == unix_sock) thd->security_ctx->host=(char*) my_localhost; + if (sock == extra_ip_sock) + { + thd->extra_port= 1; + thd->scheduler= &extra_thread_scheduler; + } create_new_thread(thd); } @@ -5502,6 +5555,7 @@ enum options_mysqld OPT_SKIP_GRANT, OPT_SKIP_LOCK, OPT_ENABLE_LOCK, OPT_USE_LOCKING, OPT_SOCKET, OPT_UPDATE_LOG, + OPT_EXTRA_PORT, OPT_BIN_LOG, OPT_SKIP_RESOLVE, OPT_SKIP_NETWORKING, OPT_BIN_LOG_INDEX, OPT_BIND_ADDRESS, OPT_PID_FILE, @@ -5660,6 +5714,7 @@ enum options_mysqld OPT_MIN_EXAMINED_ROW_LIMIT, OPT_LOG_SLOW_SLAVE_STATEMENTS, OPT_DEBUG_CRC, OPT_DEBUG_ON, OPT_OLD_MODE, + OPT_TEST_IGNORE_WRONG_OPTIONS, OPT_SLAVE_EXEC_MODE, OPT_DEADLOCK_SEARCH_DEPTH_SHORT, OPT_DEADLOCK_SEARCH_DEPTH_LONG, @@ -5884,6 +5939,15 @@ struct my_option my_long_options[] = GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0}, /* We must always support the next option to make scripts like mysqltest easier to do */ + {"extra-port", OPT_EXTRA_PORT, + "Extra port number to use for tcp-connections in a one-thread-per-connection manner. 0 means don't use another port", + (uchar**) &mysqld_extra_port, + (uchar**) &mysqld_extra_port, 0, GET_UINT, REQUIRED_ARG, 0, 0, 0, 0, 0, 0}, + {"extra-max-connections", OPT_MAX_CONNECTIONS, + "The number of connections on 'extra-port.", + (uchar**) &extra_max_connections, + (uchar**) &extra_max_connections, 0, GET_ULONG, REQUIRED_ARG, 1, 1, 100000, + 0, 1, 0}, {"gdb", OPT_DEBUGGING, "Set up signals usable for debugging", (uchar**) &opt_debugging, (uchar**) &opt_debugging, @@ -6471,6 +6535,10 @@ log and this option does nothing anymore.", (uchar**) &use_temp_pool, (uchar**) &use_temp_pool, 0, GET_BOOL, NO_ARG, 1, 0, 0, 0, 0, 0}, + {"test-ignore-wrong-options", OPT_TEST_IGNORE_WRONG_OPTIONS, + "Ignore wrong enums values in command line arguments. Usefull only for test scripts", + (uchar**) &opt_ignore_wrong_options, (uchar**) &opt_ignore_wrong_options, + 0, GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0}, {"timed_mutexes", OPT_TIMED_MUTEXES, "Specify whether to time mutexes (only InnoDB mutexes are currently supported)", (uchar**) &timed_mutexes, (uchar**) &timed_mutexes, 0, GET_BOOL, NO_ARG, 0, @@ -7648,7 +7716,7 @@ static int mysql_init_variables(void) slave_exec_mode_options= (uint) find_bit_type_or_exit(slave_exec_mode_str, &slave_exec_mode_typelib, NULL); opt_specialflag= SPECIAL_ENGLISH; - unix_sock= ip_sock= INVALID_SOCKET; + unix_sock= base_ip_sock= extra_ip_sock= INVALID_SOCKET; mysql_home_ptr= mysql_home; pidfile_name_ptr= pidfile_name; log_error_file_ptr= log_error_file; @@ -7820,6 +7888,38 @@ static int mysql_init_variables(void) } +/** + Find type for option + + If opt_ignore_wrong_options is set ignore wrong values + otherwise exit + + @return + @retval 0 ok ; *result is updated + @retval 1 error ; *result is not touched +*/ + +static my_bool find_opt_type(const char *x, TYPELIB *typelib, + const char *option, int *result) +{ + int res; + + if (opt_ignore_wrong_options) + { + if ((res= find_type_with_warning(x, typelib, option)) <= 0) + return 1; + } + else + res= find_type_or_exit(x, typelib, option); + *result= res; + return 0; +} + + +/** + Get next option from the command line +*/ + my_bool mysqld_get_one_option(int optid, const struct my_option *opt __attribute__((unused)), @@ -7923,9 +8023,11 @@ mysqld_get_one_option(int optid, case (int) OPT_INIT_RPL_ROLE: { int role; - role= find_type_or_exit(argument, &rpl_role_typelib, opt->name); - rpl_status = (role == 1) ? RPL_AUTH_MASTER : RPL_IDLE_SLAVE; - break; + if (!find_opt_type(argument, &rpl_role_typelib, opt->name, &role)) + { + rpl_status = (role == 1) ? RPL_AUTH_MASTER : RPL_IDLE_SLAVE; + break; + } } case (int)OPT_REPLICATE_IGNORE_DB: { @@ -7979,8 +8081,10 @@ mysqld_get_one_option(int optid, case OPT_BINLOG_FORMAT: { int id; - id= find_type_or_exit(argument, &binlog_format_typelib, opt->name); - global_system_variables.binlog_format= opt_binlog_format_id= id - 1; + if (!find_opt_type(argument, &binlog_format_typelib, opt->name, &id)) + { + global_system_variables.binlog_format= opt_binlog_format_id= id - 1; + } break; } case (int)OPT_BINLOG_DO_DB: @@ -8093,7 +8197,7 @@ mysqld_get_one_option(int optid, exit(1); #endif opt_disable_networking=1; - mysqld_port=0; + mysqld_port= mysqld_extra_port= 0; break; case (int) OPT_SKIP_SHOW_DB: opt_skip_show_db=1; @@ -8192,8 +8296,8 @@ mysqld_get_one_option(int optid, else { int type; - type= find_type_or_exit(argument, &delay_key_write_typelib, opt->name); - delay_key_write_options= (uint) type-1; + if (!find_opt_type(argument, &delay_key_write_typelib, opt->name, &type)) + delay_key_write_options= (uint) type-1; } break; case OPT_CHARSETS_DIR: @@ -8203,8 +8307,8 @@ mysqld_get_one_option(int optid, case OPT_TX_ISOLATION: { int type; - type= find_type_or_exit(argument, &tx_isolation_typelib, opt->name); - global_system_variables.tx_isolation= (type-1); + if (!find_opt_type(argument, &tx_isolation_typelib, opt->name, &type)) + global_system_variables.tx_isolation= (type-1); break; } #ifdef WITH_NDBCLUSTER_STORAGE_ENGINE @@ -8233,8 +8337,8 @@ mysqld_get_one_option(int optid, break; case OPT_NDB_DISTRIBUTION: int id; - id= find_type_or_exit(argument, &ndb_distribution_typelib, opt->name); - opt_ndb_distribution_id= (enum ndb_distribution)(id-1); + if (!find_opt_type(argument, &ndb_distribution_typelib, opt->name, &id)) + opt_ndb_distribution_id= (enum ndb_distribution)(id-1); break; case OPT_NDB_EXTRA_LOGGING: if (!argument) @@ -8274,9 +8378,8 @@ mysqld_get_one_option(int optid, myisam_concurrent_insert= 0; /* --skip-concurrent-insert */ break; case OPT_TC_HEURISTIC_RECOVER: - tc_heuristic_recover= find_type_or_exit(argument, - &tc_heuristic_recover_typelib, - opt->name); + find_opt_type(argument, &tc_heuristic_recover_typelib, + opt->name, (int*) &tc_heuristic_recover); break; case OPT_MYISAM_STATS_METHOD: { @@ -8285,21 +8388,23 @@ mysqld_get_one_option(int optid, LINT_INIT(method_conv); myisam_stats_method_str= argument; - method= find_type_or_exit(argument, &myisam_stats_method_typelib, - opt->name); - switch (method-1) { - case 2: - method_conv= MI_STATS_METHOD_IGNORE_NULLS; - break; - case 1: - method_conv= MI_STATS_METHOD_NULLS_EQUAL; - break; - case 0: - default: - method_conv= MI_STATS_METHOD_NULLS_NOT_EQUAL; - break; + if (!find_opt_type(argument, &myisam_stats_method_typelib, + opt->name, &method)) + { + switch (method-1) { + case 2: + method_conv= MI_STATS_METHOD_IGNORE_NULLS; + break; + case 1: + method_conv= MI_STATS_METHOD_NULLS_EQUAL; + break; + case 0: + default: + method_conv= MI_STATS_METHOD_NULLS_NOT_EQUAL; + break; + } + global_system_variables.myisam_stats_method= method_conv; } - global_system_variables.myisam_stats_method= method_conv; break; } case OPT_SQL_MODE: @@ -8317,8 +8422,9 @@ mysqld_get_one_option(int optid, break; case OPT_THREAD_HANDLING: { - global_system_variables.thread_handling= - find_type_or_exit(argument, &thread_handling_typelib, opt->name)-1; + int id; + if (!find_opt_type(argument, &thread_handling_typelib, opt->name, &id)) + global_system_variables.thread_handling= id - 1; break; } case OPT_FT_BOOLEAN_SYNTAX: @@ -8338,6 +8444,10 @@ mysqld_get_one_option(int optid, lower_case_table_names= argument ? atoi(argument) : 1; lower_case_table_names_used= 1; break; + case OPT_TEST_IGNORE_WRONG_OPTIONS: + /* Used for testing options */ + opt_ignore_wrong_options= 1; + break; } return 0; } @@ -8483,14 +8593,19 @@ static void get_options(int *argc,char **argv) #ifdef EMBEDDED_LIBRARY one_thread_scheduler(&thread_scheduler); + one_thread_scheduler(&extra_thread_scheduler); #else if (global_system_variables.thread_handling <= SCHEDULER_ONE_THREAD_PER_CONNECTION) - one_thread_per_connection_scheduler(&thread_scheduler); + one_thread_per_connection_scheduler(&thread_scheduler, &max_connections, + &connection_count); else if (global_system_variables.thread_handling == SCHEDULER_NO_THREADS) one_thread_scheduler(&thread_scheduler); else pool_of_threads_scheduler(&thread_scheduler); /* purecov: tested */ + one_thread_per_connection_scheduler(&extra_thread_scheduler, + &extra_max_connections, + &extra_connection_count); #endif } @@ -8715,12 +8830,9 @@ skip: ; @param dir_name Directory to test - @retval - -1 Don't know (Test failed) - @retval - 0 File system is case sensitive - @retval - 1 File system is case insensitive + @retval -1 Don't know (Test failed) + @retval 0 File system is case sensitive + @retval 1 File system is case insensitive */ static int test_if_case_insensitive(const char *dir_name) diff --git a/sql/scheduler.cc b/sql/scheduler.cc index b05bdf4756f..be068859a6f 100644 --- a/sql/scheduler.cc +++ b/sql/scheduler.cc @@ -22,6 +22,9 @@ #endif #include <mysql_priv.h> +#if MYSQL_VERSION_ID >= 60000 +#include "sql_audit.h" +#endif /* 'Dummy' functions to be used when we don't need any handling for a scheduler @@ -29,7 +32,7 @@ */ static bool init_dummy(void) {return 0;} -static void post_kill_dummy(THD* thd) {} +static void post_kill_dummy(THD *thd) {} static void end_dummy(void) {} static bool end_thread_dummy(THD *thd, bool cache_thread) { return 0; } @@ -63,9 +66,12 @@ static bool no_threads_end(THD *thd, bool put_in_cache) Initailize scheduler for --thread-handling=no-threads */ -void one_thread_scheduler(scheduler_functions* func) +void one_thread_scheduler(scheduler_functions *func) { func->max_threads= 1; + max_connections= 1; + func->max_connections= &max_connections; + func->connection_count= &connection_count; #ifndef EMBEDDED_LIBRARY func->add_connection= handle_connection_in_main_thread; #endif @@ -79,10 +85,674 @@ void one_thread_scheduler(scheduler_functions* func) */ #ifndef EMBEDDED_LIBRARY -void one_thread_per_connection_scheduler(scheduler_functions* func) +void one_thread_per_connection_scheduler(scheduler_functions *func, + ulong *arg_max_connections, + uint *arg_connection_count) { - func->max_threads= max_connections; + func->max_threads= *arg_max_connections + 1; + func->max_connections= arg_max_connections; + func->connection_count= arg_connection_count; func->add_connection= create_thread_to_handle_connection; func->end_thread= one_thread_per_connection_end; } #endif /* EMBEDDED_LIBRARY */ + + +#if defined(HAVE_LIBEVENT) && HAVE_POOL_OF_THREADS == 1 + +#include "event.h" + +static struct event_base *base; + +static uint created_threads, killed_threads; +static bool kill_pool_threads; + +static struct event thd_add_event; +static struct event thd_kill_event; + +static pthread_mutex_t LOCK_thd_add; /* protects thds_need_adding */ +static LIST *thds_need_adding; /* list of thds to add to libevent queue */ + +static int thd_add_pair[2]; /* pipe to signal add a connection to libevent*/ +static int thd_kill_pair[2]; /* pipe to signal kill a connection in libevent */ + +/* + LOCK_event_loop protects the non-thread safe libevent calls (event_add and + event_del) and thds_need_processing and thds_waiting_for_io. +*/ +static pthread_mutex_t LOCK_event_loop; +static LIST *thds_need_processing; /* list of thds that needs some processing */ +static LIST *thds_waiting_for_io; /* list of thds with added events */ + +pthread_handler_t libevent_thread_proc(void *arg); +static void libevent_end(); +static bool libevent_needs_immediate_processing(THD *thd); +static void libevent_connection_close(THD *thd); +static bool libevent_should_close_connection(THD* thd); +static void libevent_thd_add(THD* thd); +void libevent_io_callback(int Fd, short Operation, void *ctx); +void libevent_add_thd_callback(int Fd, short Operation, void *ctx); +void libevent_kill_thd_callback(int Fd, short Operation, void *ctx); + + +/* + Create a pipe and set to non-blocking. + Returns TRUE if there is an error. +*/ + +static bool init_socketpair(int sock_pair[]) +{ + sock_pair[0]= sock_pair[1]= -1; + return (evutil_socketpair(AF_UNIX, SOCK_STREAM, 0, sock_pair) < 0 || + evutil_make_socket_nonblocking(sock_pair[0]) == -1 || + evutil_make_socket_nonblocking(sock_pair[1]) == -1); +} + +static void close_socketpair(int sock_pair[]) +{ + if (sock_pair[0] != -1) + EVUTIL_CLOSESOCKET(sock_pair[0]); + if (sock_pair[1] != -1) + EVUTIL_CLOSESOCKET(sock_pair[1]); +} + +/* + thd_scheduler keeps the link between THD and events. + It's embedded in the THD class. +*/ + +thd_scheduler::thd_scheduler() + : logged_in(FALSE), io_event(NULL), thread_attached(FALSE) +{ +#ifndef DBUG_OFF + dbug_explain[0]= '\0'; + set_explain= FALSE; +#endif +} + + +thd_scheduler::~thd_scheduler() +{ + my_free(io_event, MYF(MY_ALLOW_ZERO_PTR)); +} + + +bool thd_scheduler::init(THD *parent_thd) +{ + io_event= + (struct event*)my_malloc(sizeof(*io_event),MYF(MY_ZEROFILL|MY_WME)); + + if (!io_event) + { + sql_print_error("Memory allocation error in thd_scheduler::init\n"); + return TRUE; + } + + event_set(io_event, (int)parent_thd->net.vio->sd, EV_READ, + libevent_io_callback, (void*)parent_thd); + + list.data= parent_thd; + + return FALSE; +} + + +/* + Attach/associate the connection with the OS thread, for command processing. +*/ + +bool thd_scheduler::thread_attach() +{ + DBUG_ASSERT(!thread_attached); + THD* thd = (THD*)list.data; + if (libevent_should_close_connection(thd) || + setup_connection_thread_globals(thd)) + { + return TRUE; + } + my_errno= 0; + thd->mysys_var->abort= 0; + thread_attached= TRUE; +#ifndef DBUG_OFF + /* + When we attach the thread for a connection for the first time, + we know that there is no session value set yet. Thus + the initial setting of set_explain to FALSE is OK. + */ + if (set_explain) + DBUG_SET(dbug_explain); +#endif + return FALSE; +} + + +/* + Detach/disassociate the connection with the OS thread. +*/ + +void thd_scheduler::thread_detach() +{ + if (thread_attached) + { + THD* thd = (THD*)list.data; + pthread_mutex_lock(&thd->LOCK_delete); + thd->mysys_var= NULL; + pthread_mutex_unlock(&thd->LOCK_delete); + thread_attached= FALSE; +#ifndef DBUG_OFF + /* + If during the session @@session.dbug was assigned, the + dbug options/state has been pushed. Check if this is the + case, to be able to restore the state when we attach this + logical connection to a physical thread. + */ + if (_db_is_pushed_()) + { + set_explain= TRUE; + if (DBUG_EXPLAIN(dbug_explain, sizeof(dbug_explain))) + sql_print_error("thd_scheduler: DBUG_EXPLAIN buffer is too small"); + } + /* DBUG_POP() is a no-op in case there is no session state */ + DBUG_POP(); +#endif + } +} + +/** + Create all threads for the thread pool + + NOTES + After threads are created we wait until all threads has signaled that + they have started before we return + + RETURN + 0 ok + 1 We got an error creating the thread pool + In this case we will abort all created threads +*/ + +static bool libevent_init(void) +{ + uint i; + DBUG_ENTER("libevent_init"); + + base= (struct event_base *) event_init(); + + created_threads= 0; + killed_threads= 0; + kill_pool_threads= FALSE; + + pthread_mutex_init(&LOCK_event_loop, NULL); + pthread_mutex_init(&LOCK_thd_add, NULL); + + /* set up sockets used to add new thds to the event pool */ + if (init_socketpair(thd_add_pair)) + { + sql_print_error("init_socketpair(thd_add_spair) error in libevent_init"); + DBUG_RETURN(1); + } + /* set up sockets used to kill thds in the event queue */ + if (init_socketpair(thd_kill_pair)) + { + sql_print_error("init_socketpair(thd_kill_pair) error in libevent_init"); + close_socketpair(thd_add_pair); + DBUG_RETURN(1); + } + event_set(&thd_add_event, thd_add_pair[0], EV_READ|EV_PERSIST, + libevent_add_thd_callback, NULL); + event_set(&thd_kill_event, thd_kill_pair[0], EV_READ|EV_PERSIST, + libevent_kill_thd_callback, NULL); + + if (event_add(&thd_add_event, NULL) || event_add(&thd_kill_event, NULL)) + { + sql_print_error("thd_add_event event_add error in libevent_init"); + libevent_end(); + DBUG_RETURN(1); + } + /* Set up the thread pool */ + created_threads= killed_threads= 0; + pthread_mutex_lock(&LOCK_thread_count); + + for (i= 0; i < thread_pool_size; i++) + { + pthread_t thread; + int error; + if ((error= pthread_create(&thread, &connection_attrib, + libevent_thread_proc, 0))) + { + sql_print_error("Can't create completion port thread (error %d)", + error); + pthread_mutex_unlock(&LOCK_thread_count); + libevent_end(); // Cleanup + DBUG_RETURN(TRUE); + } + } + + /* Wait until all threads are created */ + while (created_threads != thread_pool_size) + pthread_cond_wait(&COND_thread_count,&LOCK_thread_count); + pthread_mutex_unlock(&LOCK_thread_count); + + DBUG_PRINT("info", ("%u threads created", (uint) thread_pool_size)); + DBUG_RETURN(FALSE); +} + + +/* + This is called when data is ready on the socket. + + NOTES + This is only called by the thread that owns LOCK_event_loop. + + We add the thd that got the data to thds_need_processing, and + cause the libevent event_loop() to terminate. Then this same thread will + return from event_loop and pick the thd value back up for processing. +*/ + +void libevent_io_callback(int, short, void *ctx) +{ + safe_mutex_assert_owner(&LOCK_event_loop); + THD *thd= (THD*)ctx; + thds_waiting_for_io= list_delete(thds_waiting_for_io, &thd->event_scheduler.list); + thds_need_processing= list_add(thds_need_processing, &thd->event_scheduler.list); +} + +/* + This is called when we have a thread we want to be killed. + + NOTES + This is only called by the thread that owns LOCK_event_loop. +*/ + +void libevent_kill_thd_callback(int Fd, short, void*) +{ + safe_mutex_assert_owner(&LOCK_event_loop); + + /* clear the pending events */ + char c; + while (recv(Fd, &c, sizeof(c), 0) == sizeof(c)) + {} + + LIST* list= thds_waiting_for_io; + while (list) + { + THD *thd= (THD*)list->data; + list= list_rest(list); + if (thd->killed == THD::KILL_CONNECTION) + { + /* + Delete from libevent and add to the processing queue. + */ + event_del(thd->event_scheduler.io_event); + thds_waiting_for_io= list_delete(thds_waiting_for_io, + &thd->event_scheduler.list); + thds_need_processing= list_add(thds_need_processing, + &thd->event_scheduler.list); + } + } +} + + +/* + This is used to add connections to the pool. This callback is invoked from + the libevent event_loop() call whenever the thd_add_pair[1] has a byte + written to it. + + NOTES + This is only called by the thread that owns LOCK_event_loop. +*/ + +void libevent_add_thd_callback(int Fd, short, void *) +{ + safe_mutex_assert_owner(&LOCK_event_loop); + + /* clear the pending events */ + char c; + while (recv(Fd, &c, sizeof(c), 0) == sizeof(c)) + {} + + pthread_mutex_lock(&LOCK_thd_add); + while (thds_need_adding) + { + /* pop the first thd off the list */ + THD* thd= (THD*)thds_need_adding->data; + thds_need_adding= list_delete(thds_need_adding, thds_need_adding); + + pthread_mutex_unlock(&LOCK_thd_add); + + if (!thd->event_scheduler.logged_in || libevent_should_close_connection(thd)) + { + /* + Add thd to thds_need_processing list. If it needs closing we'll close + it outside of event_loop(). + */ + thds_need_processing= list_add(thds_need_processing, + &thd->event_scheduler.list); + } + else + { + /* Add to libevent */ + if (event_add(thd->event_scheduler.io_event, NULL)) + { + sql_print_error("event_add error in libevent_add_thd_callback"); + libevent_connection_close(thd); + } + else + { + thds_waiting_for_io= list_add(thds_waiting_for_io, + &thd->event_scheduler.list); + } + } + pthread_mutex_lock(&LOCK_thd_add); + } + pthread_mutex_unlock(&LOCK_thd_add); +} + + +/** + Notify the thread pool about a new connection + + NOTES + LOCK_thread_count is locked on entry. This function MUST unlock it! +*/ + +static void libevent_add_connection(THD *thd) +{ + DBUG_ENTER("libevent_add_connection"); + DBUG_PRINT("enter", ("thd: %p thread_id: %lu", + thd, thd->thread_id)); + + if (thd->event_scheduler.init(thd)) + { + sql_print_error("Scheduler init error in libevent_add_new_connection"); + pthread_mutex_unlock(&LOCK_thread_count); + libevent_connection_close(thd); + DBUG_VOID_RETURN; + } + threads.append(thd); + libevent_thd_add(thd); + + pthread_mutex_unlock(&LOCK_thread_count); + DBUG_VOID_RETURN; +} + + +/** + @brief Signal a waiting connection it's time to die. + + @details This function will signal libevent the THD should be killed. + Either the global LOCK_thd_count or the THD's LOCK_delete must be locked + upon entry. + + @param[in] thd The connection to kill +*/ + +static void libevent_post_kill_notification(THD *) +{ + /* + Note, we just wake up libevent with an event that a THD should be killed, + It will search its list of thds for thd->killed == KILL_CONNECTION to + find the THDs it should kill. + + So we don't actually tell it which one and we don't actually use the + THD being passed to us, but that's just a design detail that could change + later. + */ + char c= 0; + send(thd_kill_pair[1], &c, sizeof(c), 0); +} + + +/* + Close and delete a connection. +*/ + +static void libevent_connection_close(THD *thd) +{ + DBUG_ENTER("libevent_connection_close"); + DBUG_PRINT("enter", ("thd: %p", thd)); + + thd->killed= THD::KILL_CONNECTION; // Avoid error messages + + if (thd->net.vio->sd >= 0) // not already closed + { + end_connection(thd); + close_connection(thd, 0, 1); + } + thd->event_scheduler.thread_detach(); + unlink_thd(thd); /* locks LOCK_thread_count and deletes thd */ + pthread_mutex_unlock(&LOCK_thread_count); + + DBUG_VOID_RETURN; +} + + +/* + Returns true if we should close and delete a THD connection. +*/ + +static bool libevent_should_close_connection(THD* thd) +{ + return thd->net.error || + thd->net.vio == 0 || + thd->killed == THD::KILL_CONNECTION; +} + + +/* + libevent_thread_proc is the outer loop of each thread in the thread pool. + These procs only return/terminate on shutdown (kill_pool_threads == true). +*/ + +pthread_handler_t libevent_thread_proc(void *arg) +{ + if (init_new_connection_handler_thread()) + { + my_thread_global_end(); + sql_print_error("libevent_thread_proc: my_thread_init() failed"); + exit(1); + } + DBUG_ENTER("libevent_thread_proc"); + + /* + Signal libevent_init() when all threads has been created and are ready to + receive events. + */ + (void) pthread_mutex_lock(&LOCK_thread_count); + created_threads++; + thread_created++; + if (created_threads == thread_pool_size) + (void) pthread_cond_signal(&COND_thread_count); + (void) pthread_mutex_unlock(&LOCK_thread_count); + + for (;;) + { + THD *thd= NULL; + (void) pthread_mutex_lock(&LOCK_event_loop); + + /* get thd(s) to process */ + while (!thds_need_processing) + { + if (kill_pool_threads) + { + /* the flag that we should die has been set */ + (void) pthread_mutex_unlock(&LOCK_event_loop); + goto thread_exit; + } + event_loop(EVLOOP_ONCE); + } + + /* pop the first thd off the list */ + thd= (THD*)thds_need_processing->data; + thds_need_processing= list_delete(thds_need_processing, + thds_need_processing); + + (void) pthread_mutex_unlock(&LOCK_event_loop); + + /* now we process the connection (thd) */ + + /* set up the thd<->thread links. */ + thd->thread_stack= (char*) &thd; + + if (thd->event_scheduler.thread_attach()) + { + libevent_connection_close(thd); + continue; + } + + /* is the connection logged in yet? */ + if (!thd->event_scheduler.logged_in) + { + DBUG_PRINT("info", ("init new connection. sd: %d", + thd->net.vio->sd)); + lex_start(thd); + if (login_connection(thd)) + { + /* Failed to log in */ + libevent_connection_close(thd); + continue; + } + else + { + /* login successful */ +#if MYSQL_VERSION_ID >= 60000 + MYSQL_CONNECTION_START(thd->thread_id, thd->security_ctx->priv_user, + (char *) thd->security_ctx->host_or_ip); +#endif + thd->event_scheduler.logged_in= TRUE; + prepare_new_connection_state(thd); + if (!libevent_needs_immediate_processing(thd)) + continue; /* New connection is now waiting for data in libevent*/ + } + } + + do + { + /* Process a query */ + if (do_command(thd)) + { + libevent_connection_close(thd); + break; + } + } while (libevent_needs_immediate_processing(thd)); + } + +thread_exit: + DBUG_PRINT("exit", ("ending thread")); + (void) pthread_mutex_lock(&LOCK_thread_count); + killed_threads++; + pthread_cond_broadcast(&COND_thread_count); + (void) pthread_mutex_unlock(&LOCK_thread_count); + my_thread_end(); + pthread_exit(0); + DBUG_RETURN(0); /* purify: deadcode */ +} + + +/* + Returns TRUE if the connection needs immediate processing and FALSE if + instead it's queued for libevent processing or closed, +*/ + +static bool libevent_needs_immediate_processing(THD *thd) +{ + if (libevent_should_close_connection(thd)) + { + libevent_connection_close(thd); + return FALSE; + } + /* + If more data in the socket buffer, return TRUE to process another command. + + Note: we cannot add for event processing because the whole request might + already be buffered and we wouldn't receive an event. + */ + if (vio_pending(thd->net.vio) > 0) + return TRUE; + + thd->event_scheduler.thread_detach(); + libevent_thd_add(thd); + return FALSE; +} + + +/* + Adds a THD to queued for libevent processing. + + This call does not actually register the event with libevent. + Instead, it places the THD onto a queue and signals libevent by writing + a byte into thd_add_pair, which will cause our libevent_add_thd_callback to + be invoked which will find the THD on the queue and add it to libevent. +*/ + +static void libevent_thd_add(THD* thd) +{ + char c= 0; + /* release any audit resources, this thd is going to sleep */ +#if MYSQL_VERSION_ID >= 60000 + mysql_audit_release(thd); +#endif + pthread_mutex_lock(&LOCK_thd_add); + /* queue for libevent */ + thds_need_adding= list_add(thds_need_adding, &thd->event_scheduler.list); + /* notify libevent */ + send(thd_add_pair[1], &c, sizeof(c), 0); + pthread_mutex_unlock(&LOCK_thd_add); +} + + +/** + Wait until all pool threads have been deleted for clean shutdown +*/ + +static void libevent_end() +{ + DBUG_ENTER("libevent_end"); + DBUG_PRINT("enter", ("created_threads: %d killed_threads: %u", + created_threads, killed_threads)); + + /* + check if initialized. This may not be the case if get an error at + startup + */ + if (!base) + DBUG_VOID_RETURN; + + (void) pthread_mutex_lock(&LOCK_thread_count); + + + kill_pool_threads= TRUE; + while (killed_threads != created_threads) + { + /* wake up the event loop */ + char c= 0; + send(thd_add_pair[1], &c, sizeof(c), 0); + pthread_cond_wait(&COND_thread_count, &LOCK_thread_count); + } + (void) pthread_mutex_unlock(&LOCK_thread_count); + + event_del(&thd_add_event); + close_socketpair(thd_add_pair); + event_del(&thd_kill_event); + close_socketpair(thd_kill_pair); + event_base_free(base); + base= 0; + + (void) pthread_mutex_destroy(&LOCK_event_loop); + (void) pthread_mutex_destroy(&LOCK_thd_add); + DBUG_VOID_RETURN; +} + + +void pool_of_threads_scheduler(scheduler_functions* func) +{ + func->max_threads= thread_pool_size; + func->max_connections= &max_connections; + func->connection_count= &connection_count; + func->init= libevent_init; + func->end= libevent_end; + func->post_kill_notification= libevent_post_kill_notification; + func->add_connection= libevent_add_connection; +} + +#endif diff --git a/sql/scheduler.h b/sql/scheduler.h index 46bbd300cbb..c3610bdc07b 100644 --- a/sql/scheduler.h +++ b/sql/scheduler.h @@ -28,7 +28,8 @@ class THD; class scheduler_functions { public: - uint max_threads; + uint max_threads, *connection_count; + ulong *max_connections; bool (*init)(void); bool (*init_new_connection_thread)(void); void (*add_connection)(THD *thd); @@ -45,16 +46,45 @@ enum scheduler_types SCHEDULER_POOL_OF_THREADS }; -void one_thread_per_connection_scheduler(scheduler_functions* func); +void one_thread_per_connection_scheduler(scheduler_functions *func, + ulong *arg_max_connections, + uint *arg_connection_count); void one_thread_scheduler(scheduler_functions* func); -enum pool_command_op +#if defined(HAVE_LIBEVENT) && !defined(EMBEDDED_LIBRARY) + +#define HAVE_POOL_OF_THREADS 1 + +struct event; + +class thd_scheduler { - NOT_IN_USE_OP= 0, NORMAL_OP= 1, CONNECT_OP, KILL_OP, DIE_OP +public: + bool logged_in; + struct event* io_event; + LIST list; + bool thread_attached; /* Indicates if THD is attached to the OS thread */ + +#ifndef DBUG_OFF + char dbug_explain[256]; + bool set_explain; +#endif + + thd_scheduler(); + ~thd_scheduler(); + bool init(THD* parent_thd); + bool thread_attach(); + void thread_detach(); }; +void pool_of_threads_scheduler(scheduler_functions* func); + +#else + #define HAVE_POOL_OF_THREADS 0 /* For easyer tests */ #define pool_of_threads_scheduler(A) one_thread_per_connection_scheduler(A) class thd_scheduler {}; + +#endif diff --git a/sql/set_var.cc b/sql/set_var.cc index 3544d9546f9..7ca6fe4d979 100644 --- a/sql/set_var.cc +++ b/sql/set_var.cc @@ -265,6 +265,13 @@ static sys_var_long_ptr sys_delayed_queue_size(&vars, "delayed_queue_size", static sys_var_event_scheduler sys_event_scheduler(&vars, "event_scheduler"); #endif +static sys_var_const sys_extra_port(&vars, "extra_port", + OPT_GLOBAL, SHOW_INT, + (uchar*) &mysqld_extra_port); +static sys_var_long_ptr sys_extra_max_connections(&vars, + "extra_max_connections", + &extra_max_connections, + fix_max_connections); static sys_var_long_ptr sys_expire_logs_days(&vars, "expire_logs_days", &expire_logs_days); static sys_var_bool_ptr sys_flush(&vars, "flush", &myisam_flush); @@ -1357,7 +1364,7 @@ static int check_max_delayed_threads(THD *thd, set_var *var) static void fix_max_connections(THD *thd, enum_var_type type) { #ifndef EMBEDDED_LIBRARY - resize_thr_alarm(max_connections + + resize_thr_alarm(max_connections + extra_max_connections + global_system_variables.max_insert_delayed_threads + 10); #endif } diff --git a/sql/sql_class.cc b/sql/sql_class.cc index af62961c65f..d2981b7b212 100644 --- a/sql/sql_class.cc +++ b/sql/sql_class.cc @@ -572,6 +572,8 @@ THD::THD() init_sql_alloc(&main_mem_root, ALLOC_ROOT_MIN_BLOCK_SIZE, 0); stmt_arena= this; thread_stack= 0; + scheduler= &thread_scheduler; // Will be fixed later + extra_port= 0; catalog= (char*)"std"; // the only catalog we have for now main_security_ctx.init(); security_ctx= &main_security_ctx; diff --git a/sql/sql_class.h b/sql/sql_class.h index c491f2d3348..d24f65046d3 100644 --- a/sql/sql_class.h +++ b/sql/sql_class.h @@ -1260,6 +1260,7 @@ public: struct st_mysql_stmt *current_stmt; #endif NET net; // client connection descriptor + scheduler_functions *scheduler; // Scheduler for this connection MEM_ROOT warn_root; // For warnings and errors Protocol *protocol; // Current protocol Protocol_text protocol_text; // Normal protocol @@ -1346,7 +1347,7 @@ public: uint32 server_id; uint32 file_id; // for LOAD DATA INFILE /* remote (peer) port */ - uint16 peer_port; + uint16 peer_port; time_t start_time, user_time; ulonglong connect_utime, thr_create_utime; // track down slow pthread_create ulonglong start_utime, utime_after_lock; @@ -1722,6 +1723,8 @@ public: bool locked, some_tables_deleted; bool last_cuted_field; bool no_errors, password; + bool extra_port; /* If extra connection */ + /** Set to TRUE if execution of the current compound statement can not continue. In particular, disables activation of @@ -2206,7 +2209,7 @@ public: *p_db_length= db_length; return FALSE; } - thd_scheduler scheduler; + thd_scheduler event_scheduler; public: /** diff --git a/sql/sql_connect.cc b/sql/sql_connect.cc index 931ac5d1336..ef3cfaa4e11 100644 --- a/sql/sql_connect.cc +++ b/sql/sql_connect.cc @@ -402,11 +402,15 @@ check_user(THD *thd, enum enum_server_command command, if (check_count) { - pthread_mutex_lock(&LOCK_connection_count); - bool count_ok= connection_count <= max_connections || - (thd->main_security_ctx.master_access & SUPER_ACL); - VOID(pthread_mutex_unlock(&LOCK_connection_count)); - + bool count_ok= 1; + + if (!(thd->main_security_ctx.master_access & SUPER_ACL)) + { + pthread_mutex_lock(&LOCK_connection_count); + count_ok= (*thd->scheduler->connection_count <= + *thd->scheduler->max_connections); + VOID(pthread_mutex_unlock(&LOCK_connection_count)); + } if (!count_ok) { // too many connections my_error(ER_CON_COUNT_ERROR, MYF(0)); @@ -917,7 +921,7 @@ bool setup_connection_thread_globals(THD *thd) { close_connection(thd, ER_OUT_OF_RESOURCES, 1); statistic_increment(aborted_connects,&LOCK_status); - thread_scheduler.end_thread(thd, 0); + thd->scheduler->end_thread(thd, 0); return 1; // Error } return 0; @@ -939,8 +943,7 @@ bool setup_connection_thread_globals(THD *thd) 1 error */ - -static bool login_connection(THD *thd) +bool login_connection(THD *thd) { NET *net= &thd->net; int error; @@ -978,7 +981,7 @@ static bool login_connection(THD *thd) This mainly updates status variables */ -static void end_connection(THD *thd) +void end_connection(THD *thd) { NET *net= &thd->net; plugin_thdvar_cleanup(thd); @@ -1011,7 +1014,7 @@ static void end_connection(THD *thd) Initialize THD to handle queries */ -static void prepare_new_connection_state(THD* thd) +void prepare_new_connection_state(THD* thd) { Security_context *sctx= thd->security_ctx; @@ -1081,7 +1084,7 @@ pthread_handler_t handle_one_connection(void *arg) { close_connection(thd, ER_OUT_OF_RESOURCES, 1); statistic_increment(aborted_connects,&LOCK_status); - thread_scheduler.end_thread(thd,0); + thd->scheduler->end_thread(thd,0); return 0; } if (launch_time >= slow_launch_time*1000000L) @@ -1119,7 +1122,7 @@ pthread_handler_t handle_one_connection(void *arg) end_thread: close_connection(thd, 0, 1); - if (thread_scheduler.end_thread(thd,1)) + if (thd->scheduler->end_thread(thd,1)) return 0; // Probably no-threads /* |