diff options
Diffstat (limited to 'sql/mysqld.cc')
-rw-r--r-- | sql/mysqld.cc | 325 |
1 files changed, 218 insertions, 107 deletions
diff --git a/sql/mysqld.cc b/sql/mysqld.cc index 172891b7c8e..a6525c1c78c 100644 --- a/sql/mysqld.cc +++ b/sql/mysqld.cc @@ -59,10 +59,6 @@ #define mysqld_charset &my_charset_latin1 -#ifndef DBUG_OFF -#define ONE_THREAD -#endif - #ifdef HAVE_purify #define IF_PURIFY(A,B) (A) #else @@ -282,6 +278,16 @@ static TYPELIB tc_heuristic_recover_typelib= array_elements(tc_heuristic_recover_names)-1,"", tc_heuristic_recover_names, NULL }; + +static const char *thread_handling_names[]= +{ "one-thread-per-connection", "no-threads", "pool-of-threads", NullS}; + +TYPELIB thread_handling_typelib= +{ + array_elements(thread_handling_names) - 1, "", + thread_handling_names, NULL +}; + const char *first_keyword= "first", *binary_keyword= "BINARY"; const char *my_localhost= "localhost", *delayed_user= "DELAYED"; #if SIZEOF_OFF_T > 4 && defined(BIG_TABLES) @@ -467,7 +473,8 @@ ulong thread_stack, what_to_log; ulong query_buff_size, slow_launch_time, slave_open_temp_tables; ulong open_files_limit, max_binlog_size, max_relay_log_size; ulong slave_net_timeout, slave_trans_retries; -ulong thread_cache_size=0, binlog_cache_size=0, max_binlog_cache_size=0; +ulong thread_cache_size=0, thread_pool_size= 0; +ulong binlog_cache_size=0, max_binlog_cache_size=0; ulong query_cache_size=0; ulong refresh_version, flush_version; /* Increments on each reload */ query_id_t global_query_id; @@ -681,6 +688,8 @@ my_bool opt_enable_shared_memory; HANDLE smem_event_connect_request= 0; #endif +scheduler_functions thread_scheduler; + #define SSL_VARS_NOT_STATIC #include "sslopt-vars.h" #ifdef HAVE_OPENSSL @@ -861,6 +870,7 @@ static void close_connections(void) continue; tmp->killed= THD::KILL_CONNECTION; + thread_scheduler.post_kill_notification(tmp); if (tmp->mysys_var) { tmp->mysys_var->abort=1; @@ -1254,6 +1264,7 @@ void clean_up(bool print_message) if (!opt_bootstrap) (void) my_delete(pidfile_name,MYF(0)); // This may not always exist #endif + thread_scheduler.end(); finish_client_errs(); my_free((gptr) my_error_unregister(ER_ERROR_FIRST, ER_ERROR_LAST), MYF(MY_WME | MY_FAE | MY_ALLOW_ZERO_PTR)); @@ -1513,6 +1524,9 @@ static void network_init(void) DBUG_ENTER("network_init"); LINT_INIT(ret); + if (thread_scheduler.init()) + unireg_abort(1); /* purecov: inspected */ + set_ports(); if (mysqld_port != 0 && !opt_disable_networking && !opt_bootstrap) @@ -1731,21 +1745,55 @@ extern "C" sig_handler end_thread_signal(int sig __attribute__((unused))) if (thd && ! thd->bootstrap) { statistic_increment(killed_threads, &LOCK_status); - end_thread(thd,0); + thread_scheduler.end_thread(thd,0); /* purecov: inspected */ } DBUG_VOID_RETURN; /* purecov: deadcode */ } -void end_thread(THD *thd, bool put_in_cache) +/* + Unlink thd from global list of available connections and free thd + + SYNOPSIS + unlink_thd() + thd Thread handler + + NOTES + LOCK_thread_count is locked and left locked +*/ + +void unlink_thd(THD *thd) { - DBUG_ENTER("end_thread"); + DBUG_ENTER("unlink_thd"); + DBUG_PRINT("enter", ("thd: 0x%lx", (long) thd)); thd->cleanup(); (void) pthread_mutex_lock(&LOCK_thread_count); thread_count--; delete thd; + DBUG_VOID_RETURN; +} + - if (put_in_cache && cached_thread_count < thread_cache_size && +/* + Store thread in cache for reuse by new connections + + SYNOPSIS + cache_thread() + + NOTES + LOCK_thread_count has to be locked + + RETURN + 0 Thread was not put in cache + 1 Thread is to be reused by new connection. + (ie, caller should return, not abort with pthread_exit()) +*/ + + +static bool cache_thread() +{ + safe_mutex_assert_owner(&LOCK_thread_count); + if (cached_thread_count < thread_cache_size && ! abort_loop && !kill_cached_threads) { /* Don't kill the thread, just put it in cache for reuse */ @@ -1758,31 +1806,56 @@ void end_thread(THD *thd, bool put_in_cache) pthread_cond_signal(&COND_flush_thread_cache); if (wake_thread) { + THD *thd; wake_thread--; - thd=thread_cache.get(); - thd->real_id=pthread_self(); + thd= thread_cache.get(); thd->thread_stack= (char*) &thd; // For store_globals (void) thd->store_globals(); thd->thr_create_time= time(NULL); threads.append(thd); - pthread_mutex_unlock(&LOCK_thread_count); - DBUG_VOID_RETURN; + return(1); } } + return(0); +} + + +/* + End thread for the current connection + + SYNOPSIS + one_thread_per_connection_end() + thd Thread handler + put_in_cache Store thread in cache, if there is room in it + Normally this is true in all cases except when we got + out of resources initializing the current thread + + NOTES + If thread is cached, we will wait until thread is scheduled to be + reused and then we will return. + If thread is not cached, we end the thread. + + RETURN + 0 Signal to handle_one_connection to reuse connection +*/ + +bool one_thread_per_connection_end(THD *thd, bool put_in_cache) +{ + DBUG_ENTER("one_thread_per_connection_end"); + unlink_thd(thd); + if (put_in_cache) + put_in_cache= cache_thread(); + pthread_mutex_unlock(&LOCK_thread_count); + if (put_in_cache) + DBUG_RETURN(0); // Thread is reused - /* Tell main we are ready */ - (void) pthread_mutex_unlock(&LOCK_thread_count); /* It's safe to broadcast outside a lock (COND... is not deleted here) */ DBUG_PRINT("signal", ("Broadcasting COND_thread_count")); (void) pthread_cond_broadcast(&COND_thread_count); -#ifdef ONE_THREAD - if (!(test_flags & TEST_NO_THREADS)) // For debugging under Linux -#endif - { - my_thread_end(); - pthread_exit(0); - } - DBUG_VOID_RETURN; + + my_thread_end(); + pthread_exit(0); + DBUG_RETURN(0); // Impossible } @@ -2123,14 +2196,15 @@ and this may fail.\n\n"); (ulong) dflt_key_cache->key_cache_mem_size); fprintf(stderr, "read_buffer_size=%ld\n", (long) global_system_variables.read_buff_size); fprintf(stderr, "max_used_connections=%lu\n", max_used_connections); - fprintf(stderr, "max_connections=%lu\n", max_connections); + fprintf(stderr, "max_threads=%u\n", thread_scheduler.max_threads); fprintf(stderr, "threads_connected=%u\n", thread_count); fprintf(stderr, "It is possible that mysqld could use up to \n\ -key_buffer_size + (read_buffer_size + sort_buffer_size)*max_connections = %lu K\n\ +key_buffer_size + (read_buffer_size + sort_buffer_size)*max_threads = %lu K\n\ bytes of memory\n", ((ulong) dflt_key_cache->key_cache_mem_size + (global_system_variables.read_buff_size + global_system_variables.sortbuff_size) * - max_connections)/ 1024); + thread_scheduler.max_threads + + max_connections * sizeof(THD)) / 1024); fprintf(stderr, "Hope that's ok; if not, decrease some variables in the equation.\n\n"); #if defined(HAVE_LINUXTHREADS) @@ -2331,7 +2405,7 @@ pthread_handler_t signal_hand(void *arg __attribute__((unused))) This should actually be '+ max_number_of_slaves' instead of +10, but the +10 should be quite safe. */ - init_thr_alarm(max_connections + + init_thr_alarm(thread_scheduler.max_threads + global_system_variables.max_insert_delayed_threads + 10); #if SIGINT != THR_KILL_SIGNAL if (test_flags & TEST_SIGINT) @@ -2535,18 +2609,6 @@ static void my_str_free_mysqld(void *ptr) #ifdef __WIN__ -struct utsname -{ - char nodename[FN_REFLEN]; -}; - - -int uname(struct utsname *a) -{ - return -1; -} - - pthread_handler_t handle_shutdown(void *arg) { MSG msg; @@ -3357,7 +3419,7 @@ server."); #ifdef HAVE_REPLICATION if (opt_bin_log && expire_logs_days) { - long purge_time= time(0) - expire_logs_days*24*60*60; + long purge_time= (long) (time(0) - expire_logs_days*24*60*60); if (purge_time >= 0) mysql_bin_log.purge_logs_before_date(purge_time); } @@ -4009,7 +4071,7 @@ static void bootstrap(FILE *file) my_net_init(&thd->net,(st_vio*) 0); thd->max_client_packet_length= thd->net.max_packet; thd->security_ctx->master_access= ~(ulong)0; - thd->thread_id=thread_id++; + thd->thread_id= thd->variables.pseudo_thread_id= thread_id++; thread_count++; bootstrap_file=file; @@ -4052,6 +4114,74 @@ static bool read_init_file(char *file_name) #ifndef EMBEDDED_LIBRARY + +/* + Simple scheduler that use the main thread to handle the request + + NOTES + This is only used for debugging, when starting mysqld with + --thread-handling=no-threads or --one-thread + + When we enter this function, LOCK_thread_count is hold! +*/ + +void handle_connection_in_main_thread(THD *thd) +{ + safe_mutex_assert_owner(&LOCK_thread_count); + thread_cache_size=0; // Safety + threads.append(thd); + (void) pthread_mutex_unlock(&LOCK_thread_count); + handle_one_connection((void*) thd); +} + + +/* + Scheduler that uses one thread per connection +*/ + +void create_thread_to_handle_connection(THD *thd) +{ + if (cached_thread_count > wake_thread) + { + /* Get thread from cache */ + thread_cache.append(thd); + wake_thread++; + pthread_cond_signal(&COND_thread_cache); + } + else + { + /* Create new thread to handle connection */ + int error; + thread_created++; + threads.append(thd); + DBUG_PRINT("info",(("creating thread %lu"), thd->thread_id)); + thd->connect_time = time(NULL); + if ((error=pthread_create(&thd->real_id,&connection_attrib, + handle_one_connection, + (void*) thd))) + { + /* purify: begin inspected */ + DBUG_PRINT("error", + ("Can't create thread to handle request (error %d)", + error)); + thread_count--; + thd->killed= THD::KILL_CONNECTION; // Safety + (void) pthread_mutex_unlock(&LOCK_thread_count); + statistic_increment(aborted_connects,&LOCK_status); + net_printf_error(thd, ER_CANT_CREATE_THREAD, error); + (void) pthread_mutex_lock(&LOCK_thread_count); + close_connection(thd,0,0); + delete thd; + (void) pthread_mutex_unlock(&LOCK_thread_count); + return; + /* purecov: end */ + } + } + (void) pthread_mutex_unlock(&LOCK_thread_count); + DBUG_PRINT("info",("Thread created")); +} + + /* Create new thread to handle incoming connection. @@ -4088,64 +4218,15 @@ static void create_new_thread(THD *thd) DBUG_VOID_RETURN; } pthread_mutex_lock(&LOCK_thread_count); - thd->thread_id=thread_id++; - - thd->real_id=pthread_self(); // Keep purify happy + thd->thread_id= thd->variables.pseudo_thread_id= thread_id++; /* Start a new thread to handle connection */ thread_count++; -#ifdef ONE_THREAD - if (test_flags & TEST_NO_THREADS) // For debugging under Linux - { - thread_cache_size=0; // Safety - threads.append(thd); - thd->real_id=pthread_self(); - (void) pthread_mutex_unlock(&LOCK_thread_count); - handle_one_connection((void*) thd); - } - else -#endif - { - if (thread_count-delayed_insert_threads > max_used_connections) - max_used_connections=thread_count-delayed_insert_threads; + if (thread_count - delayed_insert_threads > max_used_connections) + max_used_connections= thread_count - delayed_insert_threads; - if (cached_thread_count > wake_thread) - { - thread_cache.append(thd); - wake_thread++; - pthread_cond_signal(&COND_thread_cache); - } - else - { - int error; - thread_created++; - threads.append(thd); - DBUG_PRINT("info",(("creating thread %lu"), thd->thread_id)); - thd->connect_time = time(NULL); - if ((error=pthread_create(&thd->real_id,&connection_attrib, - handle_one_connection, - (void*) thd))) - { - DBUG_PRINT("error", - ("Can't create thread to handle request (error %d)", - error)); - thread_count--; - thd->killed= THD::KILL_CONNECTION; // Safety - (void) pthread_mutex_unlock(&LOCK_thread_count); - statistic_increment(aborted_connects,&LOCK_status); - net_printf_error(thd, ER_CANT_CREATE_THREAD, error); - (void) pthread_mutex_lock(&LOCK_thread_count); - close_connection(thd,0,0); - delete thd; - (void) pthread_mutex_unlock(&LOCK_thread_count); - DBUG_VOID_RETURN; - } - } - (void) pthread_mutex_unlock(&LOCK_thread_count); - - } - DBUG_PRINT("info",("Thread created")); + thread_scheduler.add_connection(thd); DBUG_VOID_RETURN; } #endif /* EMBEDDED_LIBRARY */ @@ -4895,6 +4976,7 @@ enum options_mysqld OPT_GENERAL_LOG, OPT_SLOW_LOG, OPT_MERGE, + OPT_THREAD_HANDLING, OPT_INNODB_ROLLBACK_ON_TIMEOUT }; @@ -5502,11 +5584,9 @@ Disable with --skip-ndbcluster (will save memory).", (gptr*) &global_system_variables.old_passwords, (gptr*) &max_system_variables.old_passwords, 0, GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0}, -#ifdef ONE_THREAD {"one-thread", OPT_ONE_THREAD, - "Only use one thread (for debugging under Linux).", 0, 0, 0, GET_NO_ARG, - NO_ARG, 0, 0, 0, 0, 0, 0}, -#endif + "(deprecated): Only use one thread (for debugging under Linux). Use thread-handling=no-threads instead", + 0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0}, {"old-style-user-limits", OPT_OLD_STYLE_USER_LIMITS, "Enable old-style user limits (before 5.0.3 user resources were counted per each user+host vs. per account)", (gptr*) &opt_old_style_user_limits, (gptr*) &opt_old_style_user_limits, @@ -5985,7 +6065,7 @@ The minimum value for this variable is 4096.", // children, to avoid "too many connections" error in a common setup {"max_connections", OPT_MAX_CONNECTIONS, "The number of simultaneous clients allowed.", (gptr*) &max_connections, - (gptr*) &max_connections, 0, GET_ULONG, REQUIRED_ARG, 151, 1, 16384, 0, 1, + (gptr*) &max_connections, 0, GET_ULONG, REQUIRED_ARG, 151, 1, 100000, 0, 1, 0}, {"max_delayed_threads", OPT_MAX_DELAYED_THREADS, "Don't start more than this number of threads to handle INSERT DELAYED statements. If set to zero, which means INSERT DELAYED is not used.", @@ -6276,6 +6356,12 @@ The minimum value for this variable is 4096.", "Permits the application to give the threads system a hint for the desired number of threads that should be run at the same time.", (gptr*) &concurrency, (gptr*) &concurrency, 0, GET_ULONG, REQUIRED_ARG, DEFAULT_CONCURRENCY, 1, 512, 0, 1, 0}, +#if HAVE_POOL_OF_THREADS == 1 + {"thread_pool_size", OPT_THREAD_CACHE_SIZE, + "How many threads we should create to handle query requests in case of 'thread_handling=pool-of-threads'", + (gptr*) &thread_pool_size, (gptr*) &thread_pool_size, 0, GET_ULONG, + REQUIRED_ARG, 20, 1, 16384, 0, 1, 0}, +#endif {"thread_stack", OPT_THREAD_STACK, "The stack size for each thread.", (gptr*) &thread_stack, (gptr*) &thread_stack, 0, GET_ULONG, REQUIRED_ARG,DEFAULT_THREAD_STACK, @@ -6300,6 +6386,10 @@ The minimum value for this variable is 4096.", (gptr*) &global_system_variables.trans_prealloc_size, (gptr*) &max_system_variables.trans_prealloc_size, 0, GET_ULONG, REQUIRED_ARG, TRANS_ALLOC_PREALLOC_SIZE, 1024, ~0L, 0, 1024, 0}, + {"thread_handling", OPT_THREAD_HANDLING, + "Define threads usage for handling queries: " + "one-thread-per-connection or no-threads", 0, 0, + 0, GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0}, {"updatable_views_with_limit", OPT_UPDATABLE_VIEWS_WITH_LIMIT, "1 = YES = Don't issue an error message (warning only) if a VIEW without presence of a key of the underlying table is used in queries with a LIMIT clause for updating. 0 = NO = Prohibit update of a VIEW, which does not contain a key of the underlying table and the query uses a LIMIT clause (usually get from GUI tools).", (gptr*) &global_system_variables.updatable_views_with_limit, @@ -7241,7 +7331,6 @@ get_one_option(int optid, const struct my_option *opt __attribute__((unused)), break; case 'T': test_flags= argument ? (uint) atoi(argument) : 0; - test_flags&= ~TEST_NO_THREADS; opt_endinfo=1; break; case (int) OPT_BIG_TABLES: @@ -7473,11 +7562,6 @@ get_one_option(int optid, const struct my_option *opt __attribute__((unused)), opt_skip_show_db=1; opt_specialflag|=SPECIAL_SKIP_SHOW_DB; break; -#ifdef ONE_THREAD - case (int) OPT_ONE_THREAD: - test_flags |= TEST_NO_THREADS; -#endif - break; case (int) OPT_WANT_CORE: test_flags |= TEST_CORE_ON_SIGNAL; break; @@ -7723,6 +7807,23 @@ get_one_option(int optid, const struct my_option *opt __attribute__((unused)), sql_mode); break; } + case OPT_ONE_THREAD: + global_system_variables.thread_handling= 2; + break; + case OPT_THREAD_HANDLING: + { + if ((global_system_variables.thread_handling= + find_type(argument, &thread_handling_typelib, 2)) <= 0 || + (global_system_variables.thread_handling == SCHEDULER_POOL_OF_THREADS + && !HAVE_POOL_OF_THREADS)) + { + /* purecov: begin tested */ + fprintf(stderr,"Unknown/unsupported thread-handling: %s\n",argument); + exit(1); + /* purecov: end */ + } + break; + } case OPT_FT_BOOLEAN_SYNTAX: if (ft_boolean_check_syntax_string((byte*) argument)) { @@ -7843,6 +7944,7 @@ static void get_options(int argc,char **argv) if (mysqld_chroot) set_root(mysqld_chroot); #else + global_system_variables.thread_handling = SCHEDULER_NO_THREADS; max_allowed_packet= global_system_variables.max_allowed_packet; net_buffer_length= global_system_variables.net_buffer_length; #endif @@ -7873,6 +7975,17 @@ static void get_options(int argc,char **argv) &global_system_variables.datetime_format)) exit(1); +#ifdef EMBEDDED_LIBRARY + one_thread_scheduler(&thread_scheduler); +#else + if (global_system_variables.thread_handling <= + SCHEDULER_ONE_THREAD_PER_CONNECTION) + one_thread_per_connection_scheduler(&thread_scheduler); + else if (global_system_variables.thread_handling == SCHEDULER_NO_THREADS) + one_thread_scheduler(&thread_scheduler); + else + pool_of_threads_scheduler(&thread_scheduler); /* purecov: tested */ +#endif } @@ -8213,5 +8326,3 @@ template class I_List<NAMED_LIST>; template class I_List<Statement>; template class I_List_iterator<Statement>; #endif - - |