diff options
Diffstat (limited to 'sql/sql_class.cc')
-rw-r--r-- | sql/sql_class.cc | 1443 |
1 files changed, 850 insertions, 593 deletions
diff --git a/sql/sql_class.cc b/sql/sql_class.cc index c7b55aa9b68..2ebcea2d6f4 100644 --- a/sql/sql_class.cc +++ b/sql/sql_class.cc @@ -63,11 +63,14 @@ #include "sql_parse.h" // is_update_query #include "sql_callback.h" #include "lock.h" -#ifdef WITH_WSREP #include "wsrep_mysqld.h" #include "wsrep_thd.h" -#endif #include "sql_connect.h" +#include "my_atomic.h" + +#ifdef HAVE_SYS_SYSCALL_H +#include <sys/syscall.h> +#endif /* The following is used to initialise Table_ident with a internal @@ -112,13 +115,12 @@ bool Key_part_spec::operator==(const Key_part_spec& other) const */ Key::Key(const Key &rhs, MEM_ROOT *mem_root) - :type(rhs.type), + :DDL_options(rhs),type(rhs.type), key_create_info(rhs.key_create_info), columns(rhs.columns, mem_root), name(rhs.name), option_list(rhs.option_list), - generated(rhs.generated), - create_if_not_exists(rhs.create_if_not_exists) + generated(rhs.generated) { list_copy_and_replace_each_value(columns, mem_root); } @@ -331,7 +333,7 @@ void thd_set_psi(THD *thd, PSI_thread *psi) */ void thd_set_killed(THD *thd) { - thd->killed= KILL_CONNECTION; + thd->set_killed(KILL_CONNECTION); } /** @@ -368,16 +370,6 @@ void thd_close_connection(THD *thd) } /** - Get current THD object from thread local data - - @retval The THD object for the thread, NULL if not connection thread -*/ -THD *thd_get_current_thd() -{ - return current_thd; -} - -/** Lock data that needs protection in THD object @param thd THD object @@ -558,41 +550,11 @@ void set_thd_stage_info(void *thd_arg, if (thd == NULL) thd= current_thd; - thd->enter_stage(new_stage, old_stage, calling_func, calling_file, - calling_line); -} - -void THD::enter_stage(const PSI_stage_info *new_stage, - PSI_stage_info *old_stage, - const char *calling_func, - const char *calling_file, - const unsigned int calling_line) -{ - DBUG_PRINT("THD::enter_stage", ("%s:%d", calling_file, calling_line)); - - if (old_stage != NULL) - { - old_stage->m_key= m_current_stage_key; - old_stage->m_name= proc_info; - } - - if (new_stage != NULL) - { - const char *msg= new_stage->m_name; - -#if defined(ENABLED_PROFILING) - profiling.status_change(msg, calling_func, calling_file, calling_line); -#endif + if (old_stage) + thd->backup_stage(old_stage); - m_current_stage_key= new_stage->m_key; - proc_info= msg; - -#ifdef HAVE_PSI_THREAD_INTERFACE - PSI_THREAD_CALL(set_thread_state)(msg); - MYSQL_SET_STAGE(m_current_stage_key, calling_file, calling_line); -#endif - } - return; + if (new_stage) + thd->enter_stage(new_stage, calling_func, calling_file, calling_line); } void thd_enter_cond(MYSQL_THD thd, mysql_cond_t *cond, mysql_mutex_t *mutex, @@ -817,180 +779,6 @@ char *thd_get_error_context_description(THD *thd, char *buffer, return buffer; } -#ifdef WITH_WSREP -extern int wsrep_on(void *thd) -{ - return (int)(WSREP(((THD*)thd))); -} -extern "C" bool wsrep_thd_is_wsrep_on(THD *thd) -{ - return thd->variables.wsrep_on; -} - -extern "C" bool wsrep_consistency_check(void *thd) -{ - return ((THD*)thd)->wsrep_consistency_check == CONSISTENCY_CHECK_RUNNING; -} - -extern "C" void wsrep_thd_set_exec_mode(THD *thd, enum wsrep_exec_mode mode) -{ - thd->wsrep_exec_mode= mode; -} -extern "C" void wsrep_thd_set_query_state( - THD *thd, enum wsrep_query_state state) -{ - thd->wsrep_query_state= state; -} -extern "C" void wsrep_thd_set_conflict_state( - THD *thd, enum wsrep_conflict_state state) -{ - if (WSREP(thd)) thd->wsrep_conflict_state= state; -} - - -extern "C" enum wsrep_exec_mode wsrep_thd_exec_mode(THD *thd) -{ - return thd->wsrep_exec_mode; -} - -extern "C" const char *wsrep_thd_exec_mode_str(THD *thd) -{ - return - (!thd) ? "void" : - (thd->wsrep_exec_mode == LOCAL_STATE) ? "local" : - (thd->wsrep_exec_mode == REPL_RECV) ? "applier" : - (thd->wsrep_exec_mode == TOTAL_ORDER) ? "total order" : - (thd->wsrep_exec_mode == LOCAL_COMMIT) ? "local commit" : "void"; -} - -extern "C" enum wsrep_query_state wsrep_thd_query_state(THD *thd) -{ - return thd->wsrep_query_state; -} - -extern "C" const char *wsrep_thd_query_state_str(THD *thd) -{ - return - (!thd) ? "void" : - (thd->wsrep_query_state == QUERY_IDLE) ? "idle" : - (thd->wsrep_query_state == QUERY_EXEC) ? "executing" : - (thd->wsrep_query_state == QUERY_COMMITTING) ? "committing" : - (thd->wsrep_query_state == QUERY_EXITING) ? "exiting" : - (thd->wsrep_query_state == QUERY_ROLLINGBACK) ? "rolling back" : "void"; -} - -extern "C" enum wsrep_conflict_state wsrep_thd_conflict_state(THD *thd) -{ - return thd->wsrep_conflict_state; -} -extern "C" const char *wsrep_thd_conflict_state_str(THD *thd) -{ - return - (!thd) ? "void" : - (thd->wsrep_conflict_state == NO_CONFLICT) ? "no conflict" : - (thd->wsrep_conflict_state == MUST_ABORT) ? "must abort" : - (thd->wsrep_conflict_state == ABORTING) ? "aborting" : - (thd->wsrep_conflict_state == MUST_REPLAY) ? "must replay" : - (thd->wsrep_conflict_state == REPLAYING) ? "replaying" : - (thd->wsrep_conflict_state == RETRY_AUTOCOMMIT) ? "retrying" : - (thd->wsrep_conflict_state == CERT_FAILURE) ? "cert failure" : "void"; -} - -extern "C" wsrep_ws_handle_t* wsrep_thd_ws_handle(THD *thd) -{ - return &thd->wsrep_ws_handle; -} - -extern "C" void wsrep_thd_LOCK(THD *thd) -{ - mysql_mutex_lock(&thd->LOCK_wsrep_thd); -} -extern "C" void wsrep_thd_UNLOCK(THD *thd) -{ - mysql_mutex_unlock(&thd->LOCK_wsrep_thd); -} -extern "C" time_t wsrep_thd_query_start(THD *thd) -{ - return thd->query_start(); -} -extern "C" uint32 wsrep_thd_wsrep_rand(THD *thd) -{ - return thd->wsrep_rand; -} -extern "C" my_thread_id wsrep_thd_thread_id(THD *thd) -{ - return thd->thread_id; -} -extern "C" wsrep_seqno_t wsrep_thd_trx_seqno(THD *thd) -{ - return (thd) ? thd->wsrep_trx_meta.gtid.seqno : WSREP_SEQNO_UNDEFINED; -} -extern "C" query_id_t wsrep_thd_query_id(THD *thd) -{ - return thd->query_id; -} -extern "C" char *wsrep_thd_query(THD *thd) -{ - return (thd) ? thd->query() : NULL; -} -extern "C" query_id_t wsrep_thd_wsrep_last_query_id(THD *thd) -{ - return thd->wsrep_last_query_id; -} -extern "C" void wsrep_thd_set_wsrep_last_query_id(THD *thd, query_id_t id) -{ - thd->wsrep_last_query_id= id; -} -extern "C" void wsrep_thd_awake(THD *thd, my_bool signal) -{ - if (signal) - { - mysql_mutex_lock(&thd->LOCK_thd_data); - thd->awake(KILL_QUERY); - mysql_mutex_unlock(&thd->LOCK_thd_data); - } - else - { - mysql_mutex_lock(&LOCK_wsrep_replaying); - mysql_cond_broadcast(&COND_wsrep_replaying); - mysql_mutex_unlock(&LOCK_wsrep_replaying); - } -} -extern "C" int wsrep_thd_retry_counter(THD *thd) -{ - return(thd->wsrep_retry_counter); -} -extern "C" bool wsrep_thd_skip_append_keys(THD *thd) -{ - return thd->wsrep_skip_append_keys; -} - -extern int -wsrep_trx_order_before(void *thd1, void *thd2) -{ - if (wsrep_thd_trx_seqno((THD*)thd1) < wsrep_thd_trx_seqno((THD*)thd2)) { - WSREP_DEBUG("BF conflict, order: %lld %lld\n", - (long long)wsrep_thd_trx_seqno((THD*)thd1), - (long long)wsrep_thd_trx_seqno((THD*)thd2)); - return 1; - } - WSREP_DEBUG("waiting for BF, trx order: %lld %lld\n", - (long long)wsrep_thd_trx_seqno((THD*)thd1), - (long long)wsrep_thd_trx_seqno((THD*)thd2)); - return 0; -} -extern "C" int -wsrep_trx_is_aborting(void *thd_ptr) -{ - if (thd_ptr) { - if ((((THD *)thd_ptr)->wsrep_conflict_state == MUST_ABORT) || - (((THD *)thd_ptr)->wsrep_conflict_state == ABORTING)) { - return 1; - } - } - return 0; -} -#endif #if MARIA_PLUGIN_INTERFACE_VERSION < 0x0200 /** @@ -1034,14 +822,28 @@ bool Drop_table_error_handler::handle_condition(THD *thd, } -#ifdef WITH_WSREP -THD::THD(bool is_applier) -#else -THD::THD() -#endif - :Statement(&main_lex, &main_mem_root, STMT_CONVENTIONAL_EXECUTION, - /* statement id */ 0), +/** + Send timeout to thread. + + Note that this is always safe as the thread will always remove it's + timeouts at end of query (and thus before THD is destroyed) +*/ + +extern "C" void thd_kill_timeout(THD* thd) +{ + thd->status_var.max_statement_time_exceeded++; + mysql_mutex_lock(&thd->LOCK_thd_data); + /* Kill queries that can't cause data corruptions */ + thd->awake(KILL_TIMEOUT); + mysql_mutex_unlock(&thd->LOCK_thd_data); +} + + +THD::THD(bool is_wsrep_applier) + :Statement(&main_lex, &main_mem_root, STMT_CONVENTIONAL_EXECUTION, + /* statement id */ 0), rli_fake(0), rgi_fake(0), rgi_slave(NULL), + protocol_text(this), protocol_binary(this), in_sub_stmt(0), log_all_errors(0), binlog_unsafe_warning_flags(0), binlog_table_maps(0), @@ -1057,6 +859,7 @@ THD::THD() m_statement_psi(NULL), m_idle_psi(NULL), thread_id(0), + os_thread_id(0), global_disable_checkpoint(0), failed_com_change_user(0), is_fatal_error(0), @@ -1074,33 +877,39 @@ THD::THD() debug_sync_control(0), #endif /* defined(ENABLED_DEBUG_SYNC) */ wait_for_commit_ptr(0), + m_internal_handler(0), main_da(0, false, false), - m_stmt_da(&main_da) + m_stmt_da(&main_da), + tdc_hash_pins(0), + xid_hash_pins(0) #ifdef WITH_WSREP , - wsrep_applier(is_applier), - wsrep_applier_closing(FALSE), - wsrep_client_thread(0), + wsrep_applier(is_wsrep_applier), + wsrep_applier_closing(false), + wsrep_client_thread(false), + wsrep_apply_toi(false), wsrep_po_handle(WSREP_PO_INITIALIZER), wsrep_po_cnt(0), - wsrep_po_in_trans(FALSE), wsrep_apply_format(0), - wsrep_apply_toi(false), - wsrep_skip_append_keys(false) + wsrep_ignore_table(false) #endif { ulong tmp; + bzero(&variables, sizeof(variables)); - mdl_context.init(this); /* We set THR_THD to temporally point to this THD to register all the variables that allocates memory for this THD */ THD *old_THR_THD= current_thd; set_current_thd(this); - status_var.memory_used= 0; + status_var.local_memory_used= sizeof(THD); + status_var.global_memory_used= 0; + variables.max_mem_used= global_system_variables.max_mem_used; main_da.init(); + mdl_context.init(this); + /* Pass nominal parameters to init_alloc_root only to ensure that the destructor works OK in case of an error. The main_mem_root @@ -1124,6 +933,7 @@ THD::THD() query_start_used= query_start_sec_part_used= 0; count_cuted_fields= CHECK_FIELD_IGNORE; killed= NOT_KILLED; + killed_err= 0; col_access=0; is_slave_error= thread_specific_used= FALSE; my_hash_clear(&handler_tables_hash); @@ -1137,7 +947,7 @@ THD::THD() // Must be reset to handle error with THD's created for init of mysqld lex->current_select= 0; user_time.val= start_time= start_time_sec_part= 0; - start_utime= prior_thr_create_utime= 0L; + start_utime= utime_after_query= prior_thr_create_utime= 0L; utime_after_lock= 0L; progress.arena= 0; progress.report_to_client= 0; @@ -1147,7 +957,6 @@ THD::THD() connection_name.str= 0; connection_name.length= 0; - bzero(&variables, sizeof(variables)); file_id = 0; query_id= 0; query_name_consts= 0; @@ -1182,6 +991,7 @@ THD::THD() #endif mysql_mutex_init(key_LOCK_thd_data, &LOCK_thd_data, MY_MUTEX_INIT_FAST); mysql_mutex_init(key_LOCK_wakeup_ready, &LOCK_wakeup_ready, MY_MUTEX_INIT_FAST); + mysql_mutex_init(key_LOCK_thd_kill, &LOCK_thd_kill, MY_MUTEX_INIT_FAST); mysql_cond_init(key_COND_wakeup_ready, &COND_wakeup_ready, 0); /* LOCK_thread_count goes before LOCK_thd_data - the former is called around @@ -1207,10 +1017,10 @@ THD::THD() wsrep_retry_query_len = 0; wsrep_retry_command = COM_CONNECT; wsrep_consistency_check = NO_CONSISTENCY_CHECK; - wsrep_status_vars = 0; wsrep_mysql_replicated = 0; wsrep_TOI_pre_query = NULL; wsrep_TOI_pre_query_len = 0; + wsrep_info[sizeof(wsrep_info) - 1] = '\0'; /* make sure it is 0-terminated */ wsrep_sync_wait_gtid = WSREP_GTID_UNDEFINED; wsrep_affected_rows = 0; wsrep_replicate_GTID = false; @@ -1243,6 +1053,8 @@ THD::THD() protocol_text.init(this); protocol_binary.init(this); + thr_timer_init(&query_timer, (void (*)(void*)) thd_kill_timeout, this); + tablespace_op=FALSE; /* @@ -1255,13 +1067,7 @@ THD::THD() my_rnd_init(&rand, tmp + (ulong) &rand, tmp + (ulong) ::global_query_id); substitute_null_with_insert_id = FALSE; thr_lock_info_init(&lock_info); /* safety: will be reset after start */ -#ifdef WITH_WSREP lock_info.mysql_thd= (void *)this; - lock_info.in_lock_tables= false; -#ifdef WSREP_PROC_INFO - wsrep_info[sizeof(wsrep_info) - 1] = '\0'; /* make sure it is 0-terminated */ -#endif /* WSREP_PROC_INFO */ -#endif /* WITH_WSREP */ m_token_array= NULL; if (max_digest_length > 0) @@ -1270,7 +1076,6 @@ THD::THD() MYF(MY_WME|MY_THREAD_SPECIFIC)); } - m_internal_handler= NULL; m_binlog_invoker= INVOKER_NONE; memset(&invoker_user, 0, sizeof(invoker_user)); memset(&invoker_host, 0, sizeof(invoker_host)); @@ -1335,7 +1140,7 @@ Internal_error_handler *THD::pop_internal_handler() void THD::raise_error(uint sql_errno) { - const char* msg= ER(sql_errno); + const char* msg= ER_THD(this, sql_errno); (void) raise_condition(sql_errno, NULL, Sql_condition::WARN_LEVEL_ERROR, @@ -1348,7 +1153,7 @@ void THD::raise_error_printf(uint sql_errno, ...) char ebuff[MYSQL_ERRMSG_SIZE]; DBUG_ENTER("THD::raise_error_printf"); DBUG_PRINT("my", ("nr: %d errno: %d", sql_errno, errno)); - const char* format= ER(sql_errno); + const char* format= ER_THD(this, sql_errno); va_start(args, sql_errno); my_vsnprintf(ebuff, sizeof(ebuff), format, args); va_end(args); @@ -1361,7 +1166,7 @@ void THD::raise_error_printf(uint sql_errno, ...) void THD::raise_warning(uint sql_errno) { - const char* msg= ER(sql_errno); + const char* msg= ER_THD(this, sql_errno); (void) raise_condition(sql_errno, NULL, Sql_condition::WARN_LEVEL_WARN, @@ -1374,7 +1179,7 @@ void THD::raise_warning_printf(uint sql_errno, ...) char ebuff[MYSQL_ERRMSG_SIZE]; DBUG_ENTER("THD::raise_warning_printf"); DBUG_PRINT("enter", ("warning: %u", sql_errno)); - const char* format= ER(sql_errno); + const char* format= ER_THD(this, sql_errno); va_start(args, sql_errno); my_vsnprintf(ebuff, sizeof(ebuff), format, args); va_end(args); @@ -1391,7 +1196,7 @@ void THD::raise_note(uint sql_errno) DBUG_PRINT("enter", ("code: %d", sql_errno)); if (!(variables.option_bits & OPTION_SQL_NOTES)) DBUG_VOID_RETURN; - const char* msg= ER(sql_errno); + const char* msg= ER_THD(this, sql_errno); (void) raise_condition(sql_errno, NULL, Sql_condition::WARN_LEVEL_NOTE, @@ -1407,7 +1212,7 @@ void THD::raise_note_printf(uint sql_errno, ...) DBUG_PRINT("enter",("code: %u", sql_errno)); if (!(variables.option_bits & OPTION_SQL_NOTES)) DBUG_VOID_RETURN; - const char* format= ER(sql_errno); + const char* format= ER_THD(this, sql_errno); va_start(args, sql_errno); my_vsnprintf(ebuff, sizeof(ebuff), format, args); va_end(args); @@ -1441,7 +1246,7 @@ Sql_condition* THD::raise_condition(uint sql_errno, if (sql_errno == 0) sql_errno= ER_UNKNOWN_ERROR; if (msg == NULL) - msg= ER(sql_errno); + msg= ER_THD(this, sql_errno); if (sqlstate == NULL) sqlstate= mysql_errno_to_sqlstate(sql_errno); @@ -1453,7 +1258,7 @@ Sql_condition* THD::raise_condition(uint sql_errno, push_warning and strict SQL_MODE case. */ level= Sql_condition::WARN_LEVEL_ERROR; - killed= KILL_BAD_DATA; + set_killed(KILL_BAD_DATA); } switch (level) @@ -1484,7 +1289,7 @@ Sql_condition* THD::raise_condition(uint sql_errno, } } - query_cache_abort(&query_cache_tls); + query_cache_abort(this, &query_cache_tls); /* Avoid pushing a condition for fatal out of memory errors as this will @@ -1570,6 +1375,7 @@ extern "C" THD *_current_thd_noinline(void) return my_pthread_getspecific_ptr(THD*,THR_THD); } #endif + /* Init common variables that has to be reset on start and on change_user */ @@ -1580,8 +1386,8 @@ void THD::init(void) mysql_mutex_lock(&LOCK_global_system_variables); plugin_thdvar_init(this); /* - variables= global_system_variables above has reset - variables.pseudo_thread_id to 0. We need to correct it here to + plugin_thd_var_init() sets variables= global_system_variables, which + has reset variables.pseudo_thread_id to 0. We need to correct it here to avoid temporary tables replication failure. */ variables.pseudo_thread_id= thread_id; @@ -1601,6 +1407,9 @@ void THD::init(void) transaction.all.modified_non_trans_table= transaction.stmt.modified_non_trans_table= FALSE; + transaction.all.m_unsafe_rollback_flags= + transaction.stmt.m_unsafe_rollback_flags= 0; + open_options=ha_open_options; update_lock_default= (variables.low_priority_updates ? TL_WRITE_LOW_PRIORITY : @@ -1615,7 +1424,6 @@ void THD::init(void) start_bytes_received= 0; last_commit_gtid.seq_no= 0; status_in_global= 0; - #ifdef WITH_WSREP wsrep_exec_mode= wsrep_applier ? REPL_RECV : LOCAL_STATE; wsrep_conflict_state= NO_CONFLICT; @@ -1642,6 +1450,8 @@ void THD::init(void) else variables.option_bits&= ~OPTION_BIN_LOG; + variables.sql_log_bin_off= 0; + select_commands= update_commands= other_commands= 0; /* Set to handle counting of aborted connections */ userstat_running= opt_userstat_running; @@ -1719,7 +1529,6 @@ void THD::init_for_queries() variables.trans_alloc_block_size, variables.trans_prealloc_size); transaction.xid_state.xid.null(); - transaction.xid_state.in_thd=1; } @@ -1759,7 +1568,7 @@ void THD::cleanup(void) DBUG_ENTER("THD::cleanup"); DBUG_ASSERT(cleanup_done == 0); - killed= KILL_CONNECTION; + set_killed(KILL_CONNECTION); #ifdef ENABLE_WHEN_BINLOG_WILL_BE_ABLE_TO_PREPARE if (transaction.xid_state.xa_state == XA_PREPARED) { @@ -1775,7 +1584,7 @@ void THD::cleanup(void) transaction.xid_state.xa_state= XA_NOTR; trans_rollback(this); - xid_cache_delete(&transaction.xid_state); + xid_cache_delete(this, &transaction.xid_state); DBUG_ASSERT(open_tables == NULL); /* @@ -1827,6 +1636,8 @@ THD::~THD() that memory allocation counting is done correctly */ set_current_thd(this); + if (!status_in_global) + add_status_to_global(); /* Ensure that no one is using THD */ mysql_mutex_lock(&LOCK_thd_data); @@ -1837,7 +1648,6 @@ THD::~THD() mysql_mutex_unlock(&LOCK_wsrep_thd); mysql_mutex_destroy(&LOCK_wsrep_thd); if (wsrep_rgi) delete wsrep_rgi; - wsrep_free_status(this); #endif /* Close connection */ #ifndef EMBEDDED_LIBRARY @@ -1861,6 +1671,7 @@ THD::~THD() mysql_cond_destroy(&COND_wakeup_ready); mysql_mutex_destroy(&LOCK_wakeup_ready); mysql_mutex_destroy(&LOCK_thd_data); + mysql_mutex_destroy(&LOCK_thd_kill); #ifndef DBUG_OFF dbug_sentry= THD_SENTRY_GONE; #endif @@ -1881,17 +1692,23 @@ THD::~THD() rgi_slave->cleanup_after_session(); my_free(semisync_info); #endif - + main_lex.free_set_stmt_mem_root(); free_root(&main_mem_root, MYF(0)); my_free(m_token_array); main_da.free_memory(); - if (status_var.memory_used != 0) + if (tdc_hash_pins) + lf_hash_put_pins(tdc_hash_pins); + if (xid_hash_pins) + lf_hash_put_pins(xid_hash_pins); + /* Ensure everything is freed */ + status_var.local_memory_used-= sizeof(THD); + if (status_var.local_memory_used != 0) { - DBUG_PRINT("error", ("memory_used: %lld", status_var.memory_used)); + DBUG_PRINT("error", ("memory_used: %lld", status_var.local_memory_used)); SAFEMALLOC_REPORT_MEMORY(my_thread_dbug_id()); - DBUG_ASSERT(status_var.memory_used == 0); // Ensure everything is freed + DBUG_ASSERT(status_var.local_memory_used == 0); } - + update_global_memory_status(status_var.global_memory_used); set_current_thd(orig_thd == this ? 0 : orig_thd); DBUG_VOID_RETURN; } @@ -1910,6 +1727,7 @@ THD::~THD() other types are handled explicitely */ + void add_to_status(STATUS_VAR *to_var, STATUS_VAR *from_var) { ulong *end= (ulong*) ((uchar*) to_var + @@ -1929,6 +1747,21 @@ void add_to_status(STATUS_VAR *to_var, STATUS_VAR *from_var) to_var->binlog_bytes_written+= from_var->binlog_bytes_written; to_var->cpu_time+= from_var->cpu_time; to_var->busy_time+= from_var->busy_time; + + /* + Update global_memory_used. We have to do this with atomic_add as the + global value can change outside of LOCK_status. + */ + if (to_var == &global_status_var) + { + DBUG_PRINT("info", ("global memory_used: %lld size: %lld", + (longlong) global_status_var.global_memory_used, + (longlong) from_var->global_memory_used)); + } + // workaround for gcc 4.2.4-1ubuntu4 -fPIE (from DEB_BUILD_HARDENING=1) + int64 volatile * volatile ptr= &to_var->global_memory_used; + my_atomic_add64_explicit(ptr, from_var->global_memory_used, + MY_MEMORY_ORDER_RELAXED); } /* @@ -1966,6 +1799,11 @@ void add_diff_to_status(STATUS_VAR *to_var, STATUS_VAR *from_var, dec_var->binlog_bytes_written; to_var->cpu_time+= from_var->cpu_time - dec_var->cpu_time; to_var->busy_time+= from_var->busy_time - dec_var->busy_time; + + /* + We don't need to accumulate memory_used as these are not reset or used by + the calling functions. See execute_show_status(). + */ } #define SECONDS_TO_WAIT_FOR_KILL 2 @@ -1985,6 +1823,7 @@ void add_diff_to_status(STATUS_VAR *to_var, STATUS_VAR *from_var, This is normally called from another thread's THD object. @note Do always call this while holding LOCK_thd_data. + NOT_KILLED is used to awake a thread for a slave */ void THD::awake(killed_state state_to_set) @@ -1997,8 +1836,16 @@ void THD::awake(killed_state state_to_set) print_aborted_warning(3, "KILLED"); + /* + Don't degrade killed state, for example from a KILL_CONNECTION to + STATEMENT TIMEOUT + */ + if (killed >= KILL_CONNECTION) + state_to_set= killed; + /* Set the 'killed' flag of 'this', which is the target THD object. */ - killed= state_to_set; + mysql_mutex_lock(&LOCK_thd_kill); + set_killed_no_mutex(state_to_set); if (state_to_set >= KILL_CONNECTION || state_to_set == NOT_KILLED) { @@ -2020,9 +1867,6 @@ void THD::awake(killed_state state_to_set) /* Interrupt target waiting inside a storage engine. */ if (state_to_set != NOT_KILLED) -#ifdef WITH_WSREP - /* TODO: prevent applier close here */ -#endif /* WITH_WSREP */ ha_kill_query(this, thd_kill_level(this)); /* Broadcast a condition to kick the target if it is waiting on it. */ @@ -2031,6 +1875,7 @@ void THD::awake(killed_state state_to_set) mysql_mutex_lock(&mysys_var->mutex); if (!system_thread) // Don't abort locks mysys_var->abort=1; + /* This broadcast could be up in the air if the victim thread exits the cond in the time between read and broadcast, but that is @@ -2086,6 +1931,7 @@ void THD::awake(killed_state state_to_set) } mysql_mutex_unlock(&mysys_var->mutex); } + mysql_mutex_unlock(&LOCK_thd_kill); DBUG_VOID_RETURN; } @@ -2103,7 +1949,7 @@ void THD::disconnect() mysql_mutex_lock(&LOCK_thd_data); - killed= KILL_CONNECTION; + set_killed(KILL_CONNECTION); #ifdef SIGNAL_WITH_VIO_CLOSE /* @@ -2118,6 +1964,7 @@ void THD::disconnect() /* Disconnect even if a active vio is not associated. */ if (net.vio != vio) vio_close(net.vio); + net.thd= 0; // Don't collect statistics mysql_mutex_unlock(&LOCK_thd_data); } @@ -2138,7 +1985,7 @@ bool THD::notify_shared_lock(MDL_context_owner *ctx_in_use, DBUG_PRINT("info", ("kill delayed thread")); mysql_mutex_lock(&in_use->LOCK_thd_data); if (in_use->killed < KILL_CONNECTION) - in_use->killed= KILL_CONNECTION; + in_use->set_killed(KILL_CONNECTION); if (in_use->mysys_var) { mysql_mutex_lock(&in_use->mysys_var->mutex); @@ -2171,19 +2018,15 @@ bool THD::notify_shared_lock(MDL_context_owner *ctx_in_use, thread can see those instances (e.g. see partitioning code). */ if (!thd_table->needs_reopen()) -#ifdef WITH_WSREP { signalled|= mysql_lock_abort_for_thread(this, thd_table); - if (WSREP(this) && wsrep_thd_is_BF((void *)this, FALSE)) + if (WSREP(this) && wsrep_thd_is_BF(this, FALSE)) { WSREP_DEBUG("remove_table_from_cache: %llu", (unsigned long long) this->real_id); wsrep_abort_thd((void *)this, (void *)in_use, FALSE); } } -#else - signalled|= mysql_lock_abort_for_thread(this, thd_table); -#endif } } mysql_mutex_unlock(&in_use->LOCK_thd_data); @@ -2195,13 +2038,21 @@ bool THD::notify_shared_lock(MDL_context_owner *ctx_in_use, /* Get error number for killed state Note that the error message can't have any parameters. + If one needs parameters, one should use THD::killed_err_msg See thd::kill_message() */ -int killed_errno(killed_state killed) +int THD::killed_errno() { DBUG_ENTER("killed_errno"); - DBUG_PRINT("enter", ("killed: %d", killed)); + DBUG_PRINT("enter", ("killed: %d killed_errno: %d", + killed, killed_err ? killed_err->no: 0)); + + /* Ensure that killed_err is not set if we are not killed */ + DBUG_ASSERT(!killed_err || killed != NOT_KILLED); + + if (killed_err) + DBUG_RETURN(killed_err->no); switch (killed) { case NOT_KILLED: @@ -2220,6 +2071,9 @@ int killed_errno(killed_state killed) case KILL_QUERY: case KILL_QUERY_HARD: DBUG_RETURN(ER_QUERY_INTERRUPTED); + case KILL_TIMEOUT: + case KILL_TIMEOUT_HARD: + DBUG_RETURN(ER_STATEMENT_TIMEOUT); case KILL_SERVER: case KILL_SERVER_HARD: DBUG_RETURN(ER_SERVER_SHUTDOWN); @@ -2259,10 +2113,19 @@ bool THD::store_globals() This allows us to move THD to different threads if needed. */ mysys_var->id= thread_id; +#ifdef __NR_gettid + os_thread_id= (uint32)syscall(__NR_gettid); +#else + os_thread_id= 0; +#endif real_id= pthread_self(); // For debugging mysys_var->stack_ends_here= thread_stack + // for consistency, see libevent_thread_proc STACK_DIRECTION * (long)my_thread_stack_size; - vio_set_thread_id(net.vio, real_id); + if (net.vio) + { + vio_set_thread_id(net.vio, real_id); + net.thd= this; + } /* We have to call thr_lock_info_init() again here as THD may have been created in another thread @@ -2287,7 +2150,7 @@ void THD::reset_globals() /* Undocking the thread specific data. */ set_current_thd(0); my_pthread_setspecific_ptr(THR_MALLOC, NULL); - + net.thd= 0; } /* @@ -2367,11 +2230,10 @@ void THD::cleanup_after_query() table_map_for_update= 0; m_binlog_invoker= INVOKER_NONE; #ifdef WITH_WSREP - if (TOTAL_ORDER == wsrep_exec_mode) - { - wsrep_exec_mode = LOCAL_STATE; - } - //wsrep_trx_seqno = 0; + if (TOTAL_ORDER == wsrep_exec_mode) + { + wsrep_exec_mode = LOCAL_STATE; + } #endif /* WITH_WSREP */ #ifndef EMBEDDED_LIBRARY @@ -2413,18 +2275,88 @@ bool THD::convert_string(LEX_STRING *to, CHARSET_INFO *to_cs, const char *from, uint from_length, CHARSET_INFO *from_cs) { - DBUG_ENTER("convert_string"); + DBUG_ENTER("THD::convert_string"); size_t new_length= to_cs->mbmaxlen * from_length; uint dummy_errors; - if (!(to->str= (char*) alloc(new_length+1))) - { - to->length= 0; // Safety fix - DBUG_RETURN(1); // EOM - } + if (alloc_lex_string(to, new_length + 1)) + DBUG_RETURN(true); // EOM to->length= copy_and_convert((char*) to->str, new_length, to_cs, from, from_length, from_cs, &dummy_errors); - to->str[to->length]=0; // Safety - DBUG_RETURN(0); + to->str[to->length]= 0; // Safety + DBUG_RETURN(false); +} + + +/* + Convert a string between two character sets. + dstcs and srccs cannot be &my_charset_bin. +*/ +bool THD::convert_fix(CHARSET_INFO *dstcs, LEX_STRING *dst, + CHARSET_INFO *srccs, const char *src, uint src_length, + String_copier *status) +{ + DBUG_ENTER("THD::convert_fix"); + size_t dst_length= dstcs->mbmaxlen * src_length; + if (alloc_lex_string(dst, dst_length + 1)) + DBUG_RETURN(true); // EOM + dst->length= status->convert_fix(dstcs, (char*) dst->str, dst_length, + srccs, src, src_length, src_length); + dst->str[dst->length]= 0; // Safety + DBUG_RETURN(false); +} + + +/* + Copy or convert a string. +*/ +bool THD::copy_fix(CHARSET_INFO *dstcs, LEX_STRING *dst, + CHARSET_INFO *srccs, const char *src, uint src_length, + String_copier *status) +{ + DBUG_ENTER("THD::copy_fix"); + size_t dst_length= dstcs->mbmaxlen * src_length; + if (alloc_lex_string(dst, dst_length + 1)) + DBUG_RETURN(true); // EOM + dst->length= status->well_formed_copy(dstcs, dst->str, dst_length, + srccs, src, src_length, src_length); + dst->str[dst->length]= '\0'; + DBUG_RETURN(false); +} + + +class String_copier_with_error: public String_copier +{ +public: + bool check_errors(CHARSET_INFO *srccs, const char *src, uint src_length) + { + if (most_important_error_pos()) + { + ErrConvString err(src, src_length, &my_charset_bin); + my_error(ER_INVALID_CHARACTER_STRING, MYF(0), srccs->csname, err.ptr()); + return true; + } + return false; + } +}; + + +bool THD::convert_with_error(CHARSET_INFO *dstcs, LEX_STRING *dst, + CHARSET_INFO *srccs, + const char *src, uint src_length) +{ + String_copier_with_error status; + return convert_fix(dstcs, dst, srccs, src, src_length, &status) || + status.check_errors(srccs, src, src_length); +} + + +bool THD::copy_with_error(CHARSET_INFO *dstcs, LEX_STRING *dst, + CHARSET_INFO *srccs, + const char *src, uint src_length) +{ + String_copier_with_error status; + return copy_fix(dstcs, dst, srccs, src, src_length, &status) || + status.check_errors(srccs, src, src_length); } @@ -2561,7 +2493,7 @@ CHANGED_TABLE_LIST* THD::changed_table_dup(const char *key, long key_length) { my_error(EE_OUTOFMEMORY, MYF(ME_BELL+ME_FATALERROR), ALIGN_SIZE(sizeof(TABLE_LIST)) + key_length + 1); - killed= KILL_CONNECTION; + set_killed(KILL_CONNECTION); return 0; } @@ -2573,10 +2505,14 @@ CHANGED_TABLE_LIST* THD::changed_table_dup(const char *key, long key_length) } -int THD::send_explain_fields(select_result *result) +int THD::send_explain_fields(select_result *result, uint8 explain_flags, bool is_analyze) { List<Item> field_list; - make_explain_field_list(field_list); + if (lex->explain_json) + make_explain_json_field_list(field_list, is_analyze); + else + make_explain_field_list(field_list, explain_flags, is_analyze); + result->prepare(field_list, NULL); return (result->send_result_set_metadata(field_list, Protocol::SEND_NUM_ROWS | @@ -2584,51 +2520,102 @@ int THD::send_explain_fields(select_result *result) } +void THD::make_explain_json_field_list(List<Item> &field_list, bool is_analyze) +{ + Item *item= new (mem_root) Item_empty_string(this, (is_analyze ? + "ANALYZE" : + "EXPLAIN"), + 78, system_charset_info); + field_list.push_back(item, mem_root); +} + + /* Populate the provided field_list with EXPLAIN output columns. this->lex->describe has the EXPLAIN flags + + The set/order of columns must be kept in sync with + Explain_query::print_explain and co. */ -void THD::make_explain_field_list(List<Item> &field_list) +void THD::make_explain_field_list(List<Item> &field_list, uint8 explain_flags, + bool is_analyze) { Item *item; CHARSET_INFO *cs= system_charset_info; - field_list.push_back(item= new Item_return_int("id",3, MYSQL_TYPE_LONGLONG)); + field_list.push_back(item= new (mem_root) + Item_return_int(this, "id", 3, + MYSQL_TYPE_LONGLONG), mem_root); item->maybe_null= 1; - field_list.push_back(new Item_empty_string("select_type", 19, cs)); - field_list.push_back(item= new Item_empty_string("table", NAME_CHAR_LEN, cs)); + field_list.push_back(new (mem_root) + Item_empty_string(this, "select_type", 19, cs), + mem_root); + field_list.push_back(item= new (mem_root) + Item_empty_string(this, "table", NAME_CHAR_LEN, cs), + mem_root); item->maybe_null= 1; - if (lex->describe & DESCRIBE_PARTITIONS) + if (explain_flags & DESCRIBE_PARTITIONS) { /* Maximum length of string that make_used_partitions_str() can produce */ - item= new Item_empty_string("partitions", MAX_PARTITIONS * (1 + FN_LEN), - cs); - field_list.push_back(item); + item= new (mem_root) Item_empty_string(this, "partitions", + MAX_PARTITIONS * (1 + FN_LEN), cs); + field_list.push_back(item, mem_root); item->maybe_null= 1; } - field_list.push_back(item= new Item_empty_string("type", 10, cs)); + field_list.push_back(item= new (mem_root) + Item_empty_string(this, "type", 10, cs), + mem_root); item->maybe_null= 1; - field_list.push_back(item=new Item_empty_string("possible_keys", - NAME_CHAR_LEN*MAX_KEY, cs)); + field_list.push_back(item= new (mem_root) + Item_empty_string(this, "possible_keys", + NAME_CHAR_LEN*MAX_KEY, cs), + mem_root); item->maybe_null=1; - field_list.push_back(item=new Item_empty_string("key", NAME_CHAR_LEN, cs)); + field_list.push_back(item=new (mem_root) + Item_empty_string(this, "key", NAME_CHAR_LEN, cs), + mem_root); item->maybe_null=1; - field_list.push_back(item=new Item_empty_string("key_len", - NAME_CHAR_LEN*MAX_KEY)); + field_list.push_back(item=new (mem_root) + Item_empty_string(this, "key_len", + NAME_CHAR_LEN*MAX_KEY), + mem_root); item->maybe_null=1; - field_list.push_back(item=new Item_empty_string("ref", - NAME_CHAR_LEN*MAX_REF_PARTS, - cs)); + field_list.push_back(item=new (mem_root) + Item_empty_string(this, "ref", + NAME_CHAR_LEN*MAX_REF_PARTS, cs), + mem_root); item->maybe_null=1; - field_list.push_back(item= new Item_return_int("rows", 10, - MYSQL_TYPE_LONGLONG)); - if (lex->describe & DESCRIBE_EXTENDED) + field_list.push_back(item= new (mem_root) + Item_return_int(this, "rows", 10, MYSQL_TYPE_LONGLONG), + mem_root); + if (is_analyze) { - field_list.push_back(item= new Item_float("filtered", 0.1234, 2, 4)); + field_list.push_back(item= new (mem_root) + Item_float(this, "r_rows", 0.1234, 10, 4), + mem_root); item->maybe_null=1; } + + if (is_analyze || (explain_flags & DESCRIBE_EXTENDED)) + { + field_list.push_back(item= new (mem_root) + Item_float(this, "filtered", 0.1234, 2, 4), + mem_root); + item->maybe_null=1; + } + + if (is_analyze) + { + field_list.push_back(item= new (mem_root) + Item_float(this, "r_filtered", 0.1234, 2, 4), + mem_root); + item->maybe_null=1; + } + item->maybe_null= 1; - field_list.push_back(new Item_empty_string("Extra", 255, cs)); + field_list.push_back(new (mem_root) + Item_empty_string(this, "Extra", 255, cs), + mem_root); } @@ -2739,11 +2726,6 @@ void THD::rollback_item_tree_changes() ** Functions to provide a interface to select results *****************************************************************************/ -select_result::select_result() -{ - thd=current_thd; -} - void select_result::cleanup() { /* do nothing */ @@ -2898,7 +2880,7 @@ bool select_to_file::send_eof() if (mysql_file_close(file, MYF(MY_WME)) || thd->is_error()) error= true; - if (!error) + if (!error && !suppress_my_ok) { ::my_ok(thd,row_count); } @@ -3061,7 +3043,7 @@ select_export::prepare(List<Item> &list, SELECT_LEX_UNIT *u) */ push_warning(thd, Sql_condition::WARN_LEVEL_WARN, WARN_NON_ASCII_SEPARATOR_NOT_IMPLEMENTED, - ER(WARN_NON_ASCII_SEPARATOR_NOT_IMPLEMENTED)); + ER_THD(thd, WARN_NON_ASCII_SEPARATOR_NOT_IMPLEMENTED)); } field_term_length=exchange->field_term->length(); field_term_char= field_term_length ? @@ -3091,7 +3073,8 @@ select_export::prepare(List<Item> &list, SELECT_LEX_UNIT *u) field_term_length && strchr(NUMERIC_CHARS, field_term_char))) { push_warning(thd, Sql_condition::WARN_LEVEL_WARN, - ER_AMBIGUOUS_FIELD_TERM, ER(ER_AMBIGUOUS_FIELD_TERM)); + ER_AMBIGUOUS_FIELD_TERM, + ER_THD(thd, ER_AMBIGUOUS_FIELD_TERM)); is_ambiguous_field_term= TRUE; } else @@ -3142,9 +3125,7 @@ int select_export::send_data(List<Item> &items) if (res && !my_charset_same(write_cs, res->charset()) && !my_charset_same(write_cs, &my_charset_bin)) { - const char *well_formed_error_pos; - const char *cannot_convert_error_pos; - const char *from_end_pos; + String_copier copier; const char *error_pos; uint32 bytes; uint64 estimated_bytes= @@ -3157,16 +3138,11 @@ int select_export::send_data(List<Item> &items) goto err; } - bytes= well_formed_copy_nchars(write_cs, (char *) cvt_str.ptr(), + bytes= copier.well_formed_copy(write_cs, (char *) cvt_str.ptr(), cvt_str.alloced_length(), - res->charset(), res->ptr(), res->length(), - UINT_MAX32, // copy all input chars, - // i.e. ignore nchars parameter - &well_formed_error_pos, - &cannot_convert_error_pos, - &from_end_pos); - error_pos= well_formed_error_pos ? well_formed_error_pos - : cannot_convert_error_pos; + res->charset(), + res->ptr(), res->length()); + error_pos= copier.most_important_error_pos(); if (error_pos) { char printable_buff[32]; @@ -3175,17 +3151,18 @@ int select_export::send_data(List<Item> &items) res->charset(), 6); push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN, ER_TRUNCATED_WRONG_VALUE_FOR_FIELD, - ER(ER_TRUNCATED_WRONG_VALUE_FOR_FIELD), + ER_THD(thd, ER_TRUNCATED_WRONG_VALUE_FOR_FIELD), "string", printable_buff, item->name, static_cast<long>(row_count)); } - else if (from_end_pos < res->ptr() + res->length()) + else if (copier.source_end_pos() < res->ptr() + res->length()) { /* result is longer than UINT_MAX32 and doesn't fit into String */ push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN, - WARN_DATA_TRUNCATED, ER(WARN_DATA_TRUNCATED), + WARN_DATA_TRUNCATED, + ER_THD(thd, WARN_DATA_TRUNCATED), item->full_name(), static_cast<long>(row_count)); } cvt_str.length(bytes); @@ -3385,7 +3362,7 @@ int select_dump::send_data(List<Item> &items) if (row_count++ > 1) { - my_message(ER_TOO_MANY_ROWS, ER(ER_TOO_MANY_ROWS), MYF(0)); + my_message(ER_TOO_MANY_ROWS, ER_THD(thd, ER_TOO_MANY_ROWS), MYF(0)); goto err; } while ((item=li++)) @@ -3408,19 +3385,13 @@ err: } -select_subselect::select_subselect(Item_subselect *item_arg) -{ - item= item_arg; -} - - int select_singlerow_subselect::send_data(List<Item> &items) { DBUG_ENTER("select_singlerow_subselect::send_data"); Item_singlerow_subselect *it= (Item_singlerow_subselect *)item; if (it->assigned()) { - my_message(ER_SUBQUERY_NO_1_ROW, ER(ER_SUBQUERY_NO_1_ROW), + my_message(ER_SUBQUERY_NO_1_ROW, ER_THD(thd, ER_SUBQUERY_NO_1_ROW), MYF(current_thd->lex->ignore ? ME_JUST_WARNING : 0)); DBUG_RETURN(1); } @@ -3465,7 +3436,7 @@ int select_max_min_finder_subselect::send_data(List<Item> &items) { if (!cache) { - cache= Item_cache::get_cache(val_item); + cache= Item_cache::get_cache(thd, val_item); switch (val_item->result_type()) { case REAL_RESULT: op= &select_max_min_finder_subselect::cmp_real; @@ -3481,7 +3452,6 @@ int select_max_min_finder_subselect::send_data(List<Item> &items) break; case ROW_RESULT: case TIME_RESULT: - case IMPOSSIBLE_RESULT: // This case should never be choosen DBUG_ASSERT(0); op= 0; @@ -3593,7 +3563,7 @@ int select_dumpvar::prepare(List<Item> &list, SELECT_LEX_UNIT *u) if (var_list.elements != list.elements) { my_message(ER_WRONG_NUMBER_OF_COLUMNS_IN_SELECT, - ER(ER_WRONG_NUMBER_OF_COLUMNS_IN_SELECT), MYF(0)); + ER_THD(thd, ER_WRONG_NUMBER_OF_COLUMNS_IN_SELECT), MYF(0)); return 1; } return 0; @@ -3629,6 +3599,7 @@ void Query_arena::free_items() { next= free_list->next; DBUG_ASSERT(free_list != next); + DBUG_PRINT("info", ("free item: 0x%lx", (ulong) free_list)); free_list->delete_self(); } /* Postcondition: free_list is 0 */ @@ -3919,6 +3890,18 @@ Statement_map::~Statement_map() my_hash_free(&st_hash); } +bool my_var_user::set(THD *thd, Item *item) +{ + Item_func_set_user_var *suv= new (thd->mem_root) Item_func_set_user_var(thd, name, item); + suv->save_item_result(item); + return suv->fix_fields(thd, 0) || suv->update(); +} + +bool my_var_sp::set(THD *thd, Item *item) +{ + return thd->spcont->set_variable(thd, offset, &item); +} + int select_dumpvar::send_data(List<Item> &items) { List_iterator_fast<my_var> var_li(var_list); @@ -3934,25 +3917,13 @@ int select_dumpvar::send_data(List<Item> &items) } if (row_count++) { - my_message(ER_TOO_MANY_ROWS, ER(ER_TOO_MANY_ROWS), MYF(0)); + my_message(ER_TOO_MANY_ROWS, ER_THD(thd, ER_TOO_MANY_ROWS), MYF(0)); DBUG_RETURN(1); } while ((mv= var_li++) && (item= it++)) { - if (mv->local) - { - if (thd->spcont->set_variable(thd, mv->offset, &item)) - DBUG_RETURN(1); - } - else - { - Item_func_set_user_var *suv= new Item_func_set_user_var(mv->s, item); - suv->save_item_result(item); - if (suv->fix_fields(thd, 0)) - DBUG_RETURN (1); - if (suv->update()) - DBUG_RETURN (1); - } + if (mv->set(thd, item)) + DBUG_RETURN(1); } DBUG_RETURN(thd->is_error()); } @@ -3961,7 +3932,7 @@ bool select_dumpvar::send_eof() { if (! row_count) push_warning(thd, Sql_condition::WARN_LEVEL_WARN, - ER_SP_FETCH_NO_DATA, ER(ER_SP_FETCH_NO_DATA)); + ER_SP_FETCH_NO_DATA, ER_THD(thd, ER_SP_FETCH_NO_DATA)); /* Don't send EOF if we're in error condition (which implies we've already sent or are sending an error) @@ -3969,11 +3940,14 @@ bool select_dumpvar::send_eof() if (thd->is_error()) return true; - ::my_ok(thd,row_count); + if (!suppress_my_ok) + ::my_ok(thd,row_count); + return 0; } + bool select_materialize_with_stats:: create_result_table(THD *thd_arg, List<Item> *column_types, @@ -3989,7 +3963,7 @@ create_result_table(THD *thd_arg, List<Item> *column_types, if (! (table= create_tmp_table(thd_arg, &tmp_table_param, *column_types, (ORDER*) 0, is_union_distinct, 1, options, HA_POS_ERROR, (char*) table_alias, - keep_row_order))) + !create_table, keep_row_order))) return TRUE; col_stat= (Column_statistics*) table->in_use->alloc(table->s->fields * @@ -4089,13 +4063,12 @@ void TMP_TABLE_PARAM::init() } -void thd_increment_bytes_sent(ulong length) +void thd_increment_bytes_sent(void *thd, ulong length) { - THD *thd=current_thd; + /* thd == 0 when close_connection() calls net_send_error() */ if (likely(thd != 0)) { - /* current_thd == 0 when close_connection() calls net_send_error() */ - thd->status_var.bytes_sent+= length; + ((THD*) thd)->status_var.bytes_sent+= length; } } @@ -4106,15 +4079,15 @@ my_bool thd_net_is_killed() } -void thd_increment_bytes_received(ulong length) +void thd_increment_bytes_received(void *thd, ulong length) { - current_thd->status_var.bytes_received+= length; + ((THD*) thd)->status_var.bytes_received+= length; } -void thd_increment_net_big_packet_count(ulong length) +void thd_increment_net_big_packet_count(void *thd, ulong length) { - current_thd->status_var.net_big_packet_count+= length; + ((THD*) thd)->status_var.net_big_packet_count+= length; } @@ -4336,23 +4309,30 @@ extern "C" int thd_killed(const MYSQL_THD thd) /* return thd->killed status to the client, mapped to the API enum thd_kill_levels values. + + @note Since this function is called quite frequently thd_kill_level(NULL) is + forbidden for performance reasons (saves one conditional branch). If your ever + need to call thd_kill_level() when THD is not available, you options are (most + to least preferred): + - try to pass THD through to thd_kill_level() + - add current_thd to some service and use thd_killed(current_thd) + - add thd_killed_current() function to kill statement service + - add if (!thd) thd= current_thd here */ extern "C" enum thd_kill_levels thd_kill_level(const MYSQL_THD thd) { - THD* current= current_thd; - - if (!thd) - thd= current; - - if (thd == current) - { - Apc_target *apc_target= (Apc_target*)&thd->apc_target; - if (apc_target->have_apc_requests()) - apc_target->process_apc_requests(); - } + DBUG_ASSERT(thd); if (likely(thd->killed == NOT_KILLED)) + { + Apc_target *apc_target= (Apc_target*) &thd->apc_target; + if (unlikely(apc_target->have_apc_requests())) + { + if (thd == current_thd) + apc_target->process_apc_requests(); + } return THD_IS_NOT_KILLED; + } return thd->killed & KILL_HARD_BIT ? THD_ABORT_ASAP : THD_ABORT_SOFTLY; } @@ -4506,6 +4486,28 @@ extern "C" int thd_is_connected(MYSQL_THD thd) } +extern "C" double thd_rnd(MYSQL_THD thd) +{ + return my_rnd(&thd->rand); +} + + +/** + Generate string of printable random characters of requested length. + + @param to[out] Buffer for generation; must be at least length+1 bytes + long; result string is always null-terminated + @param length[in] How many random characters to put in buffer +*/ +extern "C" void thd_create_random_password(MYSQL_THD thd, + char *to, size_t length) +{ + for (char *end= to + length; to < end; to++) + *to= (char) (my_rnd(&thd->rand)*94 + 33); + *to= '\0'; +} + + #ifdef INNODB_COMPATIBILITY_HOOKS extern "C" const struct charset_info_st *thd_charset(MYSQL_THD thd) { @@ -4543,6 +4545,15 @@ extern "C" int thd_rpl_is_parallel(const MYSQL_THD thd) return thd->rgi_slave && thd->rgi_slave->is_parallel_exec; } + +/* Returns high resolution timestamp for the start + of the current query. */ +extern "C" unsigned long long thd_start_utime(const MYSQL_THD thd) +{ + return thd->start_utime; +} + + /* This function can optionally be called to check if thd_report_wait_for() needs to be called for waits done by a given transaction. @@ -4560,7 +4571,7 @@ thd_need_wait_for(const MYSQL_THD thd) { rpl_group_info *rgi; - if (mysql_bin_log.is_open() && opt_binlog_commit_wait_count > 0) + if (mysql_bin_log.is_open()) return true; if (!thd) return false; @@ -4601,7 +4612,11 @@ thd_report_wait_for(MYSQL_THD thd, MYSQL_THD other_thd) rpl_group_info *rgi; rpl_group_info *other_rgi; - if (!thd || !other_thd) + if (!thd) + return; + DEBUG_SYNC(thd, "thd_report_wait_for"); + thd->transaction.stmt.mark_trans_did_wait(); + if (!other_thd) return; binlog_report_wait_for(thd, other_thd); rgi= thd->rgi_slave; @@ -4626,13 +4641,88 @@ thd_report_wait_for(MYSQL_THD thd, MYSQL_THD other_thd) cause replication to rollback (and later re-try) the other transaction, releasing the lock for this transaction so replication can proceed. */ - other_rgi->killed_for_retry= true; + other_rgi->killed_for_retry= rpl_group_info::RETRY_KILL_KILLED; mysql_mutex_lock(&other_thd->LOCK_thd_data); other_thd->awake(KILL_CONNECTION); mysql_mutex_unlock(&other_thd->LOCK_thd_data); } /* + Used by storage engines (currently TokuDB) to report that one transaction + THD is about to go to wait for a transactional lock held by another + transactions OTHER_THD. + + This is used for parallel replication, where transactions are required to + commit in the same order on the slave as they did on the master. If the + transactions on the slave encounter lock conflicts on the slave that did not + exist on the master, this can cause deadlocks. This is primarily used in + optimistic (and aggressive) modes. + + Normally, such conflicts will not occur in conservative mode, because the + same conflict would have prevented the two transactions from committing in + parallel on the master, thus preventing them from running in parallel on the + slave in the first place. However, it is possible in case when the optimizer + chooses a different plan on the slave than on the master (eg. table scan + instead of index scan). + + InnoDB/XtraDB reports lock waits using this call. If a lock wait causes a + deadlock with the pre-determined commit order, we kill the later transaction, + and later re-try it, to resolve the deadlock. + + This call need only receive reports about waits for locks that will remain + until the holding transaction commits. InnoDB/XtraDB auto-increment locks, + for example, are released earlier, and so need not be reported. (Such false + positives are not harmful, but could lead to unnecessary kill and retry, so + best avoided). + + Returns 1 if the OTHER_THD will be killed to resolve deadlock, 0 if not. The + actual kill will happen later, asynchronously from another thread. The + caller does not need to take any actions on the return value if the + handlerton kill_query method is implemented to abort the to-be-killed + transaction. +*/ +extern "C" int +thd_rpl_deadlock_check(MYSQL_THD thd, MYSQL_THD other_thd) +{ + rpl_group_info *rgi; + rpl_group_info *other_rgi; + + if (!thd) + return 0; + DEBUG_SYNC(thd, "thd_report_wait_for"); + thd->transaction.stmt.mark_trans_did_wait(); + if (!other_thd) + return 0; + binlog_report_wait_for(thd, other_thd); + rgi= thd->rgi_slave; + other_rgi= other_thd->rgi_slave; + if (!rgi || !other_rgi) + return 0; + if (!rgi->is_parallel_exec) + return 0; + if (rgi->rli != other_rgi->rli) + return 0; + if (!rgi->gtid_sub_id || !other_rgi->gtid_sub_id) + return 0; + if (rgi->current_gtid.domain_id != other_rgi->current_gtid.domain_id) + return 0; + if (rgi->gtid_sub_id > other_rgi->gtid_sub_id) + return 0; + /* + This transaction is about to wait for another transaction that is required + by replication binlog order to commit after. This would cause a deadlock. + + So send a kill to the other transaction, with a temporary error; this will + cause replication to rollback (and later re-try) the other transaction, + releasing the lock for this transaction so replication can proceed. + */ +#ifdef HAVE_REPLICATION + slave_background_kill_request(other_thd); +#endif + return 1; +} + +/* This function is called from InnoDB/XtraDB to check if the commit order of two transactions has already been decided by the upper layer. This happens in parallel replication, where the commit order is forced to be the same on @@ -4768,17 +4858,14 @@ extern "C" int thd_non_transactional_update(const MYSQL_THD thd) extern "C" int thd_binlog_format(const MYSQL_THD thd) { -#ifdef WITH_WSREP if (WSREP(thd)) { /* for wsrep binlog format is meaningful also when binlogging is off */ - return (int) WSREP_FORMAT(thd->variables.binlog_format); + return (int) thd->wsrep_binlog_format(); } -#endif /* WITH_WSREP */ if (mysql_bin_log.is_open() && (thd->variables.option_bits & OPTION_BIN_LOG)) return (int) thd->variables.binlog_format; - else - return BINLOG_FORMAT_UNSPEC; + return BINLOG_FORMAT_UNSPEC; } extern "C" void thd_mark_transaction_to_rollback(MYSQL_THD thd, bool all) @@ -5181,27 +5268,6 @@ void THD::set_status_no_good_index_used() #endif } -void THD::set_command(enum enum_server_command command) -{ - m_command= command; -#ifdef HAVE_PSI_THREAD_INTERFACE - PSI_STATEMENT_CALL(set_thread_command)(m_command); -#endif -} - -/** Assign a new value to thd->query. */ - -void THD::set_query(const CSET_STRING &string_arg) -{ - mysql_mutex_lock(&LOCK_thd_data); - set_query_inner(string_arg); - mysql_mutex_unlock(&LOCK_thd_data); - -#ifdef HAVE_PSI_THREAD_INTERFACE - PSI_THREAD_CALL(set_thread_info)(query(), query_length()); -#endif -} - /** Assign a new value to thd->query and thd->query_id. */ void THD::set_query_and_id(char *query_arg, uint32 query_length_arg, @@ -5263,9 +5329,7 @@ void THD::get_definer(LEX_USER *definer, bool role) { definer->user = invoker_user; definer->host= invoker_host; - definer->password= null_lex_str; - definer->plugin= empty_lex_str; - definer->auth= empty_lex_str; + definer->reset_auth(); } else #endif @@ -5292,120 +5356,259 @@ void THD::mark_transaction_to_rollback(bool all) /*************************************************************************** Handling of XA id cacheing ***************************************************************************/ +class XID_cache_element +{ + /* + m_state is used to prevent elements from being deleted while XA RECOVER + iterates xid cache and to prevent recovered elments from being acquired by + multiple threads. -mysql_mutex_t LOCK_xid_cache; -HASH xid_cache; + bits 1..29 are reference counter + bit 30 is RECOVERED flag + bit 31 is ACQUIRED flag (thread owns this xid) + bit 32 is unused -extern "C" uchar *xid_get_hash_key(const uchar *, size_t *, my_bool); -extern "C" void xid_free_hash(void *); + Newly allocated and deleted elements have m_state set to 0. -uchar *xid_get_hash_key(const uchar *ptr, size_t *length, - my_bool not_used __attribute__((unused))) -{ - *length=((XID_STATE*)ptr)->xid.key_length(); - return ((XID_STATE*)ptr)->xid.key(); -} + On lock() m_state is atomically incremented. It also creates load-ACQUIRE + memory barrier to make sure m_state is actually updated before furhter + memory accesses. Attempting to lock an element that has neither ACQUIRED + nor RECOVERED flag set returns failure and further accesses to element + memory are forbidden. -void xid_free_hash(void *ptr) -{ - if (!((XID_STATE*)ptr)->in_thd) - my_free(ptr); -} + On unlock() m_state is decremented. It also creates store-RELEASE memory + barrier to make sure m_state is actually updated after preceding memory + accesses. -#ifdef HAVE_PSI_INTERFACE -static PSI_mutex_key key_LOCK_xid_cache; + ACQUIRED flag is set when thread registers it's xid or when thread acquires + recovered xid. -static PSI_mutex_info all_xid_mutexes[]= -{ - { &key_LOCK_xid_cache, "LOCK_xid_cache", PSI_FLAG_GLOBAL} + RECOVERED flag is set for elements found during crash recovery. + + ACQUIRED and RECOVERED flags are cleared before element is deleted from + hash in a spin loop, after last reference is released. + */ + int32 m_state; +public: + static const int32 ACQUIRED= 1 << 30; + static const int32 RECOVERED= 1 << 29; + XID_STATE *m_xid_state; + bool is_set(int32 flag) + { return my_atomic_load32_explicit(&m_state, MY_MEMORY_ORDER_RELAXED) & flag; } + void set(int32 flag) + { + DBUG_ASSERT(!is_set(ACQUIRED | RECOVERED)); + my_atomic_add32_explicit(&m_state, flag, MY_MEMORY_ORDER_RELAXED); + } + bool lock() + { + int32 old= my_atomic_add32_explicit(&m_state, 1, MY_MEMORY_ORDER_ACQUIRE); + if (old & (ACQUIRED | RECOVERED)) + return true; + unlock(); + return false; + } + void unlock() + { my_atomic_add32_explicit(&m_state, -1, MY_MEMORY_ORDER_RELEASE); } + void mark_uninitialized() + { + int32 old= ACQUIRED; + while (!my_atomic_cas32_weak_explicit(&m_state, &old, 0, + MY_MEMORY_ORDER_RELAXED, + MY_MEMORY_ORDER_RELAXED)) + { + old&= ACQUIRED | RECOVERED; + (void) LF_BACKOFF; + } + } + bool acquire_recovered() + { + int32 old= RECOVERED; + while (!my_atomic_cas32_weak_explicit(&m_state, &old, ACQUIRED | RECOVERED, + MY_MEMORY_ORDER_RELAXED, + MY_MEMORY_ORDER_RELAXED)) + { + if (!(old & RECOVERED) || (old & ACQUIRED)) + return false; + old= RECOVERED; + (void) LF_BACKOFF; + } + return true; + } + static void lf_hash_initializer(LF_HASH *hash __attribute__((unused)), + XID_cache_element *element, + XID_STATE *xid_state) + { + DBUG_ASSERT(!element->is_set(ACQUIRED | RECOVERED)); + element->m_xid_state= xid_state; + xid_state->xid_cache_element= element; + } + static void lf_alloc_constructor(uchar *ptr) + { + XID_cache_element *element= (XID_cache_element*) (ptr + LF_HASH_OVERHEAD); + element->m_state= 0; + } + static void lf_alloc_destructor(uchar *ptr) + { + XID_cache_element *element= (XID_cache_element*) (ptr + LF_HASH_OVERHEAD); + DBUG_ASSERT(!element->is_set(ACQUIRED)); + if (element->is_set(RECOVERED)) + my_free(element->m_xid_state); + } + static uchar *key(const XID_cache_element *element, size_t *length, + my_bool not_used __attribute__((unused))) + { + *length= element->m_xid_state->xid.key_length(); + return element->m_xid_state->xid.key(); + } }; -static void init_xid_psi_keys(void) -{ - const char* category= "sql"; - int count; - if (PSI_server == NULL) - return; +static LF_HASH xid_cache; +static bool xid_cache_inited; - count= array_elements(all_xid_mutexes); - PSI_server->register_mutex(category, all_xid_mutexes, count); -} -#endif /* HAVE_PSI_INTERFACE */ -bool xid_cache_init() +bool THD::fix_xid_hash_pins() { -#ifdef HAVE_PSI_INTERFACE - init_xid_psi_keys(); -#endif + if (!xid_hash_pins) + xid_hash_pins= lf_hash_get_pins(&xid_cache); + return !xid_hash_pins; +} + - mysql_mutex_init(key_LOCK_xid_cache, &LOCK_xid_cache, MY_MUTEX_INIT_FAST); - return my_hash_init(&xid_cache, &my_charset_bin, 100, 0, 0, - xid_get_hash_key, xid_free_hash, 0) != 0; +void xid_cache_init() +{ + xid_cache_inited= true; + lf_hash_init(&xid_cache, sizeof(XID_cache_element), LF_HASH_UNIQUE, 0, 0, + (my_hash_get_key) XID_cache_element::key, &my_charset_bin); + xid_cache.alloc.constructor= XID_cache_element::lf_alloc_constructor; + xid_cache.alloc.destructor= XID_cache_element::lf_alloc_destructor; + xid_cache.initializer= + (lf_hash_initializer) XID_cache_element::lf_hash_initializer; } + void xid_cache_free() { - if (my_hash_inited(&xid_cache)) + if (xid_cache_inited) { - my_hash_free(&xid_cache); - mysql_mutex_destroy(&LOCK_xid_cache); + lf_hash_destroy(&xid_cache); + xid_cache_inited= false; } } -XID_STATE *xid_cache_search(XID *xid) + +/** + Find recovered XA transaction by XID. +*/ + +XID_STATE *xid_cache_search(THD *thd, XID *xid) { - mysql_mutex_lock(&LOCK_xid_cache); - XID_STATE *res=(XID_STATE *)my_hash_search(&xid_cache, xid->key(), - xid->key_length()); - mysql_mutex_unlock(&LOCK_xid_cache); - return res; + XID_STATE *xs= 0; + DBUG_ASSERT(thd->xid_hash_pins); + XID_cache_element *element= + (XID_cache_element*) lf_hash_search(&xid_cache, thd->xid_hash_pins, + xid->key(), xid->key_length()); + if (element) + { + if (element->acquire_recovered()) + xs= element->m_xid_state; + lf_hash_search_unpin(thd->xid_hash_pins); + DEBUG_SYNC(thd, "xa_after_search"); + } + return xs; } bool xid_cache_insert(XID *xid, enum xa_states xa_state) { XID_STATE *xs; - my_bool res; - mysql_mutex_lock(&LOCK_xid_cache); - if (my_hash_search(&xid_cache, xid->key(), xid->key_length())) - res=0; - else if (!(xs=(XID_STATE *)my_malloc(sizeof(*xs), MYF(MY_WME)))) - res=1; - else + LF_PINS *pins; + int res= 1; + + if (!(pins= lf_hash_get_pins(&xid_cache))) + return true; + + if ((xs= (XID_STATE*) my_malloc(sizeof(*xs), MYF(MY_WME)))) { xs->xa_state=xa_state; xs->xid.set(xid); - xs->in_thd=0; xs->rm_error=0; - res=my_hash_insert(&xid_cache, (uchar*)xs); + + if ((res= lf_hash_insert(&xid_cache, pins, xs))) + my_free(xs); + else + xs->xid_cache_element->set(XID_cache_element::RECOVERED); + if (res == 1) + res= 0; } - mysql_mutex_unlock(&LOCK_xid_cache); + lf_hash_put_pins(pins); return res; } -bool xid_cache_insert(XID_STATE *xid_state) +bool xid_cache_insert(THD *thd, XID_STATE *xid_state) { - mysql_mutex_lock(&LOCK_xid_cache); - if (my_hash_search(&xid_cache, xid_state->xid.key(), - xid_state->xid.key_length())) + if (thd->fix_xid_hash_pins()) + return true; + + int res= lf_hash_insert(&xid_cache, thd->xid_hash_pins, xid_state); + switch (res) { - mysql_mutex_unlock(&LOCK_xid_cache); + case 0: + xid_state->xid_cache_element->set(XID_cache_element::ACQUIRED); + break; + case 1: my_error(ER_XAER_DUPID, MYF(0)); - return true; + /* fall through */ + default: + xid_state->xid_cache_element= 0; } - bool res= my_hash_insert(&xid_cache, (uchar*)xid_state); - mysql_mutex_unlock(&LOCK_xid_cache); return res; } -void xid_cache_delete(XID_STATE *xid_state) +void xid_cache_delete(THD *thd, XID_STATE *xid_state) { - mysql_mutex_lock(&LOCK_xid_cache); - my_hash_delete(&xid_cache, (uchar *)xid_state); - mysql_mutex_unlock(&LOCK_xid_cache); + if (xid_state->xid_cache_element) + { + bool recovered= xid_state->xid_cache_element->is_set(XID_cache_element::RECOVERED); + DBUG_ASSERT(thd->xid_hash_pins); + xid_state->xid_cache_element->mark_uninitialized(); + lf_hash_delete(&xid_cache, thd->xid_hash_pins, + xid_state->xid.key(), xid_state->xid.key_length()); + xid_state->xid_cache_element= 0; + if (recovered) + my_free(xid_state); + } +} + + +struct xid_cache_iterate_arg +{ + my_hash_walk_action action; + void *argument; +}; + +static my_bool xid_cache_iterate_callback(XID_cache_element *element, + xid_cache_iterate_arg *arg) +{ + my_bool res= FALSE; + if (element->lock()) + { + res= arg->action(element->m_xid_state, arg->argument); + element->unlock(); + } + return res; +} + +int xid_cache_iterate(THD *thd, my_hash_walk_action action, void *arg) +{ + xid_cache_iterate_arg argument= { action, arg }; + return thd->fix_xid_hash_pins() ? -1 : + lf_hash_iterate(&xid_cache, thd->xid_hash_pins, + (my_hash_walk_action) xid_cache_iterate_callback, + &argument); } /* @@ -5609,7 +5812,7 @@ int THD::decide_logging_format(TABLE_LIST *tables) binlog by filtering rules. */ if (mysql_bin_log.is_open() && (variables.option_bits & OPTION_BIN_LOG) && - !(WSREP_FORMAT(variables.binlog_format) == BINLOG_FORMAT_STMT && + !(wsrep_binlog_format() == BINLOG_FORMAT_STMT && !binlog_filter->db_ok(db))) { /* @@ -5682,7 +5885,7 @@ int THD::decide_logging_format(TABLE_LIST *tables) } #endif - if (variables.binlog_format != BINLOG_FORMAT_ROW && tables) + if (wsrep_binlog_format() != BINLOG_FORMAT_ROW && tables) { /* DML statements that modify a table with an auto_increment column based on @@ -5773,8 +5976,7 @@ int THD::decide_logging_format(TABLE_LIST *tables) flags_access_some_set |= flags; if (lex->sql_command != SQLCOM_CREATE_TABLE || - (lex->sql_command == SQLCOM_CREATE_TABLE && - lex->create_info.tmp_table())) + (lex->sql_command == SQLCOM_CREATE_TABLE && lex->tmp_table())) { my_bool trans= table->table->file->has_transactions(); @@ -5846,7 +6048,7 @@ int THD::decide_logging_format(TABLE_LIST *tables) */ my_error((error= ER_BINLOG_ROW_INJECTION_AND_STMT_ENGINE), MYF(0)); } - else if (WSREP_FORMAT(variables.binlog_format) == BINLOG_FORMAT_ROW && + else if (wsrep_binlog_format() == BINLOG_FORMAT_ROW && sqlcom_can_generate_row_events(this)) { /* @@ -5867,7 +6069,8 @@ int THD::decide_logging_format(TABLE_LIST *tables) unsafe_type++) if (unsafe_flags & (1 << unsafe_type)) my_error((error= ER_BINLOG_UNSAFE_AND_STMT_ENGINE), MYF(0), - ER(LEX::binlog_stmt_unsafe_errcode[unsafe_type])); + ER_THD(this, + LEX::binlog_stmt_unsafe_errcode[unsafe_type])); } /* log in statement format! */ } @@ -5875,7 +6078,7 @@ int THD::decide_logging_format(TABLE_LIST *tables) else { /* binlog_format = STATEMENT */ - if (WSREP_FORMAT(variables.binlog_format) == BINLOG_FORMAT_STMT) + if (wsrep_binlog_format() == BINLOG_FORMAT_STMT) { if (lex->is_stmt_row_injection()) { @@ -5892,14 +6095,10 @@ int THD::decide_logging_format(TABLE_LIST *tables) 5. Error: Cannot modify table that uses a storage engine limited to row-logging when binlog_format = STATEMENT */ -#ifdef WITH_WSREP - if (!WSREP(this) || wsrep_exec_mode == LOCAL_STATE) + if (IF_WSREP((!WSREP(this) || wsrep_exec_mode == LOCAL_STATE),1)) { -#endif /* WITH_WSREP */ - my_error((error= ER_BINLOG_STMT_MODE_AND_ROW_ENGINE), MYF(0), ""); -#ifdef WITH_WSREP + my_error((error= ER_BINLOG_STMT_MODE_AND_ROW_ENGINE), MYF(0), ""); } -#endif /* WITH_WSREP */ } else if (is_write && (unsafe_flags= lex->get_stmt_unsafe_flags()) != 0) { @@ -5911,7 +6110,7 @@ int THD::decide_logging_format(TABLE_LIST *tables) DBUG_PRINT("info", ("Scheduling warning to be issued by " "binlog_query: '%s'", - ER(ER_BINLOG_UNSAFE_STATEMENT))); + ER_THD(this, ER_BINLOG_UNSAFE_STATEMENT))); DBUG_PRINT("info", ("binlog_unsafe_warning_flags: 0x%x", binlog_unsafe_warning_flags)); } @@ -6007,11 +6206,11 @@ int THD::decide_logging_format(TABLE_LIST *tables) DBUG_PRINT("info", ("decision: no logging since " "mysql_bin_log.is_open() = %d " "and (options & OPTION_BIN_LOG) = 0x%llx " - "and binlog_format = %lu " + "and binlog_format = %u " "and binlog_filter->db_ok(db) = %d", mysql_bin_log.is_open(), (variables.option_bits & OPTION_BIN_LOG), - WSREP_FORMAT(variables.binlog_format), + (uint) wsrep_binlog_format(), binlog_filter->db_ok(db))); #endif @@ -6048,13 +6247,11 @@ int THD::decide_logging_format(TABLE_LIST *tables) If error, NULL. */ -template <class RowsEventT> Rows_log_event* +template <class RowsEventT> Rows_log_event* THD::binlog_prepare_pending_rows_event(TABLE* table, uint32 serv_id, - MY_BITMAP const* cols, - size_t colcnt, size_t needed, bool is_transactional, - RowsEventT *hint __attribute__((unused))) + RowsEventT *hint __attribute__((unused))) { DBUG_ENTER("binlog_prepare_pending_rows_event"); /* Pre-conditions */ @@ -6090,16 +6287,15 @@ THD::binlog_prepare_pending_rows_event(TABLE* table, uint32 serv_id, event. */ if (!pending || - pending->server_id != serv_id || + pending->server_id != serv_id || pending->get_table_id() != table->s->table_map_id || - pending->get_general_type_code() != general_type_code || - pending->get_data_size() + needed > opt_binlog_rows_event_max_size || - pending->get_width() != colcnt || - !bitmap_cmp(pending->get_cols(), cols)) + pending->get_general_type_code() != general_type_code || + pending->get_data_size() + needed > opt_binlog_rows_event_max_size || + pending->read_write_bitmaps_cmp(table) == FALSE) { /* Create a new RowsEventT... */ Rows_log_event* const - ev= new RowsEventT(this, table, table->s->table_map_id, cols, + ev= new RowsEventT(this, table, table->s->table_map_id, is_transactional); if (unlikely(!ev)) DBUG_RETURN(NULL); @@ -6244,18 +6440,12 @@ CPP_UNNAMED_NS_START CPP_UNNAMED_NS_END -int THD::binlog_write_row(TABLE* table, bool is_trans, - MY_BITMAP const* cols, size_t colcnt, - uchar const *record) -{ -#ifdef WITH_WSREP - DBUG_ASSERT(is_current_stmt_binlog_format_row() && - ((WSREP(this) && wsrep_emulate_bin_log) || - mysql_bin_log.is_open())); -#else - DBUG_ASSERT(is_current_stmt_binlog_format_row() && mysql_bin_log.is_open()); -#endif +int THD::binlog_write_row(TABLE* table, bool is_trans, + uchar const *record) +{ + DBUG_ASSERT(is_current_stmt_binlog_format_row() && + ((WSREP(this) && wsrep_emulate_bin_log) || mysql_bin_log.is_open())); /* Pack records into format for transfer. We are allocating more memory than needed, but that doesn't matter. @@ -6266,14 +6456,14 @@ int THD::binlog_write_row(TABLE* table, bool is_trans, uchar *row_data= memory.slot(0); - size_t const len= pack_row(table, cols, row_data, record); + size_t const len= pack_row(table, table->rpl_write_set, row_data, record); /* Ensure that all events in a GTID group are in the same cache */ if (variables.option_bits & OPTION_GTID_BEGIN) is_trans= 1; Rows_log_event* const ev= - binlog_prepare_pending_rows_event(table, variables.server_id, cols, colcnt, + binlog_prepare_pending_rows_event(table, variables.server_id, len, is_trans, static_cast<Write_rows_log_event*>(0)); @@ -6284,17 +6474,11 @@ int THD::binlog_write_row(TABLE* table, bool is_trans, } int THD::binlog_update_row(TABLE* table, bool is_trans, - MY_BITMAP const* cols, size_t colcnt, const uchar *before_record, const uchar *after_record) -{ -#ifdef WITH_WSREP - DBUG_ASSERT(is_current_stmt_binlog_format_row() && - ((WSREP(this) && wsrep_emulate_bin_log) - || mysql_bin_log.is_open())); -#else - DBUG_ASSERT(is_current_stmt_binlog_format_row() && mysql_bin_log.is_open()); -#endif +{ + DBUG_ASSERT(is_current_stmt_binlog_format_row() && + ((WSREP(this) && wsrep_emulate_bin_log) || mysql_bin_log.is_open())); size_t const before_maxlen = max_row_length(table, before_record); size_t const after_maxlen = max_row_length(table, after_record); @@ -6306,9 +6490,9 @@ int THD::binlog_update_row(TABLE* table, bool is_trans, uchar *before_row= row_data.slot(0); uchar *after_row= row_data.slot(1); - size_t const before_size= pack_row(table, cols, before_row, + size_t const before_size= pack_row(table, table->read_set, before_row, before_record); - size_t const after_size= pack_row(table, cols, after_row, + size_t const after_size= pack_row(table, table->rpl_write_set, after_row, after_record); /* Ensure that all events in a GTID group are in the same cache */ @@ -6327,31 +6511,42 @@ int THD::binlog_update_row(TABLE* table, bool is_trans, #endif Rows_log_event* const ev= - binlog_prepare_pending_rows_event(table, variables.server_id, cols, colcnt, - before_size + after_size, is_trans, - static_cast<Update_rows_log_event*>(0)); + binlog_prepare_pending_rows_event(table, variables.server_id, + before_size + after_size, is_trans, + static_cast<Update_rows_log_event*>(0)); if (unlikely(ev == 0)) return HA_ERR_OUT_OF_MEM; - return - ev->add_row_data(before_row, before_size) || - ev->add_row_data(after_row, after_size); + int error= ev->add_row_data(before_row, before_size) || + ev->add_row_data(after_row, after_size); + + return error; + } int THD::binlog_delete_row(TABLE* table, bool is_trans, - MY_BITMAP const* cols, size_t colcnt, uchar const *record) -{ -#ifdef WITH_WSREP - DBUG_ASSERT(is_current_stmt_binlog_format_row() && - ((WSREP(this) && wsrep_emulate_bin_log) - || mysql_bin_log.is_open())); -#else - DBUG_ASSERT(is_current_stmt_binlog_format_row() && mysql_bin_log.is_open()); -#endif +{ + DBUG_ASSERT(is_current_stmt_binlog_format_row() && + ((WSREP(this) && wsrep_emulate_bin_log) || mysql_bin_log.is_open())); + /** + Save a reference to the original read bitmaps + We will need this to restore the bitmaps at the end as + binlog_prepare_row_images() may change table->read_set. + table->read_set is used by pack_row and deep in + binlog_prepare_pending_events(). + */ + MY_BITMAP *old_read_set= table->read_set; - /* + /** + This will remove spurious fields required during execution but + not needed for binlogging. This is done according to the: + binlog-row-image option. + */ + binlog_prepare_row_images(table); + + /* Pack records into format for transfer. We are allocating more memory than needed, but that doesn't matter. */ @@ -6361,34 +6556,102 @@ int THD::binlog_delete_row(TABLE* table, bool is_trans, uchar *row_data= memory.slot(0); - size_t const len= pack_row(table, cols, row_data, record); + DBUG_DUMP("table->read_set", (uchar*) table->read_set->bitmap, (table->s->fields + 7) / 8); + size_t const len= pack_row(table, table->read_set, row_data, record); /* Ensure that all events in a GTID group are in the same cache */ if (variables.option_bits & OPTION_GTID_BEGIN) is_trans= 1; Rows_log_event* const ev= - binlog_prepare_pending_rows_event(table, variables.server_id, cols, colcnt, - len, is_trans, - static_cast<Delete_rows_log_event*>(0)); + binlog_prepare_pending_rows_event(table, variables.server_id, + len, is_trans, + static_cast<Delete_rows_log_event*>(0)); if (unlikely(ev == 0)) return HA_ERR_OUT_OF_MEM; - return ev->add_row_data(row_data, len); + + int error= ev->add_row_data(row_data, len); + + /* restore read set for the rest of execution */ + table->column_bitmaps_set_no_signal(old_read_set, + table->write_set); + + return error; } +void THD::binlog_prepare_row_images(TABLE *table) +{ + DBUG_ENTER("THD::binlog_prepare_row_images"); + /** + Remove from read_set spurious columns. The write_set has been + handled before in table->mark_columns_needed_for_update. + */ + + DBUG_PRINT_BITSET("debug", "table->read_set (before preparing): %s", table->read_set); + THD *thd= table->in_use; + + /** + if there is a primary key in the table (ie, user declared PK or a + non-null unique index) and we dont want to ship the entire image, + and the handler involved supports this. + */ + if (table->s->primary_key < MAX_KEY && + (thd->variables.binlog_row_image < BINLOG_ROW_IMAGE_FULL) && + !ha_check_storage_engine_flag(table->s->db_type(), HTON_NO_BINLOG_ROW_OPT)) + { + /** + Just to be sure that tmp_set is currently not in use as + the read_set already. + */ + DBUG_ASSERT(table->read_set != &table->tmp_set); + + bitmap_clear_all(&table->tmp_set); + + switch(thd->variables.binlog_row_image) + { + case BINLOG_ROW_IMAGE_MINIMAL: + /* MINIMAL: Mark only PK */ + table->mark_columns_used_by_index_no_reset(table->s->primary_key, + &table->tmp_set); + break; + case BINLOG_ROW_IMAGE_NOBLOB: + /** + NOBLOB: Remove unnecessary BLOB fields from read_set + (the ones that are not part of PK). + */ + bitmap_union(&table->tmp_set, table->read_set); + for (Field **ptr=table->field ; *ptr ; ptr++) + { + Field *field= (*ptr); + if ((field->type() == MYSQL_TYPE_BLOB) && + !(field->flags & PRI_KEY_FLAG)) + bitmap_clear_bit(&table->tmp_set, field->field_index); + } + break; + default: + DBUG_ASSERT(0); // impossible. + } + + /* set the temporary read_set */ + table->column_bitmaps_set_no_signal(&table->tmp_set, + table->write_set); + } + + DBUG_PRINT_BITSET("debug", "table->read_set (after preparing): %s", table->read_set); + DBUG_VOID_RETURN; +} + + + int THD::binlog_remove_pending_rows_event(bool clear_maps, bool is_transactional) { DBUG_ENTER("THD::binlog_remove_pending_rows_event"); -#ifdef WITH_WSREP - if (!(WSREP_EMULATE_BINLOG(this) || mysql_bin_log.is_open())) -#else - if (!mysql_bin_log.is_open()) -#endif + if(!WSREP_EMULATE_BINLOG(this) && !mysql_bin_log.is_open()) DBUG_RETURN(0); /* Ensure that all events in a GTID group are in the same cache */ @@ -6411,11 +6674,7 @@ int THD::binlog_flush_pending_rows_event(bool stmt_end, bool is_transactional) mode: it might be the case that we left row-based mode before flushing anything (e.g., if we have explicitly locked tables). */ -#ifdef WITH_WSREP - if (!(WSREP_EMULATE_BINLOG(this) || mysql_bin_log.is_open())) -#else - if (!mysql_bin_log.is_open()) -#endif + if(!WSREP_EMULATE_BINLOG(this) && !mysql_bin_log.is_open()) DBUG_RETURN(0); /* Ensure that all events in a GTID group are in the same cache */ @@ -6494,20 +6753,20 @@ static void reset_binlog_unsafe_suppression(ulonglong now) { unsafe_warnings_count[i]= 0; unsafe_warning_suppression_active[i]= 0; - } + } DBUG_VOID_RETURN; } /** Auxiliary function to print warning in the error log. */ -static void print_unsafe_warning_to_log(int unsafe_type, char* buf, - char* query) +static void print_unsafe_warning_to_log(THD *thd, int unsafe_type, char* buf, + char* query) { DBUG_ENTER("print_unsafe_warning_in_log"); - sprintf(buf, ER(ER_BINLOG_UNSAFE_STATEMENT), - ER(LEX::binlog_stmt_unsafe_errcode[unsafe_type])); - sql_print_warning(ER(ER_MESSAGE_AND_STATEMENT), buf, query); + sprintf(buf, ER_THD(thd, ER_BINLOG_UNSAFE_STATEMENT), + ER_THD(thd, LEX::binlog_stmt_unsafe_errcode[unsafe_type])); + sql_print_warning(ER_THD(thd, ER_MESSAGE_AND_STATEMENT), buf, query); DBUG_VOID_RETURN; } @@ -6622,7 +6881,7 @@ void THD::issue_unsafe_warnings() if (!(unsafe_type_flags= binlog_unsafe_warning_flags)) DBUG_VOID_RETURN; // Nothing to do - + /* For each unsafe_type, check if the statement is unsafe in this way and issue a warning. @@ -6635,11 +6894,11 @@ void THD::issue_unsafe_warnings() { push_warning_printf(this, Sql_condition::WARN_LEVEL_NOTE, ER_BINLOG_UNSAFE_STATEMENT, - ER(ER_BINLOG_UNSAFE_STATEMENT), - ER(LEX::binlog_stmt_unsafe_errcode[unsafe_type])); + ER_THD(this, ER_BINLOG_UNSAFE_STATEMENT), + ER_THD(this, LEX::binlog_stmt_unsafe_errcode[unsafe_type])); if (global_system_variables.log_warnings > 0 && !protect_against_unsafe_warning_flood(unsafe_type)) - print_unsafe_warning_to_log(unsafe_type, buf, query()); + print_unsafe_warning_to_log(this, unsafe_type, buf, query()); } } DBUG_VOID_RETURN; @@ -6677,12 +6936,9 @@ int THD::binlog_query(THD::enum_binlog_query_type qtype, char const *query_arg, DBUG_ENTER("THD::binlog_query"); DBUG_PRINT("enter", ("qtype: %s query: '%-.*s'", show_query_type(qtype), (int) query_len, query_arg)); -#ifdef WITH_WSREP - DBUG_ASSERT(query_arg && (WSREP_EMULATE_BINLOG(this) - || mysql_bin_log.is_open())); -#else - DBUG_ASSERT(query_arg && mysql_bin_log.is_open()); -#endif + + DBUG_ASSERT(query_arg); + DBUG_ASSERT(WSREP_EMULATE_BINLOG(this) || mysql_bin_log.is_open()); /* If this is withing a BEGIN ... COMMIT group, don't log it */ if (variables.option_bits & OPTION_GTID_BEGIN) @@ -6700,6 +6956,7 @@ int THD::binlog_query(THD::enum_binlog_query_type qtype, char const *query_arg, */ DBUG_RETURN(0); } + /* If we are not in prelocked mode, mysql_unlock_tables() will be called after this binlog_query(), so we have to flush the pending @@ -7028,7 +7285,7 @@ wait_for_commit::wait_for_prior_commit2(THD *thd) wakeup_error= thd->killed_errno(); if (!wakeup_error) wakeup_error= ER_QUERY_INTERRUPTED; - my_message(wakeup_error, ER(wakeup_error), MYF(0)); + my_message(wakeup_error, ER_THD(thd, wakeup_error), MYF(0)); thd->EXIT_COND(&old_stage); /* Must do the DEBUG_SYNC() _after_ exit_cond(), as DEBUG_SYNC is not safe to |