diff options
Diffstat (limited to 'sql/sql_class.cc')
-rw-r--r-- | sql/sql_class.cc | 1838 |
1 files changed, 1392 insertions, 446 deletions
diff --git a/sql/sql_class.cc b/sql/sql_class.cc index a665cd6522e..1ec2d8b3c9e 100644 --- a/sql/sql_class.cc +++ b/sql/sql_class.cc @@ -28,13 +28,13 @@ #pragma implementation // gcc: Class implementation #endif -#include "my_global.h" /* NO_EMBEDDED_ACCESS_CHECKS */ +#include <my_global.h> /* NO_EMBEDDED_ACCESS_CHECKS */ #include "sql_priv.h" -#include "unireg.h" // REQUIRED: for other includes #include "sql_class.h" #include "sql_cache.h" // query_cache_abort #include "sql_base.h" // close_thread_tables #include "sql_time.h" // date_time_format_copy +#include "tztime.h" // MYSQL_TIME <-> my_time_t #include "sql_acl.h" // NO_ACCESS, // acl_getroot_no_password #include "sql_base.h" // close_temporary_tables @@ -62,6 +62,7 @@ #include "debug_sync.h" #include "sql_parse.h" // is_update_query #include "sql_callback.h" +#include "lock.h" #include "sql_connect.h" #include "repl_failsafe.h" @@ -74,23 +75,6 @@ char empty_c_string[1]= {0}; /* used for not defined db */ const char * const THD::DEFAULT_WHERE= "field list"; - -/***************************************************************************** -** Instansiate templates -*****************************************************************************/ - -#ifdef HAVE_EXPLICIT_TEMPLATE_INSTANTIATION -/* Used templates */ -template class List<Key>; -template class List_iterator<Key>; -template class List<Key_part_spec>; -template class List_iterator<Key_part_spec>; -template class List<Alter_drop>; -template class List_iterator<Alter_drop>; -template class List<Alter_column>; -template class List_iterator<Alter_column>; -#endif - /**************************************************************************** ** User variables ****************************************************************************/ @@ -130,7 +114,8 @@ Key::Key(const Key &rhs, MEM_ROOT *mem_root) columns(rhs.columns, mem_root), name(rhs.name), option_list(rhs.option_list), - generated(rhs.generated) + generated(rhs.generated), + 
create_if_not_exists(rhs.create_if_not_exists) { list_copy_and_replace_each_value(columns, mem_root); } @@ -144,6 +129,7 @@ Key::Key(const Key &rhs, MEM_ROOT *mem_root) Foreign_key::Foreign_key(const Foreign_key &rhs, MEM_ROOT *mem_root) :Key(rhs,mem_root), + ref_db(rhs.ref_db), ref_table(rhs.ref_table), ref_columns(rhs.ref_columns,mem_root), delete_opt(rhs.delete_opt), @@ -245,7 +231,7 @@ bool Foreign_key::validate(List<Create_field> &table_fields) sql_field->field_name)) {} if (!sql_field) { - my_error(ER_KEY_COLUMN_DOES_NOT_EXITS, MYF(0), column->field_name); + my_error(ER_KEY_COLUMN_DOES_NOT_EXITS, MYF(0), column->field_name.str); DBUG_RETURN(TRUE); } if (type == Key::FOREIGN_KEY && sql_field->vcol_info) @@ -368,29 +354,6 @@ void thd_set_thread_stack(THD *thd, char *stack_start) } /** - Lock connection data for the set of connections this connection - belongs to - - @param thd THD object -*/ -void thd_lock_thread_count(THD *) -{ - mysql_mutex_lock(&LOCK_thread_count); -} - -/** - Lock connection data for the set of connections this connection - belongs to - - @param thd THD object -*/ -void thd_unlock_thread_count(THD *) -{ - mysql_cond_broadcast(&COND_thread_count); - mysql_mutex_unlock(&LOCK_thread_count); -} - -/** Close the socket used by this connection @param thd THD object @@ -496,7 +459,7 @@ void thd_set_mysys_var(THD *thd, st_my_thread_var *mysys_var) */ my_socket thd_get_fd(THD *thd) { - return thd->net.vio->sd; + return mysql_socket_getfd(thd->net.vio->mysql_socket); } #endif @@ -552,54 +515,103 @@ extern "C" int mysql_tmpfile(const char *prefix) extern "C" int thd_in_lock_tables(const THD *thd) { - return test(thd->in_lock_tables); + return MY_TEST(thd->in_lock_tables); } extern "C" int thd_tablespace_op(const THD *thd) { - return test(thd->tablespace_op); + return MY_TEST(thd->tablespace_op); } - extern "C" -const char *set_thd_proc_info(THD *thd, const char *info, +const char *set_thd_proc_info(THD *thd_arg, const char *info, const char 
*calling_function, const char *calling_file, const unsigned int calling_line) { - if (!thd) + PSI_stage_info old_stage; + PSI_stage_info new_stage; + + old_stage.m_key= 0; + old_stage.m_name= info; + + set_thd_stage_info(thd_arg, & old_stage, & new_stage, + calling_function, calling_file, calling_line); + + return new_stage.m_name; +} + +extern "C" +void set_thd_stage_info(void *thd_arg, + const PSI_stage_info *new_stage, + PSI_stage_info *old_stage, + const char *calling_func, + const char *calling_file, + const unsigned int calling_line) +{ + THD *thd= (THD*) thd_arg; + if (thd == NULL) thd= current_thd; - const char *old_info= thd->proc_info; - DBUG_PRINT("proc_info", ("%s:%d %s", calling_file, calling_line, info)); + thd->enter_stage(new_stage, old_stage, calling_func, calling_file, + calling_line); +} + +void THD::enter_stage(const PSI_stage_info *new_stage, + PSI_stage_info *old_stage, + const char *calling_func, + const char *calling_file, + const unsigned int calling_line) +{ + DBUG_PRINT("THD::enter_stage", ("%s:%d", calling_file, calling_line)); + + if (old_stage != NULL) + { + old_stage->m_key= m_current_stage_key; + old_stage->m_name= proc_info; + } + + if (new_stage != NULL) + { + const char *msg= new_stage->m_name; #if defined(ENABLED_PROFILING) - thd->profiling.status_change(info, - calling_function, calling_file, calling_line); + profiling.status_change(msg, calling_func, calling_file, calling_line); #endif - thd->proc_info= info; - return old_info; + + m_current_stage_key= new_stage->m_key; + proc_info= msg; + +#ifdef HAVE_PSI_THREAD_INTERFACE + PSI_THREAD_CALL(set_thread_state)(msg); + MYSQL_SET_STAGE(m_current_stage_key, calling_file, calling_line); +#endif + } + return; } -extern "C" -const char* thd_enter_cond(MYSQL_THD thd, mysql_cond_t *cond, - mysql_mutex_t *mutex, const char *msg) +void thd_enter_cond(MYSQL_THD thd, mysql_cond_t *cond, mysql_mutex_t *mutex, + const PSI_stage_info *stage, PSI_stage_info *old_stage, + const char 
*src_function, const char *src_file, + int src_line) { if (!thd) thd= current_thd; - return thd->enter_cond(cond, mutex, msg); + return thd->enter_cond(cond, mutex, stage, old_stage, src_function, src_file, + src_line); } -extern "C" -void thd_exit_cond(MYSQL_THD thd, const char *old_msg) +void thd_exit_cond(MYSQL_THD thd, const PSI_stage_info *stage, + const char *src_function, const char *src_file, + int src_line) { if (!thd) thd= current_thd; - thd->exit_cond(old_msg); + thd->exit_cond(stage, src_function, src_file, src_line); return; } @@ -645,6 +657,17 @@ void thd_set_ha_data(THD *thd, const struct handlerton *hton, } +/** + Allow storage engine to wakeup commits waiting in THD::wait_for_prior_commit. + @see thd_wakeup_subsequent_commits() definition in plugin.h +*/ +extern "C" +void thd_wakeup_subsequent_commits(THD *thd, int wakeup_error) +{ + thd->wakeup_subsequent_commits(wakeup_error); +} + + extern "C" long long thd_test_options(const THD *thd, long long test_options) { @@ -664,109 +687,51 @@ int thd_tx_isolation(const THD *thd) } extern "C" -void thd_inc_row_count(THD *thd) +int thd_tx_is_read_only(const THD *thd) { - thd->warning_info->inc_current_row_for_warning(); + return (int) thd->tx_read_only; } -/** - Dumps a text description of a thread, its security context - (user, host) and the current query. - - @param thd thread context - @param buffer pointer to preferred result buffer - @param length length of buffer - @param max_query_len how many chars of query to copy (0 for all) - - @req LOCK_thread_count - - @note LOCK_thread_count mutex is not necessary when the function is invoked on - the currently running thread (current_thd) or if the caller in some other - way guarantees that access to thd->query is serialized. 
- - @return Pointer to string -*/ - extern "C" -char *thd_security_context(THD *thd, char *buffer, unsigned int length, - unsigned int max_query_len) -{ - String str(buffer, length, &my_charset_latin1); - const Security_context *sctx= &thd->main_security_ctx; - char header[256]; - int len; - /* - The pointers thd->query and thd->proc_info might change since they are - being modified concurrently. This is acceptable for proc_info since its - values doesn't have to very accurate and the memory it points to is static, - but we need to attempt a snapshot on the pointer values to avoid using NULL - values. The pointer to thd->query however, doesn't point to static memory - and has to be protected by thd->LOCK_thd_data or risk pointing to - uninitialized memory. - */ - const char *proc_info= thd->proc_info; - - len= my_snprintf(header, sizeof(header), - "MySQL thread id %lu, OS thread handle 0x%lx, query id %lu", - thd->thread_id, (ulong) thd->real_id, (ulong) thd->query_id); - str.length(0); - str.append(header, len); - - if (sctx->host) - { - str.append(' '); - str.append(sctx->host); - } +{ /* Functions for thd_error_context_service */ - if (sctx->ip) + const char *thd_get_error_message(const THD *thd) { - str.append(' '); - str.append(sctx->ip); + return thd->get_stmt_da()->message(); } - if (sctx->user) + uint thd_get_error_number(const THD *thd) { - str.append(' '); - str.append(sctx->user); + return thd->get_stmt_da()->sql_errno(); } - if (proc_info) + ulong thd_get_error_row(const THD *thd) { - str.append(' '); - str.append(proc_info); + return thd->get_stmt_da()->current_row_for_warning(); } - /* Don't wait if LOCK_thd_data is used as this could cause a deadlock */ - if (!mysql_mutex_trylock(&thd->LOCK_thd_data)) + void thd_inc_error_row(THD *thd) { - if (thd->query()) - { - if (max_query_len < 1) - len= thd->query_length(); - else - len= min(thd->query_length(), max_query_len); - str.append('\n'); - str.append(thd->query(), len); - } - 
mysql_mutex_unlock(&thd->LOCK_thd_data); + thd->get_stmt_da()->inc_current_row_for_warning(); } +} - if (str.c_ptr_safe() == buffer) - return buffer; - /* - We have to copy the new string to the destination buffer because the string - was reallocated to a larger buffer to be able to fit. - */ - DBUG_ASSERT(buffer != NULL); - length= min(str.length(), length-1); - memcpy(buffer, str.c_ptr_quick(), length); - /* Make sure that the new string is null terminated */ - buffer[length]= '\0'; - return buffer; +#if MARIA_PLUGIN_INTERFACE_VERSION < 0x0200 +/** + TODO: This function is for API compatibility, remove it eventually. + All engines should switch to use thd_get_error_context_description() + plugin service function. +*/ +extern "C" +char *thd_security_context(THD *thd, + char *buffer, unsigned int length, + unsigned int max_query_len) +{ + return thd_get_error_context_description(thd, buffer, length, max_query_len); } - +#endif /** Implementation of Drop_table_error_handler::handle_condition(). 
@@ -785,9 +750,9 @@ char *thd_security_context(THD *thd, char *buffer, unsigned int length, bool Drop_table_error_handler::handle_condition(THD *thd, uint sql_errno, const char* sqlstate, - MYSQL_ERROR::enum_warning_level level, + Sql_condition::enum_warning_level level, const char* msg, - MYSQL_ERROR ** cond_hdl) + Sql_condition ** cond_hdl) { *cond_hdl= NULL; return ((sql_errno == EE_DELETE && my_errno == ENOENT) || @@ -798,7 +763,7 @@ bool Drop_table_error_handler::handle_condition(THD *thd, THD::THD() :Statement(&main_lex, &main_mem_root, STMT_CONVENTIONAL_EXECUTION, /* statement id */ 0), - rli_fake(0), rli_slave(NULL), + rli_fake(0), rgi_fake(0), rgi_slave(NULL), in_sub_stmt(0), log_all_errors(0), binlog_unsafe_warning_flags(0), binlog_table_maps(0), @@ -808,10 +773,12 @@ THD::THD() first_successful_insert_id_in_prev_stmt_for_binlog(0), first_successful_insert_id_in_cur_stmt(0), stmt_depends_on_first_successful_insert_id_in_prev_stmt(FALSE), - examined_row_count(0), + m_examined_row_count(0), accessed_rows_and_keys(0), - warning_info(&main_warning_info), - stmt_da(&main_da), + m_digest(NULL), + m_statement_psi(NULL), + m_idle_psi(NULL), + thread_id(0), global_disable_checkpoint(0), failed_com_change_user(0), is_fatal_error(0), @@ -822,22 +789,36 @@ THD::THD() in_lock_tables(0), bootstrap(0), derived_tables_processing(FALSE), + waiting_on_group_commit(FALSE), has_waiter(FALSE), spcont(NULL), m_parser_state(NULL), #if defined(ENABLED_DEBUG_SYNC) debug_sync_control(0), #endif /* defined(ENABLED_DEBUG_SYNC) */ - main_warning_info(0, false) + wait_for_commit_ptr(0), + main_da(0, false, false), + m_stmt_da(&main_da) { ulong tmp; mdl_context.init(this); /* + We set THR_THD to temporally point to this THD to register all the + variables that allocates memory for this THD + */ + THD *old_THR_THD= current_thd; + set_current_thd(this); + status_var.memory_used= 0; + main_da.init(); + + /* Pass nominal parameters to init_alloc_root only to ensure that the destructor 
works OK in case of an error. The main_mem_root will be re-initialized in init_for_queries(). */ - init_sql_alloc(&main_mem_root, ALLOC_ROOT_MIN_BLOCK_SIZE, 0); + init_sql_alloc(&main_mem_root, ALLOC_ROOT_MIN_BLOCK_SIZE, 0, + MYF(MY_THREAD_SPECIFIC)); + stmt_arena= this; thread_stack= 0; scheduler= thread_scheduler; // Will be fixed later @@ -856,9 +837,10 @@ THD::THD() col_access=0; is_slave_error= thread_specific_used= FALSE; my_hash_clear(&handler_tables_hash); + my_hash_clear(&ull_hash); tmp_table=0; cuted_fields= 0L; - sent_row_count= 0L; + m_sent_row_count= 0L; limit_found_rows= 0; m_row_count_func= -1; statement_id_counter= 0UL; @@ -872,9 +854,10 @@ THD::THD() progress.max_counter= 0; current_linfo = 0; slave_thread = 0; + connection_name.str= 0; + connection_name.length= 0; + bzero(&variables, sizeof(variables)); - thread_id= 0; - one_shot_set= 0; file_id = 0; query_id= 0; query_name_consts= 0; @@ -884,6 +867,7 @@ THD::THD() mysys_var=0; binlog_evt_union.do_union= FALSE; enable_slow_log= 0; + durability_property= HA_REGULAR_DURABILITY; #ifndef DBUG_OFF dbug_sentry=THD_SENTRY_MAGIC; @@ -892,8 +876,8 @@ THD::THD() mysql_audit_init_thd(this); #endif net.vio=0; + net.buff= 0; client_capabilities= 0; // minimalistic client - ull=0; system_thread= NON_SYSTEM_THREAD; cleanup_done= abort_on_warning= 0; peer_port= 0; // For SHOW PROCESSLIST @@ -918,9 +902,9 @@ THD::THD() /* Variables with default values */ proc_info="login"; where= THD::DEFAULT_WHERE; - server_id = ::server_id; + variables.server_id = global_system_variables.server_id; slave_net = 0; - command=COM_CONNECT; + m_command=COM_CONNECT; *scramble= '\0'; /* Call to init() below requires fully initialized Open_tables_state. 
*/ @@ -933,7 +917,7 @@ THD::THD() user_connect=(USER_CONN *)0; my_hash_init(&user_vars, system_charset_info, USER_VARS_HASH_SIZE, 0, 0, (my_hash_get_key) get_var_key, - (my_hash_free_key) free_user_var, 0); + (my_hash_free_key) free_user_var, HASH_THREAD_SPECIFIC); sp_proc_cache= NULL; sp_func_cache= NULL; @@ -941,7 +925,7 @@ THD::THD() /* For user vars replication*/ if (opt_bin_log) my_init_dynamic_array(&user_var_events, - sizeof(BINLOG_USER_VAR_EVENT *), 16, 16); + sizeof(BINLOG_USER_VAR_EVENT *), 16, 16, MYF(0)); else bzero((char*) &user_var_events, sizeof(user_var_events)); @@ -951,18 +935,34 @@ THD::THD() protocol_binary.init(this); tablespace_op=FALSE; - tmp= sql_rnd_with_mutex(); + + /* + Initialize the random generator. We call my_rnd() without a lock as + it's not really critical if two threads modifies the structure at the + same time. We ensure that we have an unique number foreach thread + by adding the address of the stack. + */ + tmp= (ulong) (my_rnd(&sql_rand) * 0xffffffff); my_rnd_init(&rand, tmp + (ulong) &rand, tmp + (ulong) ::global_query_id); substitute_null_with_insert_id = FALSE; thr_lock_info_init(&lock_info); /* safety: will be reset after start */ + m_token_array= NULL; + if (max_digest_length > 0) + { + m_token_array= (unsigned char*) my_malloc(max_digest_length, + MYF(MY_WME|MY_THREAD_SPECIFIC)); + } + m_internal_handler= NULL; - m_binlog_invoker= FALSE; + m_binlog_invoker= INVOKER_NONE; memset(&invoker_user, 0, sizeof(invoker_user)); memset(&invoker_host, 0, sizeof(invoker_host)); prepare_derived_at_open= FALSE; create_tmp_table_for_derived= FALSE; save_prep_leaf_list= FALSE; + /* Restore THR_THD */ + set_current_thd(old_THR_THD); } @@ -983,9 +983,9 @@ void THD::push_internal_handler(Internal_error_handler *handler) bool THD::handle_condition(uint sql_errno, const char* sqlstate, - MYSQL_ERROR::enum_warning_level level, + Sql_condition::enum_warning_level level, const char* msg, - MYSQL_ERROR ** cond_hdl) + Sql_condition ** cond_hdl) { 
if (!m_internal_handler) { @@ -1022,7 +1022,7 @@ void THD::raise_error(uint sql_errno) const char* msg= ER(sql_errno); (void) raise_condition(sql_errno, NULL, - MYSQL_ERROR::WARN_LEVEL_ERROR, + Sql_condition::WARN_LEVEL_ERROR, msg); } @@ -1038,7 +1038,7 @@ void THD::raise_error_printf(uint sql_errno, ...) va_end(args); (void) raise_condition(sql_errno, NULL, - MYSQL_ERROR::WARN_LEVEL_ERROR, + Sql_condition::WARN_LEVEL_ERROR, ebuff); DBUG_VOID_RETURN; } @@ -1048,7 +1048,7 @@ void THD::raise_warning(uint sql_errno) const char* msg= ER(sql_errno); (void) raise_condition(sql_errno, NULL, - MYSQL_ERROR::WARN_LEVEL_WARN, + Sql_condition::WARN_LEVEL_WARN, msg); } @@ -1064,7 +1064,7 @@ void THD::raise_warning_printf(uint sql_errno, ...) va_end(args); (void) raise_condition(sql_errno, NULL, - MYSQL_ERROR::WARN_LEVEL_WARN, + Sql_condition::WARN_LEVEL_WARN, ebuff); DBUG_VOID_RETURN; } @@ -1078,7 +1078,7 @@ void THD::raise_note(uint sql_errno) const char* msg= ER(sql_errno); (void) raise_condition(sql_errno, NULL, - MYSQL_ERROR::WARN_LEVEL_NOTE, + Sql_condition::WARN_LEVEL_NOTE, msg); DBUG_VOID_RETURN; } @@ -1097,24 +1097,25 @@ void THD::raise_note_printf(uint sql_errno, ...) 
va_end(args); (void) raise_condition(sql_errno, NULL, - MYSQL_ERROR::WARN_LEVEL_NOTE, + Sql_condition::WARN_LEVEL_NOTE, ebuff); DBUG_VOID_RETURN; } -MYSQL_ERROR* THD::raise_condition(uint sql_errno, +Sql_condition* THD::raise_condition(uint sql_errno, const char* sqlstate, - MYSQL_ERROR::enum_warning_level level, + Sql_condition::enum_warning_level level, const char* msg) { - MYSQL_ERROR *cond= NULL; + Diagnostics_area *da= get_stmt_da(); + Sql_condition *cond= NULL; DBUG_ENTER("THD::raise_condition"); if (!(variables.option_bits & OPTION_SQL_NOTES) && - (level == MYSQL_ERROR::WARN_LEVEL_NOTE)) + (level == Sql_condition::WARN_LEVEL_NOTE)) DBUG_RETURN(NULL); - warning_info->opt_clear_warning_info(query_id); + da->opt_clear_warning_info(query_id); /* TODO: replace by DBUG_ASSERT(sql_errno != 0) once all bugs similar to @@ -1128,24 +1129,24 @@ MYSQL_ERROR* THD::raise_condition(uint sql_errno, if (sqlstate == NULL) sqlstate= mysql_errno_to_sqlstate(sql_errno); - if ((level == MYSQL_ERROR::WARN_LEVEL_WARN) && + if ((level == Sql_condition::WARN_LEVEL_WARN) && really_abort_on_warning()) { /* FIXME: push_warning and strict SQL_MODE case. */ - level= MYSQL_ERROR::WARN_LEVEL_ERROR; + level= Sql_condition::WARN_LEVEL_ERROR; killed= KILL_BAD_DATA; } switch (level) { - case MYSQL_ERROR::WARN_LEVEL_NOTE: - case MYSQL_ERROR::WARN_LEVEL_WARN: + case Sql_condition::WARN_LEVEL_NOTE: + case Sql_condition::WARN_LEVEL_WARN: got_warning= 1; break; - case MYSQL_ERROR::WARN_LEVEL_ERROR: + case Sql_condition::WARN_LEVEL_ERROR: break; default: DBUG_ASSERT(FALSE); @@ -1154,16 +1155,16 @@ MYSQL_ERROR* THD::raise_condition(uint sql_errno, if (handle_condition(sql_errno, sqlstate, level, msg, &cond)) DBUG_RETURN(cond); - if (level == MYSQL_ERROR::WARN_LEVEL_ERROR) + if (level == Sql_condition::WARN_LEVEL_ERROR) { mysql_audit_general(this, MYSQL_AUDIT_GENERAL_ERROR, sql_errno, msg); is_slave_error= 1; // needed to catch query errors during replication - if (! 
stmt_da->is_error()) + if (!da->is_error()) { set_row_count_func(-1); - stmt_da->set_error_status(this, sql_errno, msg, sqlstate); + da->set_error_status(sql_errno, msg, sqlstate, cond); } } @@ -1177,7 +1178,7 @@ MYSQL_ERROR* THD::raise_condition(uint sql_errno, if (!(is_fatal_error && (sql_errno == EE_OUTOFMEMORY || sql_errno == ER_OUTOFMEMORY))) { - cond= warning_info->push_warning(this, sql_errno, sqlstate, level, msg); + cond= da->push_warning(this, sql_errno, sqlstate, level, msg); } DBUG_RETURN(cond); } @@ -1211,8 +1212,8 @@ LEX_STRING *thd_make_lex_string(THD *thd, LEX_STRING *lex_str, const char *str, unsigned int size, int allocate_lex_string) { - return thd->make_lex_string(lex_str, str, size, - (bool) allocate_lex_string); + return allocate_lex_string ? thd->make_lex_string(str, size) + : thd->make_lex_string(lex_str, str, size); } extern "C" @@ -1227,6 +1228,26 @@ void thd_get_xid(const MYSQL_THD thd, MYSQL_XID *xid) *xid = *(MYSQL_XID *) &thd->transaction.xid_state.xid; } + +extern "C" +my_time_t thd_TIME_to_gmt_sec(MYSQL_THD thd, const MYSQL_TIME *ltime, + unsigned int *errcode) +{ + Time_zone *tz= thd ? thd->variables.time_zone : + global_system_variables.time_zone; + return tz->TIME_to_gmt_sec(ltime, errcode); +} + + +extern "C" +void thd_gmt_sec_to_TIME(MYSQL_THD thd, MYSQL_TIME *ltime, my_time_t t) +{ + Time_zone *tz= thd ? thd->variables.time_zone : + global_system_variables.time_zone; + tz->gmt_sec_to_TIME(ltime, t); +} + + #ifdef _WIN32 extern "C" THD *_current_thd_noinline(void) { @@ -1239,6 +1260,7 @@ extern "C" THD *_current_thd_noinline(void) void THD::init(void) { + DBUG_ENTER("thd::init"); mysql_mutex_lock(&LOCK_global_system_variables); plugin_thdvar_init(this); /* @@ -1247,7 +1269,14 @@ void THD::init(void) avoid temporary tables replication failure. 
*/ variables.pseudo_thread_id= thread_id; + + variables.default_master_connection.str= default_master_connection_buff; + ::strmake(variables.default_master_connection.str, + global_system_variables.default_master_connection.str, + variables.default_master_connection.length); + mysql_mutex_unlock(&LOCK_global_system_variables); + server_status= SERVER_STATUS_AUTOCOMMIT; if (variables.sql_mode & MODE_NO_BACKSLASH_ESCAPES) server_status|= SERVER_STATUS_NO_BACKSLASH_ESCAPES; @@ -1261,11 +1290,14 @@ void THD::init(void) TL_WRITE_LOW_PRIORITY : TL_WRITE); tx_isolation= (enum_tx_isolation) variables.tx_isolation; + tx_read_only= variables.tx_read_only; update_charset(); reset_current_stmt_binlog_format_row(); - bzero((char *) &status_var, sizeof(status_var)); + reset_binlog_local_stmt_filter(); + set_status_var_init(); bzero((char *) &org_status_var, sizeof(org_status_var)); start_bytes_received= 0; + last_commit_gtid.seq_no= 0; status_in_global= 0; if (variables.sql_log_bin) @@ -1281,6 +1313,8 @@ void THD::init(void) /* Initialize the Debug Sync Facility. See debug_sync.cc. */ debug_sync_init_thread(this); #endif /* defined(ENABLED_DEBUG_SYNC) */ + apc_target.init(&LOCK_thd_data); + DBUG_VOID_RETURN; } @@ -1419,8 +1453,6 @@ void THD::cleanup(void) if (global_read_lock.is_acquired()) global_read_lock.unlock_global_read_lock(this); - /* All metadata locks must have been released by now. */ - DBUG_ASSERT(!mdl_context.has_locks()); if (user_connect) { decrease_user_connections(user_connect); @@ -1437,14 +1469,11 @@ void THD::cleanup(void) sp_cache_clear(&sp_proc_cache); sp_cache_clear(&sp_func_cache); - if (ull) - { - mysql_mutex_lock(&LOCK_user_locks); - item_user_lock_release(ull); - mysql_mutex_unlock(&LOCK_user_locks); - ull= NULL; - } + mysql_ull_cleanup(this); + /* All metadata locks must have been released by now. 
*/ + DBUG_ASSERT(!mdl_context.has_locks()); + apc_target.destroy(); cleanup_done=1; DBUG_VOID_RETURN; } @@ -1452,8 +1481,16 @@ void THD::cleanup(void) THD::~THD() { + THD *orig_thd= current_thd; THD_CHECK_SENTRY(this); DBUG_ENTER("~THD()"); + + /* + In error cases, thd may not be current thd. We have to fix this so + that memory allocation counting is done correctly + */ + set_current_thd(this); + /* Ensure that no one is using THD */ mysql_mutex_lock(&LOCK_thd_data); mysql_mutex_unlock(&LOCK_thd_data); @@ -1461,10 +1498,8 @@ THD::~THD() /* Close connection */ #ifndef EMBEDDED_LIBRARY if (net.vio) - { vio_delete(net.vio); - net_end(&net); - } + net_end(&net); #endif stmt_map.reset(); /* close all prepared statements */ if (!cleanup_done) @@ -1475,7 +1510,6 @@ THD::~THD() mysql_audit_release(this); plugin_thdvar_cleanup(this); - DBUG_PRINT("info", ("freeing security context")); main_security_ctx.destroy(); my_free(db); db= NULL; @@ -1487,6 +1521,11 @@ THD::~THD() dbug_sentry= THD_SENTRY_GONE; #endif #ifndef EMBEDDED_LIBRARY + if (rgi_fake) + { + delete rgi_fake; + rgi_fake= NULL; + } if (rli_fake) { delete rli_fake; @@ -1494,13 +1533,23 @@ THD::~THD() } mysql_audit_free_thd(this); - if (rli_slave) - rli_slave->cleanup_after_session(); + if (rgi_slave) + rgi_slave->cleanup_after_session(); my_free(semisync_info); unregister_slave(this, true, true); #endif free_root(&main_mem_root, MYF(0)); + my_free(m_token_array); + main_da.free_memory(); + if (status_var.memory_used != 0) + { + DBUG_PRINT("error", ("memory_used: %lld", status_var.memory_used)); + SAFEMALLOC_REPORT_MEMORY(my_thread_dbug_id()); + DBUG_ASSERT(status_var.memory_used == 0); // Ensure everything is freed + } + + set_current_thd(orig_thd == this ? 
0 : orig_thd); DBUG_VOID_RETURN; } @@ -1598,7 +1647,8 @@ void add_diff_to_status(STATUS_VAR *to_var, STATUS_VAR *from_var, void THD::awake(killed_state state_to_set) { DBUG_ENTER("THD::awake"); - DBUG_PRINT("enter", ("this: %p current_thd: %p", this, current_thd)); + DBUG_PRINT("enter", ("this: %p current_thd: %p state: %d", + this, current_thd, (int) state_to_set)); THD_CHECK_SENTRY(this); mysql_mutex_assert_owner(&LOCK_thd_data); @@ -1727,6 +1777,63 @@ void THD::disconnect() } +bool THD::notify_shared_lock(MDL_context_owner *ctx_in_use, + bool needs_thr_lock_abort) +{ + THD *in_use= ctx_in_use->get_thd(); + bool signalled= FALSE; + DBUG_ENTER("THD::notify_shared_lock"); + DBUG_PRINT("enter",("needs_thr_lock_abort: %d", needs_thr_lock_abort)); + + if ((in_use->system_thread & SYSTEM_THREAD_DELAYED_INSERT) && + !in_use->killed) + { + /* This code is similar to kill_delayed_threads() */ + DBUG_PRINT("info", ("kill delayed thread")); + mysql_mutex_lock(&in_use->LOCK_thd_data); + if (in_use->killed < KILL_CONNECTION) + in_use->killed= KILL_CONNECTION; + if (in_use->mysys_var) + { + mysql_mutex_lock(&in_use->mysys_var->mutex); + if (in_use->mysys_var->current_cond) + mysql_cond_broadcast(in_use->mysys_var->current_cond); + + /* Abort if about to wait in thr_upgrade_write_delay_lock */ + in_use->mysys_var->abort= 1; + mysql_mutex_unlock(&in_use->mysys_var->mutex); + } + mysql_mutex_unlock(&in_use->LOCK_thd_data); + signalled= TRUE; + } + + if (needs_thr_lock_abort) + { + mysql_mutex_lock(&in_use->LOCK_thd_data); + /* If not already dying */ + if (in_use->killed != KILL_CONNECTION_HARD) + { + for (TABLE *thd_table= in_use->open_tables; + thd_table ; + thd_table= thd_table->next) + { + /* + Check for TABLE::needs_reopen() is needed since in some + places we call handler::close() for table instance (and set + TABLE::db_stat to 0) and do not remove such instances from + the THD::open_tables for some time, during which other + thread can see those instances (e.g. 
see partitioning code). + */ + if (!thd_table->needs_reopen()) + signalled|= mysql_lock_abort_for_thread(this, thd_table); + } + } + mysql_mutex_unlock(&in_use->LOCK_thd_data); + } + DBUG_RETURN(signalled); +} + + /* Get error number for killed state Note that the error message can't have any parameters. @@ -1776,7 +1883,7 @@ bool THD::store_globals() */ DBUG_ASSERT(thread_stack); - if (my_pthread_setspecific_ptr(THR_THD, this) || + if (set_current_thd(this) || my_pthread_setspecific_ptr(THR_MALLOC, &mem_root)) return 1; /* @@ -1820,7 +1927,7 @@ void THD::reset_globals() mysql_mutex_unlock(&LOCK_thd_data); /* Undocking the thread specific data. */ - my_pthread_setspecific_ptr(THR_THD, NULL); + set_current_thd(0); my_pthread_setspecific_ptr(THR_MALLOC, NULL); } @@ -1873,10 +1980,18 @@ void THD::cleanup_after_query() which is intended to consume its event (there can be other SET statements between them). */ - if ((rli_slave || rli_fake) && is_update_query(lex->sql_command)) + if ((rgi_slave || rli_fake) && is_update_query(lex->sql_command)) auto_inc_intervals_forced.empty(); #endif } + /* + Forget the binlog stmt filter for the next query. + There are some code paths that: + - do not call THD::decide_logging_format() + - do call THD::binlog_query(), + making this reset necessary. + */ + reset_binlog_local_stmt_filter(); if (first_successful_insert_id_in_cur_stmt > 0) { /* set what LAST_INSERT_ID() will return */ @@ -1892,41 +2007,17 @@ void THD::cleanup_after_query() where= THD::DEFAULT_WHERE; /* reset table map for multi-table update */ table_map_for_update= 0; - m_binlog_invoker= FALSE; + m_binlog_invoker= INVOKER_NONE; #ifndef EMBEDDED_LIBRARY - if (rli_slave) - rli_slave->cleanup_after_query(); + if (rgi_slave) + rgi_slave->cleanup_after_query(); #endif DBUG_VOID_RETURN; } -/** - Create a LEX_STRING in this connection. 
- - @param lex_str pointer to LEX_STRING object to be initialized - @param str initializer to be copied into lex_str - @param length length of str, in bytes - @param allocate_lex_string if TRUE, allocate new LEX_STRING object, - instead of using lex_str value - @return NULL on failure, or pointer to the LEX_STRING object -*/ -LEX_STRING *THD::make_lex_string(LEX_STRING *lex_str, - const char* str, uint length, - bool allocate_lex_string) -{ - if (allocate_lex_string) - if (!(lex_str= (LEX_STRING *)alloc_root(mem_root, sizeof(LEX_STRING)))) - return 0; - if (!(lex_str->str= strmake_root(mem_root, str, length))) - return 0; - lex_str->length= length; - return lex_str; -} - - /* Convert a string to another character set @@ -2114,6 +2205,21 @@ CHANGED_TABLE_LIST* THD::changed_table_dup(const char *key, long key_length) int THD::send_explain_fields(select_result *result) { List<Item> field_list; + make_explain_field_list(field_list); + result->prepare(field_list, NULL); + return (result->send_result_set_metadata(field_list, + Protocol::SEND_NUM_ROWS | + Protocol::SEND_EOF)); +} + + +/* + Populate the provided field_list with EXPLAIN output columns. 
+ this->lex->describe has the EXPLAIN flags +*/ + +void THD::make_explain_field_list(List<Item> &field_list) +{ Item *item; CHARSET_INFO *cs= system_charset_info; field_list.push_back(item= new Item_return_int("id",3, MYSQL_TYPE_LONGLONG)); @@ -2152,10 +2258,9 @@ int THD::send_explain_fields(select_result *result) } item->maybe_null= 1; field_list.push_back(new Item_empty_string("Extra", 255, cs)); - return (result->send_result_set_metadata(field_list, - Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF)); } + #ifdef SIGNAL_WITH_VIO_CLOSE void THD::close_active_vio() { @@ -2279,12 +2384,6 @@ select_result::select_result() thd=current_thd; } -void select_result::send_error(uint errcode,const char *err) -{ - my_message(errcode, err, MYF(0)); -} - - void select_result::cleanup() { /* do nothing */ @@ -2369,7 +2468,8 @@ int select_send::send_data(List<Item> &items) Protocol *protocol= thd->protocol; DBUG_ENTER("select_send::send_data"); - if (unit->offset_limit_cnt) + /* unit is not set when using 'delete ... 
returning' */ + if (unit && unit->offset_limit_cnt) { // using limit offset,count unit->offset_limit_cnt--; DBUG_RETURN(FALSE); @@ -2391,7 +2491,7 @@ int select_send::send_data(List<Item> &items) DBUG_RETURN(TRUE); } - thd->sent_row_count++; + thd->inc_sent_row_count(1); if (thd->vio_ok()) DBUG_RETURN(protocol->write()); @@ -2399,6 +2499,7 @@ int select_send::send_data(List<Item> &items) DBUG_RETURN(0); } + bool select_send::send_eof() { /* @@ -2424,23 +2525,9 @@ bool select_send::send_eof() Handling writing to file ************************************************************************/ -void select_to_file::send_error(uint errcode,const char *err) -{ - my_message(errcode, err, MYF(0)); - if (file > 0) - { - (void) end_io_cache(&cache); - mysql_file_close(file, MYF(0)); - /* Delete file on error */ - mysql_file_delete(key_select_to_file, path, MYF(0)); - file= -1; - } -} - - bool select_to_file::send_eof() { - int error= test(end_io_cache(&cache)); + int error= MY_TEST(end_io_cache(&cache)); if (mysql_file_close(file, MYF(MY_WME)) || thd->is_error()) error= true; @@ -2483,7 +2570,7 @@ select_to_file::~select_to_file() select_export::~select_export() { - thd->sent_row_count=row_count; + thd->set_sent_row_count(row_count); } @@ -2605,7 +2692,7 @@ select_export::prepare(List<Item> &list, SELECT_LEX_UNIT *u) Non-ASCII separator arguments are not fully supported */ - push_warning(thd, MYSQL_ERROR::WARN_LEVEL_WARN, + push_warning(thd, Sql_condition::WARN_LEVEL_WARN, WARN_NON_ASCII_SEPARATOR_NOT_IMPLEMENTED, ER(WARN_NON_ASCII_SEPARATOR_NOT_IMPLEMENTED)); } @@ -2621,8 +2708,8 @@ select_export::prepare(List<Item> &list, SELECT_LEX_UNIT *u) escape_char= (int) (uchar) (*exchange->escaped)[0]; else escape_char= -1; - is_ambiguous_field_sep= test(strchr(ESCAPE_CHARS, field_sep_char)); - is_unsafe_field_sep= test(strchr(NUMERIC_CHARS, field_sep_char)); + is_ambiguous_field_sep= MY_TEST(strchr(ESCAPE_CHARS, field_sep_char)); + is_unsafe_field_sep= MY_TEST(strchr(NUMERIC_CHARS, 
field_sep_char)); line_sep_char= (exchange->line_term->length() ? (int) (uchar) (*exchange->line_term)[0] : INT_MAX); if (!field_term_length) @@ -2636,7 +2723,7 @@ select_export::prepare(List<Item> &list, SELECT_LEX_UNIT *u) (exchange->opt_enclosed && non_string_results && field_term_length && strchr(NUMERIC_CHARS, field_term_char))) { - push_warning(thd, MYSQL_ERROR::WARN_LEVEL_WARN, + push_warning(thd, Sql_condition::WARN_LEVEL_WARN, ER_AMBIGUOUS_FIELD_TERM, ER(ER_AMBIGUOUS_FIELD_TERM)); is_ambiguous_field_term= TRUE; } @@ -2719,7 +2806,7 @@ int select_export::send_data(List<Item> &items) convert_to_printable(printable_buff, sizeof(printable_buff), error_pos, res->ptr() + res->length() - error_pos, res->charset(), 6); - push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_WARN, + push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN, ER_TRUNCATED_WRONG_VALUE_FOR_FIELD, ER(ER_TRUNCATED_WRONG_VALUE_FOR_FIELD), "string", printable_buff, @@ -2730,7 +2817,7 @@ int select_export::send_data(List<Item> &items) /* result is longer than UINT_MAX32 and doesn't fit into String */ - push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_WARN, + push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN, WARN_DATA_TRUNCATED, ER(WARN_DATA_TRUNCATED), item->full_name(), static_cast<long>(row_count)); } @@ -2765,7 +2852,7 @@ int select_export::send_data(List<Item> &items) else { if (fixed_row_size) - used_length=min(res->length(),item->max_length); + used_length=MY_MIN(res->length(),item->max_length); else used_length=res->length(); if ((result_type == STRING_RESULT || is_unsafe_field_sep) && @@ -3263,6 +3350,10 @@ void THD::end_statement() } +/* + Start using arena specified by @set. Current arena data will be saved to + *backup. 
+*/ void THD::set_n_backup_active_arena(Query_arena *set, Query_arena *backup) { DBUG_ENTER("THD::set_n_backup_active_arena"); @@ -3277,6 +3368,12 @@ void THD::set_n_backup_active_arena(Query_arena *set, Query_arena *backup) } +/* + Stop using the temporary arena, and start again using the arena that is + specified in *backup. + The temporary arena is returned back into *set. +*/ + void THD::restore_active_arena(Query_arena *set, Query_arena *backup) { DBUG_ENTER("THD::restore_active_arena"); @@ -3496,7 +3593,7 @@ int select_dumpvar::send_data(List<Item> &items) bool select_dumpvar::send_eof() { if (! row_count) - push_warning(thd, MYSQL_ERROR::WARN_LEVEL_WARN, + push_warning(thd, Sql_condition::WARN_LEVEL_WARN, ER_SP_FETCH_NO_DATA, ER(ER_SP_FETCH_NO_DATA)); /* Don't send EOF if we're in error condition (which implies we've already @@ -3635,6 +3732,12 @@ void thd_increment_bytes_sent(ulong length) } } +my_bool thd_net_is_killed() +{ + THD *thd= current_thd; + return thd && thd->killed ? 
1 : 0; +} + void thd_increment_bytes_received(ulong length) { @@ -3650,7 +3753,8 @@ void thd_increment_net_big_packet_count(ulong length) void THD::set_status_var_init() { - bzero((char*) &status_var, sizeof(status_var)); + bzero((char*) &status_var, offsetof(STATUS_VAR, + last_cleared_system_status_var)); } @@ -3658,7 +3762,7 @@ void Security_context::init() { host= user= ip= external_user= 0; host_or_ip= "connecting host"; - priv_user[0]= priv_host[0]= proxy_user[0]= '\0'; + priv_user[0]= priv_host[0]= proxy_user[0]= priv_role[0]= '\0'; master_access= 0; #ifndef NO_EMBEDDED_ACCESS_CHECKS db_access= NO_ACCESS; @@ -3668,6 +3772,7 @@ void Security_context::init() void Security_context::destroy() { + DBUG_PRINT("info", ("freeing security context")); // If not pointer to constant if (host != my_localhost) { @@ -3855,12 +3960,7 @@ void THD::restore_backup_open_tables_state(Open_tables_backup *backup) #undef thd_killed extern "C" int thd_killed(const MYSQL_THD thd) { - if (!thd) - thd= current_thd; - - if (!(thd->killed & KILL_HARD_BIT)) - return 0; - return thd->killed != 0; + return thd_kill_level(thd) > THD_ABORT_SOFTLY; } #else #error now thd_killed() function can go away @@ -3872,8 +3972,17 @@ extern "C" int thd_killed(const MYSQL_THD thd) */ extern "C" enum thd_kill_levels thd_kill_level(const MYSQL_THD thd) { + THD* current= current_thd; + if (!thd) - thd= current_thd; + thd= current; + + if (thd == current) + { + Apc_target *apc_target= (Apc_target*)&thd->apc_target; + if (apc_target->have_apc_requests()) + apc_target->process_apc_requests(); + } if (likely(thd->killed == NOT_KILLED)) return THD_IS_NOT_KILLED; @@ -3881,6 +3990,7 @@ extern "C" enum thd_kill_levels thd_kill_level(const MYSQL_THD thd) return thd->killed & KILL_HARD_BIT ? 
THD_ABORT_ASAP : THD_ABORT_SOFTLY; } + /** Send an out-of-band progress report to the client @@ -3888,6 +3998,10 @@ extern "C" enum thd_kill_levels thd_kill_level(const MYSQL_THD thd) however not more often than global.progress_report_time. If global.progress_report_time is 0, then don't send progress reports, but check every second if the value has changed + + We clear any errors that we get from sending the progress packet to + the client as we don't want to set an error without the caller knowing + about it. */ static void thd_send_progress(THD *thd) @@ -3896,7 +4010,7 @@ static void thd_send_progress(THD *thd) ulonglong report_time= my_interval_timer(); if (report_time > thd->progress.next_report_time) { - uint seconds_to_next= max(thd->variables.progress_report_time, + uint seconds_to_next= MY_MAX(thd->variables.progress_report_time, global_system_variables.progress_report_time); if (seconds_to_next == 0) // Turned off seconds_to_next= 1; // Check again after 1 second @@ -3904,8 +4018,12 @@ static void thd_send_progress(THD *thd) thd->progress.next_report_time= (report_time + seconds_to_next * 1000000000ULL); if (global_system_variables.progress_report_time && - thd->variables.progress_report_time) + thd->variables.progress_report_time && !thd->is_error()) + { net_send_progress_packet(thd); + if (thd->is_error()) + thd->clear_error(); + } } } @@ -4012,11 +4130,15 @@ extern "C" unsigned long thd_get_thread_id(const MYSQL_THD thd) return((unsigned long)thd->thread_id); } -extern "C" enum_tx_isolation thd_get_trx_isolation(const MYSQL_THD thd) +/** + Check if THD socket is still connected. 
+ */ +extern "C" int thd_is_connected(MYSQL_THD thd) { - return thd->tx_isolation; + return thd->is_connected(); } + #ifdef INNODB_COMPATIBILITY_HOOKS extern "C" const struct charset_info_st *thd_charset(MYSQL_THD thd) { @@ -4048,6 +4170,230 @@ extern "C" int thd_slave_thread(const MYSQL_THD thd) return(thd->slave_thread); } +/* Returns true for a worker thread in parallel replication. */ +extern "C" int thd_rpl_is_parallel(const MYSQL_THD thd) +{ + return thd->rgi_slave && thd->rgi_slave->is_parallel_exec; +} + +/* + This function can optionally be called to check if thd_report_wait_for() + needs to be called for waits done by a given transaction. + + If this function returns false for a given thd, there is no need to do any + calls to thd_report_wait_for() on that thd. + + This call is optional; it is safe to call thd_report_wait_for() in any case. + This call can be used to save some redundant calls to thd_report_wait_for() + if desired. (This is unlikely to matter much unless there are _lots_ of + waits to report, as the overhead of thd_report_wait_for() is small). +*/ +extern "C" int +thd_need_wait_for(const MYSQL_THD thd) +{ + rpl_group_info *rgi; + + if (mysql_bin_log.is_open() && opt_binlog_commit_wait_count > 0) + return true; + if (!thd) + return false; + rgi= thd->rgi_slave; + if (!rgi) + return false; + return rgi->is_parallel_exec; +} + +/* + Used by InnoDB/XtraDB to report that one transaction THD is about to go to + wait for a transactional lock held by another transactions OTHER_THD. + + This is used for parallel replication, where transactions are required to + commit in the same order on the slave as they did on the master. If the + transactions on the slave encounters lock conflicts on the slave that did + not exist on the master, this can cause deadlocks. 
+ + Normally, such conflicts will not occur, because the same conflict would + have prevented the two transactions from committing in parallel on the + master, thus preventing them from running in parallel on the slave in the + first place. However, it is possible in case when the optimizer chooses a + different plan on the slave than on the master (eg. table scan instead of + index scan). + + InnoDB/XtraDB reports lock waits using this call. If a lock wait causes a + deadlock with the pre-determined commit order, we kill the later transaction, + and later re-try it, to resolve the deadlock. + + This call need only receive reports about waits for locks that will remain + until the holding transaction commits. InnoDB/XtraDB auto-increment locks + are released earlier, and so need not be reported. (Such false positives are + not harmful, but could lead to unnecessary kill and retry, so best avoided). +*/ +extern "C" void +thd_report_wait_for(MYSQL_THD thd, MYSQL_THD other_thd) +{ + rpl_group_info *rgi; + rpl_group_info *other_rgi; + + if (!thd || !other_thd) + return; + binlog_report_wait_for(thd, other_thd); + rgi= thd->rgi_slave; + other_rgi= other_thd->rgi_slave; + if (!rgi || !other_rgi) + return; + if (!rgi->is_parallel_exec) + return; + if (rgi->rli != other_rgi->rli) + return; + if (!rgi->gtid_sub_id || !other_rgi->gtid_sub_id) + return; + if (rgi->current_gtid.domain_id != other_rgi->current_gtid.domain_id) + return; + if (rgi->gtid_sub_id > other_rgi->gtid_sub_id) + return; + /* + This transaction is about to wait for another transaction that is required + by replication binlog order to commit after. This would cause a deadlock. + + So send a kill to the other transaction, with a temporary error; this will + cause replication to rollback (and later re-try) the other transaction, + releasing the lock for this transaction so replication can proceed. 
+ */ + other_rgi->killed_for_retry= true; + mysql_mutex_lock(&other_thd->LOCK_thd_data); + other_thd->awake(KILL_CONNECTION); + mysql_mutex_unlock(&other_thd->LOCK_thd_data); +} + +/* + This function is called from InnoDB/XtraDB to check if the commit order of + two transactions has already been decided by the upper layer. This happens + in parallel replication, where the commit order is forced to be the same on + the slave as it was originally on the master. + + If this function returns false, it means that such commit order will be + enforced. This allows the storage engine to optionally omit gap lock waits + or similar measures that would otherwise be needed to ensure that + transactions would be serialised in a way that would cause a commit order + that is correct for binlogging for statement-based replication. + + Since transactions are only run in parallel on the slave if they ran without + lock conflicts on the master, normally no lock conflicts on the slave happen + during parallel replication. However, there are a couple of corner cases + where it can happen, like these secondary-index operations: + + T1: INSERT INTO t1 VALUES (7, NULL); + T2: DELETE FROM t1 WHERE b <= 3; + + T1: UPDATE t1 SET secondary=NULL WHERE primary=1 + T2: DELETE t1 WHERE secondary <= 3 + + The DELETE takes a gap lock that can block the INSERT/UPDATE, but the row + locks set by INSERT/UPDATE do not block the DELETE. Thus, the execution + order of the transactions determine whether a lock conflict occurs or + not. Thus a lock conflict can occur on the slave where it did not on the + master. + + If this function returns true, normal locking should be done as required by + the binlogging and transaction isolation level in effect. But if it returns + false, the correct order will be enforced anyway, and InnoDB/XtraDB can + avoid taking the gap lock, preventing the lock conflict. + + Calling this function is just an optimisation to avoid unnecessary + deadlocks. 
If it was not used, a gap lock would be set that could eventually + cause a deadlock; the deadlock would be caught by thd_report_wait_for() and + the transaction T2 killed and rolled back (and later re-tried). +*/ +extern "C" int +thd_need_ordering_with(const MYSQL_THD thd, const MYSQL_THD other_thd) +{ + rpl_group_info *rgi, *other_rgi; + + DBUG_EXECUTE_IF("disable_thd_need_ordering_with", return 1;); + if (!thd || !other_thd) + return 1; + rgi= thd->rgi_slave; + other_rgi= other_thd->rgi_slave; + if (!rgi || !other_rgi) + return 1; + if (!rgi->is_parallel_exec) + return 1; + if (rgi->rli != other_rgi->rli) + return 1; + if (rgi->current_gtid.domain_id != other_rgi->current_gtid.domain_id) + return 1; + if (!rgi->commit_id || rgi->commit_id != other_rgi->commit_id) + return 1; + DBUG_EXECUTE_IF("thd_need_ordering_with_force", return 1;); + /* + Otherwise, these two threads are doing parallel replication within the same + replication domain. Their commit order is already fixed, so we do not need + gap locks or similar to otherwise enforce ordering (and in fact such locks + could lead to unnecessary deadlocks and transaction retry). + */ + return 0; +} + + +/* + If the storage engine detects a deadlock, and needs to choose a victim + transaction to roll back, it can call this function to ask the upper + server layer for which of two possible transactions is prefered to be + aborted and rolled back. + + In parallel replication, if two transactions are running in parallel and + one is fixed to commit before the other, then the one that commits later + will be prefered as the victim - chosing the early transaction as a victim + will not resolve the deadlock anyway, as the later transaction still needs + to wait for the earlier to commit. + + Otherwise, a transaction that uses only transactional tables, and can thus + be safely rolled back, will be prefered as a deadlock victim over a + transaction that also modified non-transactional (eg. MyISAM) tables. 
+ + The return value is -1 if the first transaction is prefered as a deadlock + victim, 1 if the second transaction is prefered, or 0 for no preference (in + which case the storage engine can make the choice as it prefers). +*/ +extern "C" int +thd_deadlock_victim_preference(const MYSQL_THD thd1, const MYSQL_THD thd2) +{ + rpl_group_info *rgi1, *rgi2; + bool nontrans1, nontrans2; + + if (!thd1 || !thd2) + return 0; + + /* + If the transactions are participating in the same replication domain in + parallel replication, then request to select the one that will commit + later (in the fixed commit order from the master) as the deadlock victim. + */ + rgi1= thd1->rgi_slave; + rgi2= thd2->rgi_slave; + if (rgi1 && rgi2 && + rgi1->is_parallel_exec && + rgi1->rli == rgi2->rli && + rgi1->current_gtid.domain_id == rgi2->current_gtid.domain_id) + return rgi1->gtid_sub_id < rgi2->gtid_sub_id ? 1 : -1; + + /* + If one transaction has modified non-transactional tables (so that it + cannot be safely rolled back), and the other has not, then prefer to + select the purely transactional one as the victim. + */ + nontrans1= thd1->transaction.all.modified_non_trans_table; + nontrans2= thd2->transaction.all.modified_non_trans_table; + if (nontrans1 && !nontrans2) + return 1; + else if (!nontrans1 && nontrans2) + return -1; + + /* No preferences, let the storage engine decide. */ + return 0; +} + + extern "C" int thd_non_transactional_update(const MYSQL_THD thd) { return(thd->transaction.all.modified_non_trans_table); @@ -4072,11 +4418,56 @@ extern "C" bool thd_binlog_filter_ok(const MYSQL_THD thd) return binlog_filter->db_ok(thd->db); } +/* + This is similar to sqlcom_can_generate_row_events, with the expection + that we only return 1 if we are going to generate row events in a + transaction. + CREATE OR REPLACE is always safe to do as this will run in it's own + transaction. 
+*/ + extern "C" bool thd_sqlcom_can_generate_row_events(const MYSQL_THD thd) { - return sqlcom_can_generate_row_events(thd); + return (sqlcom_can_generate_row_events(thd) && thd->lex->sql_command != + SQLCOM_CREATE_TABLE); +} + + +extern "C" enum durability_properties thd_get_durability_property(const MYSQL_THD thd) +{ + enum durability_properties ret= HA_REGULAR_DURABILITY; + + if (thd != NULL) + ret= thd->durability_property; + + return ret; } +/** Get the auto_increment_offset auto_increment_increment. +Exposed by thd_autoinc_service. +Needed by InnoDB. +@param thd Thread object +@param off auto_increment_offset +@param inc auto_increment_increment */ +extern "C" void thd_get_autoinc(const MYSQL_THD thd, ulong* off, ulong* inc) +{ + *off = thd->variables.auto_increment_offset; + *inc = thd->variables.auto_increment_increment; +} + + +/** + Is strict sql_mode set. + Needed by InnoDB. + @param thd Thread object + @return True if sql_mode has strict mode (all or trans). + @retval true sql_mode has strict mode (all or trans). + @retval false sql_mode has not strict mode (all or trans). 
+*/ +extern "C" bool thd_is_strict_mode(const MYSQL_THD thd) +{ + return thd->is_strict_mode(); +} /* @@ -4182,8 +4573,8 @@ void THD::reset_sub_statement_state(Sub_statement_state *backup, backup->enable_slow_log= enable_slow_log; backup->query_plan_flags= query_plan_flags; backup->limit_found_rows= limit_found_rows; - backup->examined_row_count= examined_row_count; - backup->sent_row_count= sent_row_count; + backup->examined_row_count= m_examined_row_count; + backup->sent_row_count= m_sent_row_count; backup->cuted_fields= cuted_fields; backup->client_capabilities= client_capabilities; backup->savepoints= transaction.savepoints; @@ -4206,8 +4597,8 @@ void THD::reset_sub_statement_state(Sub_statement_state *backup, /* Disable result sets */ client_capabilities &= ~CLIENT_MULTI_RESULTS; in_sub_stmt|= new_state; - examined_row_count= 0; - sent_row_count= 0; + m_examined_row_count= 0; + m_sent_row_count= 0; cuted_fields= 0; transaction.savepoints= 0; first_successful_insert_id_in_cur_stmt= 0; @@ -4254,7 +4645,7 @@ void THD::restore_sub_statement_state(Sub_statement_state *backup) first_successful_insert_id_in_cur_stmt= backup->first_successful_insert_id_in_cur_stmt; limit_found_rows= backup->limit_found_rows; - sent_row_count= backup->sent_row_count; + set_sent_row_count(backup->sent_row_count); client_capabilities= backup->client_capabilities; /* If we've left sub-statement mode, reset the fatal error flag. 
@@ -4275,7 +4666,7 @@ void THD::restore_sub_statement_state(Sub_statement_state *backup) The following is added to the old values as we are interested in the total complexity of the query */ - examined_row_count+= backup->examined_row_count; + inc_examined_row_count(backup->examined_row_count); cuted_fields+= backup->cuted_fields; DBUG_VOID_RETURN; } @@ -4288,6 +4679,141 @@ void THD::set_statement(Statement *stmt) mysql_mutex_unlock(&LOCK_thd_data); } +void THD::set_sent_row_count(ha_rows count) +{ + m_sent_row_count= count; + MYSQL_SET_STATEMENT_ROWS_SENT(m_statement_psi, m_sent_row_count); +} + +void THD::set_examined_row_count(ha_rows count) +{ + m_examined_row_count= count; + MYSQL_SET_STATEMENT_ROWS_EXAMINED(m_statement_psi, m_examined_row_count); +} + +void THD::inc_sent_row_count(ha_rows count) +{ + m_sent_row_count+= count; + MYSQL_SET_STATEMENT_ROWS_SENT(m_statement_psi, m_sent_row_count); +} + +void THD::inc_examined_row_count(ha_rows count) +{ + m_examined_row_count+= count; + MYSQL_SET_STATEMENT_ROWS_EXAMINED(m_statement_psi, m_examined_row_count); +} + +void THD::inc_status_created_tmp_disk_tables() +{ + status_var_increment(status_var.created_tmp_disk_tables_); +#ifdef HAVE_PSI_STATEMENT_INTERFACE + PSI_STATEMENT_CALL(inc_statement_created_tmp_disk_tables)(m_statement_psi, 1); +#endif +} + +void THD::inc_status_created_tmp_tables() +{ + status_var_increment(status_var.created_tmp_tables_); +#ifdef HAVE_PSI_STATEMENT_INTERFACE + PSI_STATEMENT_CALL(inc_statement_created_tmp_tables)(m_statement_psi, 1); +#endif +} + +void THD::inc_status_select_full_join() +{ + status_var_increment(status_var.select_full_join_count_); +#ifdef HAVE_PSI_STATEMENT_INTERFACE + PSI_STATEMENT_CALL(inc_statement_select_full_join)(m_statement_psi, 1); +#endif +} + +void THD::inc_status_select_full_range_join() +{ + status_var_increment(status_var.select_full_range_join_count_); +#ifdef HAVE_PSI_STATEMENT_INTERFACE + 
PSI_STATEMENT_CALL(inc_statement_select_full_range_join)(m_statement_psi, 1); +#endif +} + +void THD::inc_status_select_range() +{ + status_var_increment(status_var.select_range_count_); +#ifdef HAVE_PSI_STATEMENT_INTERFACE + PSI_STATEMENT_CALL(inc_statement_select_range)(m_statement_psi, 1); +#endif +} + +void THD::inc_status_select_range_check() +{ + status_var_increment(status_var.select_range_check_count_); +#ifdef HAVE_PSI_STATEMENT_INTERFACE + PSI_STATEMENT_CALL(inc_statement_select_range_check)(m_statement_psi, 1); +#endif +} + +void THD::inc_status_select_scan() +{ + status_var_increment(status_var.select_scan_count_); +#ifdef HAVE_PSI_STATEMENT_INTERFACE + PSI_STATEMENT_CALL(inc_statement_select_scan)(m_statement_psi, 1); +#endif +} + +void THD::inc_status_sort_merge_passes() +{ + status_var_increment(status_var.filesort_merge_passes_); +#ifdef HAVE_PSI_STATEMENT_INTERFACE + PSI_STATEMENT_CALL(inc_statement_sort_merge_passes)(m_statement_psi, 1); +#endif +} + +void THD::inc_status_sort_range() +{ + status_var_increment(status_var.filesort_range_count_); +#ifdef HAVE_PSI_STATEMENT_INTERFACE + PSI_STATEMENT_CALL(inc_statement_sort_range)(m_statement_psi, 1); +#endif +} + +void THD::inc_status_sort_rows(ha_rows count) +{ + statistic_add(status_var.filesort_rows_, count, &LOCK_status); +#ifdef HAVE_PSI_STATEMENT_INTERFACE + PSI_STATEMENT_CALL(inc_statement_sort_rows)(m_statement_psi, count); +#endif +} + +void THD::inc_status_sort_scan() +{ + status_var_increment(status_var.filesort_scan_count_); +#ifdef HAVE_PSI_STATEMENT_INTERFACE + PSI_STATEMENT_CALL(inc_statement_sort_scan)(m_statement_psi, 1); +#endif +} + +void THD::set_status_no_index_used() +{ + server_status|= SERVER_QUERY_NO_INDEX_USED; +#ifdef HAVE_PSI_STATEMENT_INTERFACE + PSI_STATEMENT_CALL(set_statement_no_index_used)(m_statement_psi); +#endif +} + +void THD::set_status_no_good_index_used() +{ + server_status|= SERVER_QUERY_NO_GOOD_INDEX_USED; +#ifdef HAVE_PSI_STATEMENT_INTERFACE + 
PSI_STATEMENT_CALL(set_statement_no_good_index_used)(m_statement_psi); +#endif +} + +void THD::set_command(enum enum_server_command command) +{ + m_command= command; +#ifdef HAVE_PSI_THREAD_INTERFACE + PSI_STATEMENT_CALL(set_thread_command)(m_command); +#endif +} /** Assign a new value to thd->query. */ @@ -4296,6 +4822,10 @@ void THD::set_query(const CSET_STRING &string_arg) mysql_mutex_lock(&LOCK_thd_data); set_query_inner(string_arg); mysql_mutex_unlock(&LOCK_thd_data); + +#ifdef HAVE_PSI_THREAD_INTERFACE + PSI_THREAD_CALL(set_thread_info)(query(), query_length()); +#endif } /** Assign a new value to thd->query and thd->query_id. */ @@ -4306,17 +4836,8 @@ void THD::set_query_and_id(char *query_arg, uint32 query_length_arg, { mysql_mutex_lock(&LOCK_thd_data); set_query_inner(query_arg, query_length_arg, cs); - query_id= new_query_id; mysql_mutex_unlock(&LOCK_thd_data); -} - -/** Assign a new value to thd->query_id. */ - -void THD::set_query_id(query_id_t new_query_id) -{ - mysql_mutex_lock(&LOCK_thd_data); query_id= new_query_id; - mysql_mutex_unlock(&LOCK_thd_data); } /** Assign a new value to thd->mysys_var. */ @@ -4350,13 +4871,15 @@ void THD::leave_locked_tables_mode() /* Also ensure that we don't release metadata locks for open HANDLERs. 
*/ if (handler_tables_hash.records) mysql_ha_set_explicit_lock_duration(this); + if (ull_hash.records) + mysql_ull_set_explicit_lock_duration(this); } locked_tables_mode= LTM_NONE; } -void THD::get_definer(LEX_USER *definer) +void THD::get_definer(LEX_USER *definer, bool role) { - binlog_invoker(); + binlog_invoker(role); #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) if (slave_thread && has_invoker()) { @@ -4368,7 +4891,7 @@ void THD::get_definer(LEX_USER *definer) } else #endif - get_default_definer(this, definer); + get_default_definer(this, definer, role); } @@ -4561,7 +5084,8 @@ has_write_table_with_auto_increment_and_select(TABLE_LIST *tables) for(TABLE_LIST *table= tables; table; table= table->next_global) { if (!table->placeholder() && - (table->lock_type <= TL_READ_NO_INSERT)) + table->lock_type <= TL_READ_NO_INSERT && + table->prelocking_placeholder != TABLE_LIST::FK) { has_select= true; break; @@ -4667,16 +5191,13 @@ has_write_table_auto_increment_not_first_in_pk(TABLE_LIST *tables) BINLOG_FORMAT = STATEMENT and at least one table uses a storage engine limited to row-logging. - 6. Error: Cannot execute row injection: binlogging impossible since - BINLOG_FORMAT = STATEMENT. - - 7. Warning: Unsafe statement binlogged in statement format since + 6. Warning: Unsafe statement binlogged in statement format since BINLOG_FORMAT = STATEMENT. In addition, we can produce the following error (not depending on the variables of the decision diagram): - 8. Error: Cannot execute statement: binlogging impossible since more + 7. Error: Cannot execute statement: binlogging impossible since more than one engine is involved and at least one engine is self-logging. 
@@ -4703,6 +5224,8 @@ int THD::decide_logging_format(TABLE_LIST *tables) DBUG_PRINT("info", ("lex->get_stmt_unsafe_flags(): 0x%x", lex->get_stmt_unsafe_flags())); + reset_binlog_local_stmt_filter(); + /* We should not decide logging format if the binlog is closed or binlogging is off, or if the statement is filtered out from the @@ -4745,6 +5268,28 @@ int THD::decide_logging_format(TABLE_LIST *tables) A pointer to a previous table that was accessed. */ TABLE* prev_access_table= NULL; + /** + The number of tables used in the current statement, + that should be replicated. + */ + uint replicated_tables_count= 0; + /** + The number of tables written to in the current statement, + that should not be replicated. + A table should not be replicated when it is considered + 'local' to a MySQL instance. + Currently, these tables are: + - mysql.slow_log + - mysql.general_log + - mysql.slave_relay_log_info + - mysql.slave_master_info + - mysql.slave_worker_info + - performance_schema.* + - TODO: information_schema.* + In practice, from this list, only performance_schema.* tables + are written to by user queries. + */ + uint non_replicated_tables_count= 0; #ifndef DBUG_OFF { @@ -4794,14 +5339,38 @@ int THD::decide_logging_format(TABLE_LIST *tables) if (table->placeholder()) continue; - if (table->table->s->table_category == TABLE_CATEGORY_PERFORMANCE || - table->table->s->table_category == TABLE_CATEGORY_LOG) - lex->set_stmt_unsafe(LEX::BINLOG_STMT_UNSAFE_SYSTEM_TABLE); - handler::Table_flags const flags= table->table->file->ha_table_flags(); DBUG_PRINT("info", ("table: %s; ha_table_flags: 0x%llx", table->table_name, flags)); + + if (table->table->no_replicate) + { + /* + The statement uses a table that is not replicated. 
+ The following properties about the table: + - persistent / transient + - transactional / non transactional + - temporary / permanent + - read or write + - multiple engines involved because of this table + are not relevant, as this table is completely ignored. + Because the statement uses a non replicated table, + using STATEMENT format in the binlog is impossible. + Either this statement will be discarded entirely, + or it will be logged (possibly partially) in ROW format. + */ + lex->set_stmt_unsafe(LEX::BINLOG_STMT_UNSAFE_SYSTEM_TABLE); + + if (table->lock_type >= TL_WRITE_ALLOW_WRITE) + { + non_replicated_tables_count++; + continue; + } + } + + replicated_tables_count++; + if (table->lock_type >= TL_WRITE_ALLOW_WRITE) { if (prev_write_table && prev_write_table->file->ht != @@ -4823,32 +5392,12 @@ int THD::decide_logging_format(TABLE_LIST *tables) prev_write_table= table->table; - /* - INSERT...ON DUPLICATE KEY UPDATE on a table with more than one unique keys - can be unsafe. Check for it if the flag is already not marked for the - given statement. 
- */ - if (!lex->is_stmt_unsafe(LEX::BINLOG_STMT_UNSAFE_INSERT_TWO_KEYS) && - lex->sql_command == SQLCOM_INSERT && - /* Duplicate key update is not supported by INSERT DELAYED */ - command != COM_DELAYED_INSERT && lex->duplicates == DUP_UPDATE) - { - uint keys= table->table->s->keys, i= 0, unique_keys= 0; - for (KEY* keyinfo= table->table->s->key_info; - i < keys && unique_keys <= 1; i++, keyinfo++) - { - if (keyinfo->flags & HA_NOSAME) - unique_keys++; - } - if (unique_keys > 1 ) - lex->set_stmt_unsafe(LEX::BINLOG_STMT_UNSAFE_INSERT_TWO_KEYS); - } } flags_access_some_set |= flags; if (lex->sql_command != SQLCOM_CREATE_TABLE || (lex->sql_command == SQLCOM_CREATE_TABLE && - (lex->create_info.options & HA_LEX_CREATE_TMP_TABLE))) + lex->create_info.tmp_table())) { my_bool trans= table->table->file->has_transactions(); @@ -4954,10 +5503,10 @@ int THD::decide_logging_format(TABLE_LIST *tables) if (lex->is_stmt_row_injection()) { /* - 6. Error: Cannot execute row injection since - BINLOG_FORMAT = STATEMENT + We have to log the statement as row or give an error. + Better to accept what master gives us than stopping replication. */ - my_error((error= ER_BINLOG_ROW_INJECTION_AND_STMT_MODE), MYF(0)); + set_current_stmt_binlog_format_row(); } else if ((flags_write_all_set & HA_BINLOG_STMT_CAPABLE) == 0 && sqlcom_can_generate_row_events(this)) @@ -4982,7 +5531,7 @@ int THD::decide_logging_format(TABLE_LIST *tables) DBUG_PRINT("info", ("binlog_unsafe_warning_flags: 0x%x", binlog_unsafe_warning_flags)); } - /* log in statement format! */ + /* log in statement format (or row if row event)! */ } /* No statement-only engines and binlog_format != STATEMENT. I.e., nothing prevents us from row logging if needed. */ @@ -4997,6 +5546,30 @@ int THD::decide_logging_format(TABLE_LIST *tables) } } + if (non_replicated_tables_count > 0) + { + if ((replicated_tables_count == 0) || ! 
is_write) + { + DBUG_PRINT("info", ("decision: no logging, no replicated table affected")); + set_binlog_local_stmt_filter(); + } + else + { + if (! is_current_stmt_binlog_format_row()) + { + my_error((error= ER_BINLOG_STMT_MODE_AND_NO_REPL_TABLES), MYF(0)); + } + else + { + clear_binlog_local_stmt_filter(); + } + } + } + else + { + clear_binlog_local_stmt_filter(); + } + if (error) { DBUG_PRINT("info", ("decision: no logging since an error was generated")); DBUG_RETURN(-1); @@ -5035,7 +5608,7 @@ int THD::decide_logging_format(TABLE_LIST *tables) Replace the last ',' with '.' for table_names */ table_names.replace(table_names.length()-1, 1, ".", 1); - push_warning_printf(this, MYSQL_ERROR::WARN_LEVEL_WARN, + push_warning_printf(this, Sql_condition::WARN_LEVEL_WARN, ER_UNKNOWN_ERROR, "Row events are not logged for %s statements " "that modify BLACKHOLE tables in row format. " @@ -5104,7 +5677,11 @@ THD::binlog_prepare_pending_rows_event(TABLE* table, uint32 serv_id, DBUG_ASSERT(table->s->table_map_id != ~0UL); /* Fetch the type code for the RowsEventT template parameter */ - int const type_code= RowsEventT::TYPE_CODE; + int const general_type_code= RowsEventT::TYPE_CODE; + + /* Ensure that all events in a GTID group are in the same cache */ + if (variables.option_bits & OPTION_GTID_BEGIN) + is_transactional= 1; /* There is no good place to set up the transactional data, so we @@ -5131,7 +5708,7 @@ THD::binlog_prepare_pending_rows_event(TABLE* table, uint32 serv_id, if (!pending || pending->server_id != serv_id || pending->get_table_id() != table->s->table_map_id || - pending->get_type_code() != type_code || + pending->get_general_type_code() != general_type_code || pending->get_data_size() + needed > opt_binlog_rows_event_max_size || pending->get_width() != colcnt || !bitmap_cmp(pending->get_cols(), cols)) @@ -5160,27 +5737,6 @@ THD::binlog_prepare_pending_rows_event(TABLE* table, uint32 serv_id, DBUG_RETURN(pending); /* This is the current pending event */ } 
-#ifdef HAVE_EXPLICIT_TEMPLATE_INSTANTIATION -/* - Instantiate the versions we need, we have -fno-implicit-template as - compiling option. -*/ -template Rows_log_event* -THD::binlog_prepare_pending_rows_event(TABLE*, uint32, MY_BITMAP const*, - size_t, size_t, bool, - Write_rows_log_event*); - -template Rows_log_event* -THD::binlog_prepare_pending_rows_event(TABLE*, uint32, MY_BITMAP const*, - size_t colcnt, size_t, bool, - Delete_rows_log_event *); - -template Rows_log_event* -THD::binlog_prepare_pending_rows_event(TABLE*, uint32, MY_BITMAP const*, - size_t colcnt, size_t, bool, - Update_rows_log_event *); -#endif - /* Declare in unnamed namespace. */ CPP_UNNAMED_NS_START /** @@ -5322,8 +5878,12 @@ int THD::binlog_write_row(TABLE* table, bool is_trans, size_t const len= pack_row(table, cols, row_data, record); + /* Ensure that all events in a GTID group are in the same cache */ + if (variables.option_bits & OPTION_GTID_BEGIN) + is_trans= 1; + Rows_log_event* const ev= - binlog_prepare_pending_rows_event(table, server_id, cols, colcnt, + binlog_prepare_pending_rows_event(table, variables.server_id, cols, colcnt, len, is_trans, static_cast<Write_rows_log_event*>(0)); @@ -5355,6 +5915,10 @@ int THD::binlog_update_row(TABLE* table, bool is_trans, size_t const after_size= pack_row(table, cols, after_row, after_record); + /* Ensure that all events in a GTID group are in the same cache */ + if (variables.option_bits & OPTION_GTID_BEGIN) + is_trans= 1; + /* Don't print debug messages when running valgrind since they can trigger false warnings. 
@@ -5367,7 +5931,7 @@ int THD::binlog_update_row(TABLE* table, bool is_trans, #endif Rows_log_event* const ev= - binlog_prepare_pending_rows_event(table, server_id, cols, colcnt, + binlog_prepare_pending_rows_event(table, variables.server_id, cols, colcnt, before_size + after_size, is_trans, static_cast<Update_rows_log_event*>(0)); @@ -5397,8 +5961,12 @@ int THD::binlog_delete_row(TABLE* table, bool is_trans, size_t const len= pack_row(table, cols, row_data, record); + /* Ensure that all events in a GTID group are in the same cache */ + if (variables.option_bits & OPTION_GTID_BEGIN) + is_trans= 1; + Rows_log_event* const ev= - binlog_prepare_pending_rows_event(table, server_id, cols, colcnt, + binlog_prepare_pending_rows_event(table, variables.server_id, cols, colcnt, len, is_trans, static_cast<Delete_rows_log_event*>(0)); @@ -5417,6 +5985,10 @@ int THD::binlog_remove_pending_rows_event(bool clear_maps, if (!mysql_bin_log.is_open()) DBUG_RETURN(0); + /* Ensure that all events in a GTID group are in the same cache */ + if (variables.option_bits & OPTION_GTID_BEGIN) + is_transactional= 1; + mysql_bin_log.remove_pending_rows_event(this, is_transactional); if (clear_maps) @@ -5436,6 +6008,10 @@ int THD::binlog_flush_pending_rows_event(bool stmt_end, bool is_transactional) if (!mysql_bin_log.is_open()) DBUG_RETURN(0); + /* Ensure that all events in a GTID group are in the same cache */ + if (variables.option_bits & OPTION_GTID_BEGIN) + is_transactional= 1; + /* Mark the event as the last event of a statement if the stmt_end flag is set. 
@@ -5480,23 +6056,35 @@ show_query_type(THD::enum_binlog_query_type qtype) Constants required for the limit unsafe warnings suppression */ //seconds after which the limit unsafe warnings suppression will be activated -#define LIMIT_UNSAFE_WARNING_ACTIVATION_TIMEOUT 50 +#define LIMIT_UNSAFE_WARNING_ACTIVATION_TIMEOUT 5*60 //number of limit unsafe warnings after which the suppression will be activated -#define LIMIT_UNSAFE_WARNING_ACTIVATION_THRESHOLD_COUNT 50 +#define LIMIT_UNSAFE_WARNING_ACTIVATION_THRESHOLD_COUNT 10 -static ulonglong limit_unsafe_suppression_start_time= 0; -static bool unsafe_warning_suppression_is_activated= false; -static int limit_unsafe_warning_count= 0; +static ulonglong unsafe_suppression_start_time= 0; +static bool unsafe_warning_suppression_active[LEX::BINLOG_STMT_UNSAFE_COUNT]; +static ulong unsafe_warnings_count[LEX::BINLOG_STMT_UNSAFE_COUNT]; +static ulong total_unsafe_warnings_count; /** Auxiliary function to reset the limit unsafety warning suppression. + This is done without mutex protection, but this should be good + enough as it doesn't matter if we lose a couple of suppressed + messages or if this is called multiple times. */ -static void reset_binlog_unsafe_suppression() + +static void reset_binlog_unsafe_suppression(ulonglong now) { + uint i; DBUG_ENTER("reset_binlog_unsafe_suppression"); - unsafe_warning_suppression_is_activated= false; - limit_unsafe_warning_count= 0; - limit_unsafe_suppression_start_time= my_interval_timer()/10000000; + + unsafe_suppression_start_time= now; + total_unsafe_warnings_count= 0; + + for (i= 0 ; i < LEX::BINLOG_STMT_UNSAFE_COUNT ; i++) + { + unsafe_warnings_count[i]= 0; + unsafe_warning_suppression_active[i]= 0; + } DBUG_VOID_RETURN; } @@ -5514,95 +6102,94 @@ static void print_unsafe_warning_to_log(int unsafe_type, char* buf, } /** - Auxiliary function to check if the warning for limit unsafety should be - thrown or suppressed. Details of the implementation can be found in the - comments inline. 
+ Auxiliary function to check if the warning for unsafe replication statements + should be thrown or suppressed. + + Logic is: + - If we get more than LIMIT_UNSAFE_WARNING_ACTIVATION_THRESHOLD_COUNT errors + of one type, that type of errors will be suppressed for + LIMIT_UNSAFE_WARNING_ACTIVATION_TIMEOUT. + - When the time limit has been reached, all suppression is reset. + + This means that if one gets many different types of errors, some of them + may be reset in less than LIMIT_UNSAFE_WARNING_ACTIVATION_TIMEOUT. However at + least one error is disabled for this time. + SYNOPSIS: @params - buf - buffer to hold the warning message text unsafe_type - The type of unsafety. - query - The actual query statement. - TODO: Remove this function and implement a general service for all warnings - that would prevent flooding the error log. + RETURN: + 0 Ok to log + 1 Message suppressed */ -static void do_unsafe_limit_checkout(char* buf, int unsafe_type, char* query) + +static bool protect_against_unsafe_warning_flood(int unsafe_type) { - ulonglong now= 0; - DBUG_ENTER("do_unsafe_limit_checkout"); - DBUG_ASSERT(unsafe_type == LEX::BINLOG_STMT_UNSAFE_LIMIT); - limit_unsafe_warning_count++; + ulong count; + ulonglong now= my_interval_timer()/1000000000ULL; + DBUG_ENTER("protect_against_unsafe_warning_flood"); + + count= ++unsafe_warnings_count[unsafe_type]; + total_unsafe_warnings_count++; + /* INITIALIZING: If this is the first time this function is called with log warning enabled, the monitoring the unsafe warnings should start. 
*/ - if (limit_unsafe_suppression_start_time == 0) + if (unsafe_suppression_start_time == 0) { - limit_unsafe_suppression_start_time= my_interval_timer()/10000000; - print_unsafe_warning_to_log(unsafe_type, buf, query); + reset_binlog_unsafe_suppression(now); + DBUG_RETURN(0); } - else + + /* + The following is true if we got too many errors or if the error was + already suppressed + */ + if (count >= LIMIT_UNSAFE_WARNING_ACTIVATION_THRESHOLD_COUNT) { - if (!unsafe_warning_suppression_is_activated) - print_unsafe_warning_to_log(unsafe_type, buf, query); + ulonglong diff_time= (now - unsafe_suppression_start_time); - if (limit_unsafe_warning_count >= - LIMIT_UNSAFE_WARNING_ACTIVATION_THRESHOLD_COUNT) + if (!unsafe_warning_suppression_active[unsafe_type]) { - now= my_interval_timer()/10000000; - if (!unsafe_warning_suppression_is_activated) + /* + ACTIVATION: + We got LIMIT_UNSAFE_WARNING_ACTIVATION_THRESHOLD_COUNT warnings in + less than LIMIT_UNSAFE_WARNING_ACTIVATION_TIMEOUT we activate the + suppression. + */ + if (diff_time <= LIMIT_UNSAFE_WARNING_ACTIVATION_TIMEOUT) { - /* - ACTIVATION: - We got LIMIT_UNSAFE_WARNING_ACTIVATION_THRESHOLD_COUNT warnings in - less than LIMIT_UNSAFE_WARNING_ACTIVATION_TIMEOUT we activate the - suppression. 
- */ - if ((now-limit_unsafe_suppression_start_time) <= - LIMIT_UNSAFE_WARNING_ACTIVATION_TIMEOUT) - { - unsafe_warning_suppression_is_activated= true; - DBUG_PRINT("info",("A warning flood has been detected and the limit \ -unsafety warning suppression has been activated.")); - } - else - { - /* - there is no flooding till now, therefore we restart the monitoring - */ - limit_unsafe_suppression_start_time= my_interval_timer()/10000000; - limit_unsafe_warning_count= 0; - } + unsafe_warning_suppression_active[unsafe_type]= 1; + sql_print_information("Suppressing warnings of type '%s' for up to %d seconds because of flooding", + ER(LEX::binlog_stmt_unsafe_errcode[unsafe_type]), + LIMIT_UNSAFE_WARNING_ACTIVATION_TIMEOUT); } else { /* - Print the suppression note and the unsafe warning. + There is no flooding till now, therefore we restart the monitoring */ - sql_print_information("The following warning was suppressed %d times \ -during the last %d seconds in the error log", - limit_unsafe_warning_count, - (int) - (now-limit_unsafe_suppression_start_time)); - print_unsafe_warning_to_log(unsafe_type, buf, query); - /* - DEACTIVATION: We got LIMIT_UNSAFE_WARNING_ACTIVATION_THRESHOLD_COUNT - warnings in more than LIMIT_UNSAFE_WARNING_ACTIVATION_TIMEOUT, the - suppression should be deactivated. 
- */ - if ((now - limit_unsafe_suppression_start_time) > - LIMIT_UNSAFE_WARNING_ACTIVATION_TIMEOUT) - { - reset_binlog_unsafe_suppression(); - DBUG_PRINT("info",("The limit unsafety warning supression has been \ -deactivated")); - } + reset_binlog_unsafe_suppression(now); + } + } + else + { + /* This type of warnings was suppressed */ + if (diff_time > LIMIT_UNSAFE_WARNING_ACTIVATION_TIMEOUT) + { + ulong save_count= total_unsafe_warnings_count; + /* Print a suppression note and remove the suppression */ + reset_binlog_unsafe_suppression(now); + sql_print_information("Suppressed %lu unsafe warnings during " + "the last %d seconds", + save_count, (int) diff_time); } - limit_unsafe_warning_count= 0; } } - DBUG_VOID_RETURN; + DBUG_RETURN(unsafe_warning_suppression_active[unsafe_type]); } /** @@ -5614,6 +6201,7 @@ deactivated")); void THD::issue_unsafe_warnings() { char buf[MYSQL_ERRMSG_SIZE * 2]; + uint32 unsafe_type_flags; DBUG_ENTER("issue_unsafe_warnings"); /* Ensure that binlog_unsafe_warning_flags is big enough to hold all @@ -5621,8 +6209,10 @@ void THD::issue_unsafe_warnings() */ DBUG_ASSERT(LEX::BINLOG_STMT_UNSAFE_COUNT <= sizeof(binlog_unsafe_warning_flags) * CHAR_BIT); - - uint32 unsafe_type_flags= binlog_unsafe_warning_flags; + + if (!(unsafe_type_flags= binlog_unsafe_warning_flags)) + DBUG_VOID_RETURN; // Nothing to do + /* For each unsafe_type, check if the statement is unsafe in this way and issue a warning. 
@@ -5633,17 +6223,13 @@ void THD::issue_unsafe_warnings() { if ((unsafe_type_flags & (1 << unsafe_type)) != 0) { - push_warning_printf(this, MYSQL_ERROR::WARN_LEVEL_NOTE, + push_warning_printf(this, Sql_condition::WARN_LEVEL_NOTE, ER_BINLOG_UNSAFE_STATEMENT, ER(ER_BINLOG_UNSAFE_STATEMENT), ER(LEX::binlog_stmt_unsafe_errcode[unsafe_type])); - if (global_system_variables.log_warnings) - { - if (unsafe_type == LEX::BINLOG_STMT_UNSAFE_LIMIT) - do_unsafe_limit_checkout( buf, unsafe_type, query()); - else //cases other than LIMIT unsafety - print_unsafe_warning_to_log(unsafe_type, buf, query()); - } + if (global_system_variables.log_warnings > 0 && + !protect_against_unsafe_warning_flood(unsafe_type)) + print_unsafe_warning_to_log(unsafe_type, buf, query()); } } DBUG_VOID_RETURN; @@ -5683,6 +6269,23 @@ int THD::binlog_query(THD::enum_binlog_query_type qtype, char const *query_arg, show_query_type(qtype), (int) query_len, query_arg)); DBUG_ASSERT(query_arg && mysql_bin_log.is_open()); + /* If this is within a BEGIN ... COMMIT group, don't log it directly */ + if (variables.option_bits & OPTION_GTID_BEGIN) + { + direct= 0; + is_trans= 1; + } + DBUG_PRINT("info", ("is_trans: %d direct: %d", is_trans, direct)); + + if (get_binlog_local_stmt_filter() == BINLOG_FILTER_SET) + { + /* + The current statement is to be ignored, and not written to + the binlog. Do not call issue_unsafe_warnings(). 
+ */ + DBUG_RETURN(0); + } + /* If we are not in prelocked mode, mysql_unlock_tables() will be called after this binlog_query(), so we have to flush the pending @@ -5794,6 +6397,349 @@ THD::signal_wakeup_ready() } +void THD::rgi_lock_temporary_tables() +{ + mysql_mutex_lock(&rgi_slave->rli->data_lock); + temporary_tables= rgi_slave->rli->save_temporary_tables; +} + +void THD::rgi_unlock_temporary_tables(bool clear) +{ + rgi_slave->rli->save_temporary_tables= temporary_tables; + mysql_mutex_unlock(&rgi_slave->rli->data_lock); + if (clear) + { + /* + Temporary tables are shared with other sql execution threads. + As a safety measure, clear the pointer to the common area. + */ + temporary_tables= 0; + } +} + +bool THD::rgi_have_temporary_tables() +{ + return rgi_slave->rli->save_temporary_tables != 0; +} + + +void +wait_for_commit::reinit() +{ + subsequent_commits_list= NULL; + next_subsequent_commit= NULL; + waitee= NULL; + opaque_pointer= NULL; + wakeup_error= 0; + wakeup_subsequent_commits_running= false; + commit_started= false; +#ifdef SAFE_MUTEX + /* + When using SAFE_MUTEX, the ordering between taking the LOCK_wait_commit + mutexes is checked. This causes a problem when we re-use a mutex, as then + the expected locking order may change. + + So in this case, do a re-init of the mutex. In release builds, we want to + avoid the overhead of a re-init though. + + To ensure that no one is locking the mutex, we take a lock of it first. 
+ For full explanation, see wait_for_commit::~wait_for_commit() + */ + mysql_mutex_lock(&LOCK_wait_commit); + mysql_mutex_unlock(&LOCK_wait_commit); + + mysql_mutex_destroy(&LOCK_wait_commit); + mysql_mutex_init(key_LOCK_wait_commit, &LOCK_wait_commit, MY_MUTEX_INIT_FAST); +#endif +} + + +wait_for_commit::wait_for_commit() +{ + mysql_mutex_init(key_LOCK_wait_commit, &LOCK_wait_commit, MY_MUTEX_INIT_FAST); + mysql_cond_init(key_COND_wait_commit, &COND_wait_commit, 0); + reinit(); +} + + +wait_for_commit::~wait_for_commit() +{ + /* + Since we do a dirty read of the waiting_for_commit flag in + wait_for_prior_commit() and in unregister_wait_for_prior_commit(), we need + to take extra care before freeing the wait_for_commit object. + + It is possible for the waitee to be pre-empted inside wakeup(), just after + it has cleared the waiting_for_commit flag and before it has released the + LOCK_wait_commit mutex. And then it is possible for the waiter to find the + flag cleared in wait_for_prior_commit() and go finish up things and + de-allocate the LOCK_wait_commit and COND_wait_commit objects before the + waitee has time to be re-scheduled and finish unlocking the mutex and + signalling the condition. This would lead to the waitee accessing no + longer valid memory. + + To prevent this, we do an extra lock/unlock of the mutex here before + deallocation; this makes certain that any waitee has completed wakeup() + first. + */ + mysql_mutex_lock(&LOCK_wait_commit); + mysql_mutex_unlock(&LOCK_wait_commit); + + mysql_mutex_destroy(&LOCK_wait_commit); + mysql_cond_destroy(&COND_wait_commit); +} + + +void +wait_for_commit::wakeup(int wakeup_error) +{ + /* + We signal each waiter on their own condition and mutex (rather than using + pthread_cond_broadcast() or something like that). + + Otherwise we would need to somehow ensure that they were done + waking up before we could allow this THD to be destroyed, which would + be annoying and unnecessary. 
+ + Note that wakeup_subsequent_commits2() depends on this function being a + full memory barrier (it is, because it takes a mutex lock). + + */ + mysql_mutex_lock(&LOCK_wait_commit); + waitee= NULL; + this->wakeup_error= wakeup_error; + /* + Note that it is critical that the mysql_cond_signal() here is done while + still holding the mutex. As soon as we release the mutex, the waiter might + deallocate the condition object. + */ + mysql_cond_signal(&COND_wait_commit); + mysql_mutex_unlock(&LOCK_wait_commit); +} + + +/* + Register that the next commit of this THD should wait to complete until + commit in another THD (the waitee) has completed. + + The wait may occur explicitly, with the waiter sitting in + wait_for_prior_commit() until the waitee calls wakeup_subsequent_commits(). + + Alternatively, the TC (eg. binlog) may do the commits of both waitee and + waiter at once during group commit, resolving both of them in the right + order. + + Only one waitee can be registered for a waiter; it must be removed by + wait_for_prior_commit() or unregister_wait_for_prior_commit() before a new + one is registered. But it is ok for several waiters to register a wait for + the same waitee. It is also permissible for one THD to be both a waiter and + a waitee at the same time. +*/ +void +wait_for_commit::register_wait_for_prior_commit(wait_for_commit *waitee) +{ + DBUG_ASSERT(!this->waitee /* No prior registration allowed */); + wakeup_error= 0; + this->waitee= waitee; + + mysql_mutex_lock(&waitee->LOCK_wait_commit); + /* + If waitee is in the middle of wakeup, then there is nothing to wait for, + so we need not register. This is necessary to avoid a race in unregister, + see comments on wakeup_subsequent_commits2() for details. + */ + if (waitee->wakeup_subsequent_commits_running) + this->waitee= NULL; + else + { + /* + Put ourself at the head of the waitee's list of transactions that must + wait for it to commit first. 
+ */ + this->next_subsequent_commit= waitee->subsequent_commits_list; + waitee->subsequent_commits_list= this; + } + mysql_mutex_unlock(&waitee->LOCK_wait_commit); +} + + +/* + Wait for commit of another transaction to complete, as already registered + with register_wait_for_prior_commit(). If the commit already completed, + returns immediately. +*/ +int +wait_for_commit::wait_for_prior_commit2(THD *thd) +{ + PSI_stage_info old_stage; + wait_for_commit *loc_waitee; + + mysql_mutex_lock(&LOCK_wait_commit); + DEBUG_SYNC(thd, "wait_for_prior_commit_waiting"); + thd->ENTER_COND(&COND_wait_commit, &LOCK_wait_commit, + &stage_waiting_for_prior_transaction_to_commit, + &old_stage); + while ((loc_waitee= this->waitee) && !thd->check_killed()) + mysql_cond_wait(&COND_wait_commit, &LOCK_wait_commit); + if (!loc_waitee) + { + if (wakeup_error) + my_error(ER_PRIOR_COMMIT_FAILED, MYF(0)); + goto end; + } + /* + Wait was interrupted by kill. We need to unregister our wait and give the + error. But if a wakeup is already in progress, then we must ignore the + kill and not give error, otherwise we get inconsistency between waitee and + waiter as to whether we succeed or fail (eg. we may roll back but waitee + might attempt to commit both us and any subsequent commits waiting for us). + */ + mysql_mutex_lock(&loc_waitee->LOCK_wait_commit); + if (loc_waitee->wakeup_subsequent_commits_running) + { + /* We are being woken up; ignore the kill and just wait. 
*/ + mysql_mutex_unlock(&loc_waitee->LOCK_wait_commit); + do + { + mysql_cond_wait(&COND_wait_commit, &LOCK_wait_commit); + } while (this->waitee); + if (wakeup_error) + my_error(ER_PRIOR_COMMIT_FAILED, MYF(0)); + goto end; + } + remove_from_list(&loc_waitee->subsequent_commits_list); + mysql_mutex_unlock(&loc_waitee->LOCK_wait_commit); + this->waitee= NULL; + + wakeup_error= thd->killed_errno(); + if (!wakeup_error) + wakeup_error= ER_QUERY_INTERRUPTED; + my_message(wakeup_error, ER(wakeup_error), MYF(0)); + thd->EXIT_COND(&old_stage); + /* + Must do the DEBUG_SYNC() _after_ exit_cond(), as DEBUG_SYNC is not safe to + use within enter_cond/exit_cond. + */ + DEBUG_SYNC(thd, "wait_for_prior_commit_killed"); + return wakeup_error; + +end: + thd->EXIT_COND(&old_stage); + return wakeup_error; +} + + +/* + Wakeup anyone waiting for us to have committed. + + Note about locking: + + We have a potential race or deadlock between wakeup_subsequent_commits() in + the waitee and unregister_wait_for_prior_commit() in the waiter. + + Both waiter and waitee needs to take their own lock before it is safe to take + a lock on the other party - else the other party might disappear and invalid + memory data could be accessed. But if we take the two locks in different + order, we may end up in a deadlock. + + The waiter needs to lock the waitee to delete itself from the list in + unregister_wait_for_prior_commit(). Thus wakeup_subsequent_commits() can not + hold its own lock while locking waiters, as this could lead to deadlock. + + So we need to prevent unregister_wait_for_prior_commit() running while wakeup + is in progress - otherwise the unregister could complete before the wakeup, + leading to incorrect spurious wakeup or accessing invalid memory. 
+ + However, if we are in the middle of running wakeup_subsequent_commits(), then + there is no need for unregister_wait_for_prior_commit() in the first place - + the waiter can just do a normal wait_for_prior_commit(), as it will be + immediately woken up. + + So the solution to the potential race/deadlock is to set a flag in the waitee + that wakeup_subsequent_commits() is in progress. When this flag is set, + unregister_wait_for_prior_commit() becomes just wait_for_prior_commit(). + + Then also register_wait_for_prior_commit() needs to check if + wakeup_subsequent_commits() is running, and skip the registration if + so. This is needed in case a new waiter manages to register itself and + immediately try to unregister while wakeup_subsequent_commits() is + running. Else the new waiter would also wait rather than unregister, but it + would not be woken up until next wakeup, which could be potentially much + later than necessary. +*/ + +void +wait_for_commit::wakeup_subsequent_commits2(int wakeup_error) +{ + wait_for_commit *waiter; + + mysql_mutex_lock(&LOCK_wait_commit); + wakeup_subsequent_commits_running= true; + waiter= subsequent_commits_list; + subsequent_commits_list= NULL; + mysql_mutex_unlock(&LOCK_wait_commit); + + while (waiter) + { + /* + Important: we must grab the next pointer before waking up the waiter; + once the wakeup is done, the field could be invalidated at any time. + */ + wait_for_commit *next= waiter->next_subsequent_commit; + waiter->wakeup(wakeup_error); + waiter= next; + } + + /* + We need a full memory barrier between walking the list above, and clearing + the flag wakeup_subsequent_commits_running below. This barrier is needed + to ensure that no other thread will start to modify the list pointers + before we are done traversing the list. + + But wait_for_commit::wakeup() does a full memory barrier already (it locks + a mutex), so no extra explicit barrier is needed here. 
+ */ + wakeup_subsequent_commits_running= false; + DBUG_EXECUTE_IF("inject_wakeup_subsequent_commits_sleep", my_sleep(21000);); +} + + +/* Cancel a previously registered wait for another THD to commit before us. */ +void +wait_for_commit::unregister_wait_for_prior_commit2() +{ + wait_for_commit *loc_waitee; + + mysql_mutex_lock(&LOCK_wait_commit); + if ((loc_waitee= this->waitee)) + { + mysql_mutex_lock(&loc_waitee->LOCK_wait_commit); + if (loc_waitee->wakeup_subsequent_commits_running) + { + /* + When a wakeup is running, we cannot safely remove ourselves from the + list without corrupting it. Instead we can just wait, as wakeup is + already in progress and will thus be immediate. + + See comments on wakeup_subsequent_commits2() for more details. + */ + mysql_mutex_unlock(&loc_waitee->LOCK_wait_commit); + while (this->waitee) + mysql_cond_wait(&COND_wait_commit, &LOCK_wait_commit); + } + else + { + /* Remove ourselves from the list in the waitee. */ + remove_from_list(&loc_waitee->subsequent_commits_list); + mysql_mutex_unlock(&loc_waitee->LOCK_wait_commit); + this->waitee= NULL; + } + } + wakeup_error= 0; + mysql_mutex_unlock(&LOCK_wait_commit); +} + + bool Discrete_intervals_list::append(ulonglong start, ulonglong val, ulonglong incr) { |