diff options
Diffstat (limited to 'sql/sql_class.cc')
-rw-r--r-- | sql/sql_class.cc | 302 |
1 files changed, 252 insertions, 50 deletions
diff --git a/sql/sql_class.cc b/sql/sql_class.cc index 3a4c98410e1..2f15a3176f9 100644 --- a/sql/sql_class.cc +++ b/sql/sql_class.cc @@ -350,26 +350,6 @@ THD *thd_get_current_thd() } /** - Set up various THD data for a new connection - - thd_new_connection_setup - - @param thd THD object - @param stack_start Start of stack for connection -*/ -void thd_new_connection_setup(THD *thd, char *stack_start) -{ - thd->set_time(); - thd->prior_thr_create_utime= thd->thr_create_utime= thd->start_utime= - my_micro_time(); - threads.append(thd); - thd_unlock_thread_count(thd); - DBUG_PRINT("info", ("init new connection. thd: 0x%lx fd: %d", - (ulong)thd, thd->net.vio->sd)); - thd_set_thread_stack(thd, stack_start); -} - -/** Lock data that needs protection in THD object @param thd THD object @@ -510,13 +490,11 @@ int thd_tablespace_op(const THD *thd) extern "C" -const char *set_thd_proc_info(void *thd_arg, const char *info, +const char *set_thd_proc_info(THD *thd, const char *info, const char *calling_function, const char *calling_file, const unsigned int calling_line) { - THD *thd= (THD *) thd_arg; - if (!thd) thd= current_thd; @@ -746,7 +724,7 @@ THD::THD() :Statement(&main_lex, &main_mem_root, STMT_CONVENTIONAL_EXECUTION, /* statement id */ 0), rli_fake(0), - user_time(0), in_sub_stmt(0), + in_sub_stmt(0), binlog_unsafe_warning_flags(0), binlog_table_maps(0), table_map_for_update(0), @@ -758,6 +736,7 @@ THD::THD() examined_row_count(0), warning_info(&main_warning_info), stmt_da(&main_da), + global_disable_checkpoint(0), is_fatal_error(0), transaction_rollback_request(0), is_fatal_sub_stmt_error(0), @@ -791,7 +770,7 @@ THD::THD() security_ctx= &main_security_ctx; no_errors= 0; password= 0; - query_start_used= 0; + query_start_used= query_start_sec_part_used= 0; count_cuted_fields= CHECK_FIELD_IGNORE; killed= NOT_KILLED; col_access=0; @@ -806,9 +785,11 @@ THD::THD() statement_id_counter= 0UL; // Must be reset to handle error with THD's created for init of mysqld lex->current_select= 0; - start_time=(time_t) 0; + user_time.val= start_time= start_time_sec_part= 0; start_utime= prior_thr_create_utime= 0L; utime_after_lock= 0L; + progress.report_to_client= 0; + progress.max_counter= 0; current_linfo = 0; slave_thread = 0; bzero(&variables, sizeof(variables)); @@ -845,6 +826,8 @@ THD::THD() active_vio = 0; #endif mysql_mutex_init(key_LOCK_thd_data, &LOCK_thd_data, MY_MUTEX_INIT_FAST); + mysql_mutex_init(key_LOCK_wakeup_ready, &LOCK_wakeup_ready, MY_MUTEX_INIT_FAST); + mysql_cond_init(key_COND_wakeup_ready, &COND_wakeup_ready, 0); /* Variables with default values */ proc_info="login"; @@ -892,6 +875,9 @@ THD::THD() arena_for_cached_items= 0; memset(&invoker_user, 0, sizeof(invoker_user)); memset(&invoker_host, 0, sizeof(invoker_host)); + prepare_derived_at_open= FALSE; + create_tmp_table_for_derived= FALSE; + save_prep_leaf_list= FALSE; } @@ -1242,17 +1228,21 @@ void THD::update_stats(void) void THD::update_all_stats() { - time_t save_time; ulonglong end_cpu_time, end_utime; double busy_time, cpu_time; + /* Reset status variables used by information_schema.processlist */ + progress.max_counter= 0; + progress.max_stage= 0; + progress.report= 0; + /* This is set at start of query if opt_userstat_running was set */ if (!userstat_running) return; end_cpu_time= my_getcputime(); - end_utime= my_micro_time_and_time(&save_time); - busy_time= (end_utime - start_utime) / 1000000.0; + end_utime= microsecond_interval_timer(); + busy_time= (end_utime - start_utime) / 1000000.0; cpu_time= (end_cpu_time - start_cpu_time) / 10000000.0; /* In case there are bad values, 2629743 is the #seconds in a month. */ if (cpu_time > 2629743.0) @@ -1260,7 +1250,8 @@ void THD::update_all_stats() status_var_add(status_var.cpu_time, cpu_time); status_var_add(status_var.busy_time, busy_time); - update_global_user_stats(this, TRUE, save_time); + update_global_user_stats(this, TRUE, my_time(0)); + // Has to be updated after update_global_user_stats() userstat_running= 0; } @@ -1354,6 +1345,11 @@ void THD::cleanup(void) /* All metadata locks must have been released by now. */ DBUG_ASSERT(!mdl_context.has_locks()); + if (user_connect) + { + decrease_user_connections(user_connect); + user_connect= 0; // Safety + } wt_thd_destroy(&transaction.wt); #if defined(ENABLED_DEBUG_SYNC) @@ -1412,6 +1408,8 @@ THD::~THD() my_free(db); db= NULL; free_root(&transaction.mem_root,MYF(0)); + mysql_cond_destroy(&COND_wakeup_ready); + mysql_mutex_destroy(&LOCK_wakeup_ready); mysql_mutex_destroy(&LOCK_thd_data); #ifndef DBUG_OFF dbug_sentry= THD_SENTRY_GONE; @@ -1455,9 +1453,12 @@ void add_to_status(STATUS_VAR *to_var, STATUS_VAR *from_var) *(to++)+= *(from++); /* Handle the not ulong variables. See end of system_status_var */ - to_var->bytes_received= from_var->bytes_received; + to_var->bytes_received+= from_var->bytes_received; to_var->bytes_sent+= from_var->bytes_sent; - to_var->binlog_bytes_written= from_var->binlog_bytes_written; + to_var->rows_read+= from_var->rows_read; + to_var->rows_sent+= from_var->rows_sent; + to_var->rows_tmp_read+= from_var->rows_tmp_read; + to_var->binlog_bytes_written+= from_var->binlog_bytes_written; to_var->cpu_time+= from_var->cpu_time; to_var->busy_time+= from_var->busy_time; } @@ -1490,6 +1491,9 @@ void add_diff_to_status(STATUS_VAR *to_var, STATUS_VAR *from_var, to_var->bytes_received+= from_var->bytes_received - dec_var->bytes_received; to_var->bytes_sent+= from_var->bytes_sent - dec_var->bytes_sent; + to_var->rows_read+= from_var->rows_read - dec_var->rows_read; + to_var->rows_sent+= from_var->rows_sent - dec_var->rows_sent; + to_var->rows_tmp_read+= from_var->rows_tmp_read - dec_var->rows_tmp_read; to_var->binlog_bytes_written+= from_var->binlog_bytes_written - dec_var->binlog_bytes_written; to_var->cpu_time+= from_var->cpu_time - dec_var->cpu_time; @@ -1597,12 +1601,35 @@ void THD::awake(THD::killed_state state_to_set) enter_cond(). This should make the signaling as safe as possible. However, there is still a small chance of failure on platforms with instruction or memory write reordering. + + We have to do the loop with trylock, because if we would use + pthread_mutex_lock(), we can cause a deadlock as we are here locking + the mysys_var->mutex and mysys_var->current_mutex in a different order + than in the thread we are trying to kill. + We only sleep for 2 seconds as we don't want to have LOCK_thd_data + locked too long time. + + There is a small change we may not succeed in aborting a thread that + is not yet waiting for a mutex, but as this happens only for a + thread that was doing something else when the kill was issued and + which should detect the kill flag before it starts to wait, this + should be good enough. */ if (mysys_var->current_cond && mysys_var->current_mutex) { - mysql_mutex_lock(mysys_var->current_mutex); - mysql_cond_broadcast(mysys_var->current_cond); - mysql_mutex_unlock(mysys_var->current_mutex); + uint i; + for (i= 0; i < WAIT_FOR_KILL_TRY_TIMES * SECONDS_TO_WAIT_FOR_KILL; i++) + { + int ret= mysql_mutex_trylock(mysys_var->current_mutex); + mysql_cond_broadcast(mysys_var->current_cond); + if (!ret) + { + /* Signal is sure to get through */ + mysql_mutex_unlock(mysys_var->current_mutex); + break; + } + } + my_sleep(1000000L / WAIT_FOR_KILL_TRY_TIMES); } mysql_mutex_unlock(&mysys_var->mutex); } @@ -1724,6 +1751,9 @@ void THD::reset_globals() void THD::cleanup_after_query() { DBUG_ENTER("THD::cleanup_after_query"); + + thd_progress_end(this); + /* Reset rand_used so that detection of calls to rand() will save random seeds if needed by the slave. @@ -2071,6 +2101,36 @@ void THD::nocheck_register_item_tree_change(Item **place, Item *old_value, change_list.append(change); } +/** + Check and register item change if needed + + @param place place where we should assign new value + @param new_value place of the new value + + @details + Let C be a reference to an item that changed the reference A + at the location (occurrence) L1 and this change has been registered. + If C is substituted for reference A another location (occurrence) L2 + that is to be registered as well than this change has to be + consistent with the first change in order the procedure that rollback + changes to substitute the same reference at both locations L1 and L2. +*/ + +void THD::check_and_register_item_tree_change(Item **place, Item **new_value, + MEM_ROOT *runtime_memroot) +{ + Item_change_record *change; + I_List_iterator<Item_change_record> it(change_list); + while ((change= it++)) + { + if (change->place == new_value) + break; // we need only very first value + } + if (change) + nocheck_register_item_tree_change(place, change->old_value, + runtime_memroot); +} + void THD::rollback_item_tree_changes() { @@ -2179,7 +2239,7 @@ void select_send::cleanup() /* Send data to client. Returns 0 if ok */ -bool select_send::send_data(List<Item> &items) +int select_send::send_data(List<Item> &items) { Protocol *protocol= thd->protocol; DBUG_ENTER("select_send::send_data"); @@ -2466,7 +2526,7 @@ select_export::prepare(List<Item> &list, SELECT_LEX_UNIT *u) (int) (uchar) (x) == line_sep_char || \ !(x)) -bool select_export::send_data(List<Item> &items) +int select_export::send_data(List<Item> &items) { DBUG_ENTER("select_export::send_data"); @@ -2672,7 +2732,6 @@ bool select_export::send_data(List<Item> &items) { // Fill with space if (item->max_length > used_length) { - /* QQ: Fix by adding a my_b_fill() function */ if (!space_inited) { space_inited=1; @@ -2724,7 +2783,7 @@ select_dump::prepare(List<Item> &list __attribute__((unused)), } -bool select_dump::send_data(List<Item> &items) +int select_dump::send_data(List<Item> &items) { List_iterator_fast<Item> li(items); char buff[MAX_FIELD_WIDTH]; @@ -2769,7 +2828,7 @@ select_subselect::select_subselect(Item_subselect *item_arg) } -bool select_singlerow_subselect::send_data(List<Item> &items) +int select_singlerow_subselect::send_data(List<Item> &items) { DBUG_ENTER("select_singlerow_subselect::send_data"); Item_singlerow_subselect *it= (Item_singlerow_subselect *)item; @@ -2800,7 +2859,7 @@ void select_max_min_finder_subselect::cleanup() } -bool select_max_min_finder_subselect::send_data(List<Item> &items) +int select_max_min_finder_subselect::send_data(List<Item> &items) { DBUG_ENTER("select_max_min_finder_subselect::send_data"); Item_maxmin_subselect *it= (Item_maxmin_subselect *)item; @@ -2832,6 +2891,8 @@ bool select_max_min_finder_subselect::send_data(List<Item> &items) op= &select_max_min_finder_subselect::cmp_decimal; break; case ROW_RESULT: + case TIME_RESULT: + case IMPOSSIBLE_RESULT: // This case should never be choosen DBUG_ASSERT(0); op= 0; @@ -2903,7 +2964,7 @@ bool select_max_min_finder_subselect::cmp_str() sortcmp(val1, val2, cache->collation.collation) < 0); } -bool select_exists_subselect::send_data(List<Item> &items) +int select_exists_subselect::send_data(List<Item> &items) { DBUG_ENTER("select_exists_subselect::send_data"); Item_exists_subselect *it= (Item_exists_subselect *)item; @@ -2964,6 +3025,7 @@ void Query_arena::free_items() for (; free_list; free_list= next) { next= free_list->next; + DBUG_ASSERT(free_list != next); free_list->delete_self(); } /* Postcondition: free_list is 0 */ @@ -3244,7 +3306,7 @@ Statement_map::~Statement_map() my_hash_free(&st_hash); } -bool select_dumpvar::send_data(List<Item> &items) +int select_dumpvar::send_data(List<Item> &items) { List_iterator_fast<my_var> var_li(var_list); List_iterator<Item> it(items); @@ -3303,7 +3365,8 @@ bool select_materialize_with_stats:: create_result_table(THD *thd_arg, List<Item> *column_types, bool is_union_distinct, ulonglong options, - const char *table_alias, bool bit_fields_as_long) + const char *table_alias, bool bit_fields_as_long, + bool create_table) { DBUG_ASSERT(table == 0); tmp_table_param.field_count= column_types->elements; @@ -3349,18 +3412,26 @@ void select_materialize_with_stats::cleanup() @return FALSE on success */ -bool select_materialize_with_stats::send_data(List<Item> &items) +int select_materialize_with_stats::send_data(List<Item> &items) { List_iterator_fast<Item> item_it(items); Item *cur_item; Column_statistics *cur_col_stat= col_stat; uint nulls_in_row= 0; + int res; + + if ((res= select_union::send_data(items))) + return res; + /* Skip duplicate rows. */ + if (write_err == HA_ERR_FOUND_DUPP_KEY || + write_err == HA_ERR_FOUND_DUPP_UNIQUE) + return 0; ++count_rows; while ((cur_item= item_it++)) { - if (cur_item->is_null()) + if (cur_item->is_null_result()) { ++cur_col_stat->null_count; cur_col_stat->max_null_row= count_rows; @@ -3373,7 +3444,7 @@ bool select_materialize_with_stats::send_data(List<Item> &items) if (nulls_in_row > max_nulls_in_row) max_nulls_in_row= nulls_in_row; - return select_union::send_data(items); + return 0; } @@ -3391,6 +3462,7 @@ void TMP_TABLE_PARAM::init() table_charset= 0; precomputed_group_by= 0; bit_fields_as_long= 0; + materialized_subquery= 0; skip_create_table= 0; DBUG_VOID_RETURN; } @@ -3623,11 +3695,123 @@ void THD::restore_backup_open_tables_state(Open_tables_backup *backup) @retval 0 the user thread is active @retval 1 the user thread has been killed */ + extern "C" int thd_killed(const MYSQL_THD thd) { return(thd->killed); } + +/** + Send an out-of-band progress report to the client + + The report is sent every 'thd->...progress_report_time' second, + however not more often than global.progress_report_time. + If global.progress_report_time is 0, then don't send progress reports, but + check every second if the value has changed +*/ + +static void thd_send_progress(THD *thd) +{ + /* Check if we should send the client a progress report */ + ulonglong report_time= my_interval_timer(); + if (report_time > thd->progress.next_report_time) + { + uint seconds_to_next= max(thd->variables.progress_report_time, + global_system_variables.progress_report_time); + if (seconds_to_next == 0) // Turned off + seconds_to_next= 1; // Check again after 1 second + + thd->progress.next_report_time= (report_time + + seconds_to_next * 1000000000ULL); + if (global_system_variables.progress_report_time && + thd->variables.progress_report_time) + net_send_progress_packet(thd); + } +} + + +/** Initialize progress report handling **/ + +extern "C" void thd_progress_init(MYSQL_THD thd, uint max_stage) +{ + /* + Send progress reports to clients that supports it, if the command + is a high level command (like ALTER TABLE) and we are not in a + stored procedure + */ + thd->progress.report= ((thd->client_capabilities & CLIENT_PROGRESS) && + thd->progress.report_to_client && + !thd->in_sub_stmt); + thd->progress.next_report_time= 0; + thd->progress.stage= 0; + thd->progress.counter= thd->progress.max_counter= 0; + thd->progress.max_stage= max_stage; +} + + +/* Inform processlist and the client that some progress has been made */ + +extern "C" void thd_progress_report(MYSQL_THD thd, + ulonglong progress, ulonglong max_progress) +{ + if (thd->progress.max_counter != max_progress) // Simple optimization + { + mysql_mutex_lock(&thd->LOCK_thd_data); + thd->progress.counter= progress; + thd->progress.max_counter= max_progress; + mysql_mutex_unlock(&thd->LOCK_thd_data); + } + else + thd->progress.counter= progress; + + if (thd->progress.report) + thd_send_progress(thd); +} + +/** + Move to next stage in process list handling + + This will reset the timer to ensure the progress is sent to the client + if client progress reports are activated. +*/ + +extern "C" void thd_progress_next_stage(MYSQL_THD thd) +{ + mysql_mutex_lock(&thd->LOCK_thd_data); + thd->progress.stage++; + thd->progress.counter= 0; + DBUG_ASSERT(thd->progress.stage < thd->progress.max_stage); + mysql_mutex_unlock(&thd->LOCK_thd_data); + if (thd->progress.report) + { + thd->progress.next_report_time= 0; // Send new stage info + thd_send_progress(thd); + } +} + +/** + Disable reporting of progress in process list. + + @note + This function is safe to call even if one has not called thd_progress_init. + + This function should be called by all parts that does progress + reporting to ensure that progress list doesn't contain 100 % done + forever. +*/ + + +extern "C" void thd_progress_end(MYSQL_THD thd) +{ + /* + It's enough to reset max_counter to set disable progress indicator + in processlist. + */ + thd->progress.max_counter= 0; +} + + /** Return the thread id of a user thread @param thd user thread @@ -4990,8 +5174,8 @@ int THD::binlog_query(THD::enum_binlog_query_type qtype, char const *query_arg, bool suppress_use, int errcode) { DBUG_ENTER("THD::binlog_query"); - DBUG_PRINT("enter", ("qtype: %s query: '%s'", - show_query_type(qtype), query_arg)); + DBUG_PRINT("enter", ("qtype: %s query: '%-.*s'", + show_query_type(qtype), (int) query_len, query_arg)); DBUG_ASSERT(query_arg && mysql_bin_log.is_open()); /* @@ -5034,7 +5218,6 @@ int THD::binlog_query(THD::enum_binlog_query_type qtype, char const *query_arg, spcont == NULL && !binlog_evt_union.do_union) issue_unsafe_warnings(); - switch (qtype) { /* ROW_QUERY_TYPE means that the statement may be logged either in @@ -5087,6 +5270,25 @@ int THD::binlog_query(THD::enum_binlog_query_type qtype, char const *query_arg, DBUG_RETURN(0); } +void +THD::wait_for_wakeup_ready() +{ + mysql_mutex_lock(&LOCK_wakeup_ready); + while (!wakeup_ready) + mysql_cond_wait(&COND_wakeup_ready, &LOCK_wakeup_ready); + mysql_mutex_unlock(&LOCK_wakeup_ready); +} + +void +THD::signal_wakeup_ready() +{ + mysql_mutex_lock(&LOCK_wakeup_ready); + wakeup_ready= true; + mysql_mutex_unlock(&LOCK_wakeup_ready); + mysql_cond_signal(&COND_wakeup_ready); +} + + bool Discrete_intervals_list::append(ulonglong start, ulonglong val, ulonglong incr) { |