diff options
Diffstat (limited to 'sql/sql_class.cc')
-rw-r--r-- | sql/sql_class.cc | 1636 |
1 files changed, 1120 insertions, 516 deletions
diff --git a/sql/sql_class.cc b/sql/sql_class.cc index a1f8dbab175..2fcb6120076 100644 --- a/sql/sql_class.cc +++ b/sql/sql_class.cc @@ -25,13 +25,24 @@ #pragma implementation // gcc: Class implementation #endif -#include "mysql_priv.h" +#include "my_global.h" /* NO_EMBEDDED_ACCESS_CHECKS */ +#include "sql_priv.h" +#include "unireg.h" // REQUIRED: for other includes +#include "sql_class.h" +#include "sql_cache.h" // query_cache_abort +#include "sql_base.h" // close_thread_tables +#include "sql_time.h" // date_time_format_copy +#include "sql_acl.h" // NO_ACCESS, + // acl_getroot_no_password +#include "sql_base.h" // close_temporary_tables +#include "sql_handler.h" // mysql_ha_cleanup #include "rpl_rli.h" #include "rpl_filter.h" #include "rpl_record.h" #include "slave.h" #include <my_bitmap.h> #include "log_event.h" +#include "sql_audit.h" #include <m_ctype.h> #include <sys/stat.h> #include <thr_alarm.h> @@ -39,11 +50,15 @@ #include <io.h> #endif #include <mysys_err.h> +#include <limits.h> #include "sp_rcontext.h" #include "sp_cache.h" +#include "transaction.h" #include "sql_select.h" /* declares create_tmp_table() */ #include "debug_sync.h" +#include "sql_parse.h" // is_update_query +#include "sql_callback.h" /* The following is used to initialise Table_ident with a internal @@ -86,13 +101,15 @@ extern "C" void free_user_var(user_var_entry *entry) { char *pos= (char*) entry+ALIGN_SIZE(sizeof(*entry)); if (entry->value && entry->value != pos) - my_free(entry->value, MYF(0)); - my_free((char*) entry,MYF(0)); + my_free(entry->value); + my_free(entry); } bool Key_part_spec::operator==(const Key_part_spec& other) const { - return length == other.length && !strcmp(field_name, other.field_name); + return length == other.length && + !my_strcasecmp(system_charset_info, field_name.str, + other.field_name.str); } /** @@ -255,25 +272,6 @@ bool Foreign_key::validate(List<Create_field> &table_fields) ** Thread specific functions ****************************************************************************/ -/** Push an error to the error stack and return TRUE for now. */ - -bool -Reprepare_observer::report_error(THD *thd) -{ - my_error(ER_NEED_REPREPARE, MYF(ME_NO_WARNING_FOR_ERROR|ME_NO_SP_HANDLER)); - - m_invalidated= TRUE; - - return TRUE; -} - - -Open_tables_state::Open_tables_state(ulong version_arg) - :version(version_arg), state_flags(0U) -{ - reset_open_tables_state(); -} - /* The following functions form part of the C plugin API */ @@ -327,9 +325,8 @@ const char *set_thd_proc_info(THD *thd, const char *info, thd= current_thd; const char *old_info= thd->proc_info; - DBUG_PRINT("proc_info", ("%s:%d %s", calling_file, calling_line, - (info != NULL) ? info : "")); -#if defined(ENABLED_PROFILING) && defined(COMMUNITY_SERVER) + DBUG_PRINT("proc_info", ("%s:%d %s", calling_file, calling_line, info)); +#if defined(ENABLED_PROFILING) thd->profiling.status_change(info, calling_function, calling_file, calling_line); #endif thd->proc_info= info; @@ -337,11 +334,36 @@ const char *set_thd_proc_info(THD *thd, const char *info, } extern "C" +const char* thd_enter_cond(MYSQL_THD thd, mysql_cond_t *cond, + mysql_mutex_t *mutex, const char *msg) +{ + if (!thd) + thd= current_thd; + + return thd->enter_cond(cond, mutex, msg); +} + +extern "C" +void thd_exit_cond(MYSQL_THD thd, const char *old_msg) +{ + if (!thd) + thd= current_thd; + + thd->exit_cond(old_msg); + return; +} + +extern "C" void **thd_ha_data(const THD *thd, const struct handlerton *hton) { return (void **) &thd->ha_data[hton->slot].ha_ptr; } +extern "C" +void thd_storage_lock_wait(THD *thd, long long value) +{ + thd->utime_after_lock+= value; +} /** Provide a handler data getter to simplify coding @@ -376,7 +398,7 @@ void thd_set_ha_data(THD *thd, const struct handlerton *hton, extern "C" long long thd_test_options(const THD *thd, long long test_options) { - return thd->options & test_options; + return thd->variables.option_bits & test_options; } extern "C" @@ -388,13 +410,13 @@ int thd_sql_command(const THD *thd) extern "C" int thd_tx_isolation(const THD *thd) { - return (int) thd->variables.tx_isolation; + return (int) thd->tx_isolation; } extern "C" void thd_inc_row_count(THD *thd) { - thd->row_count++; + thd->warning_info->inc_current_row_for_warning(); } @@ -465,7 +487,7 @@ char *thd_security_context(THD *thd, char *buffer, unsigned int length, str.append(proc_info); } - pthread_mutex_lock(&thd->LOCK_thd_data); + mysql_mutex_lock(&thd->LOCK_thd_data); if (thd->query()) { @@ -477,7 +499,7 @@ char *thd_security_context(THD *thd, char *buffer, unsigned int length, str.append(thd->query(), len); } - pthread_mutex_unlock(&thd->LOCK_thd_data); + mysql_mutex_unlock(&thd->LOCK_thd_data); if (str.c_ptr_safe() == buffer) return buffer; @@ -496,7 +518,7 @@ char *thd_security_context(THD *thd, char *buffer, unsigned int length, /** - Implementation of Drop_table_error_handler::handle_error(). + Implementation of Drop_table_error_handler::handle_condition(). The reason in having this implementation is to silence technical low-level warnings during DROP TABLE operation. Currently we don't want to expose the following warnings during DROP TABLE: @@ -509,160 +531,26 @@ char *thd_security_context(THD *thd, char *buffer, unsigned int length, @return TRUE if the condition is handled. */ -bool Drop_table_error_handler::handle_error(uint sql_errno, - const char *message, - MYSQL_ERROR::enum_warning_level level, - THD *thd) -{ +bool Drop_table_error_handler::handle_condition(THD *thd, + uint sql_errno, + const char* sqlstate, + MYSQL_ERROR::enum_warning_level level, + const char* msg, + MYSQL_ERROR ** cond_hdl) +{ + *cond_hdl= NULL; return ((sql_errno == EE_DELETE && my_errno == ENOENT) || sql_errno == ER_TRG_NO_DEFINER); } -/** - Clear this diagnostics area. - - Normally called at the end of a statement. -*/ - -void -Diagnostics_area::reset_diagnostics_area() -{ - DBUG_ENTER("reset_diagnostics_area"); -#ifdef DBUG_OFF - can_overwrite_status= FALSE; - /** Don't take chances in production */ - m_message[0]= '\0'; - m_sql_errno= 0; - m_server_status= 0; - m_affected_rows= 0; - m_last_insert_id= 0; - m_total_warn_count= 0; -#endif - is_sent= FALSE; - /** Tiny reset in debug mode to see garbage right away */ - m_status= DA_EMPTY; - DBUG_VOID_RETURN; -} - - -/** - Set OK status -- ends commands that do not return a - result set, e.g. INSERT/UPDATE/DELETE. -*/ - -void -Diagnostics_area::set_ok_status(THD *thd, ha_rows affected_rows_arg, - ulonglong last_insert_id_arg, - const char *message_arg) -{ - DBUG_ENTER("set_ok_status"); - DBUG_ASSERT(! is_set()); - /* - In production, refuse to overwrite an error or a custom response - with an OK packet. - */ - if (is_error() || is_disabled()) - return; - - m_server_status= thd->server_status; - m_total_warn_count= thd->total_warn_count; - m_affected_rows= affected_rows_arg; - m_last_insert_id= last_insert_id_arg; - if (message_arg) - strmake(m_message, message_arg, sizeof(m_message) - 1); - else - m_message[0]= '\0'; - m_status= DA_OK; - DBUG_VOID_RETURN; -} - - -/** - Set EOF status. -*/ - -void -Diagnostics_area::set_eof_status(THD *thd) -{ - DBUG_ENTER("set_eof_status"); - /* Only allowed to report eof if has not yet reported an error */ - DBUG_ASSERT(! is_set()); - /* - In production, refuse to overwrite an error or a custom response - with an EOF packet. - */ - if (is_error() || is_disabled()) - return; - - m_server_status= thd->server_status; - /* - If inside a stored procedure, do not return the total - number of warnings, since they are not available to the client - anyway. - */ - m_total_warn_count= thd->spcont ? 0 : thd->total_warn_count; - - m_status= DA_EOF; - DBUG_VOID_RETURN; -} - -/** - Set ERROR status. -*/ - -void -Diagnostics_area::set_error_status(THD *thd, uint sql_errno_arg, - const char *message_arg) -{ - DBUG_ENTER("set_error_status"); - /* - Only allowed to report error if has not yet reported a success - The only exception is when we flush the message to the client, - an error can happen during the flush. - */ - DBUG_ASSERT(! is_set() || can_overwrite_status); -#ifdef DBUG_OFF - /* - In production, refuse to overwrite a custom response with an - ERROR packet. - */ - if (is_disabled()) - return; -#endif - - m_sql_errno= sql_errno_arg; - strmake(m_message, message_arg, sizeof(m_message)-1); - - m_status= DA_ERROR; - DBUG_VOID_RETURN; -} - - -/** - Mark the diagnostics area as 'DISABLED'. - - This is used in rare cases when the COM_ command at hand sends a response - in a custom format. One example is the query cache, another is - COM_STMT_PREPARE. -*/ - -void -Diagnostics_area::disable_status() -{ - DBUG_ASSERT(! is_set()); - m_status= DA_DISABLED; -} - - THD::THD() :Statement(&main_lex, &main_mem_root, CONVENTIONAL_EXECUTION, /* statement id */ 0), - Open_tables_state(refresh_version), rli_fake(0), - lock_id(&main_lock_id), + rli_fake(0), user_time(0), in_sub_stmt(0), - sql_log_bin_toplevel(false), - binlog_table_maps(0), binlog_flags(0UL), + binlog_unsafe_warning_flags(0), + binlog_table_maps(0), table_map_for_update(0), arg_of_last_insert_id_function(FALSE), first_successful_insert_id_in_prev_stmt(0), @@ -670,7 +558,8 @@ THD::THD() first_successful_insert_id_in_cur_stmt(0), stmt_depends_on_first_successful_insert_id_in_prev_stmt(FALSE), examined_row_count(0), - global_read_lock(0), + warning_info(&main_warning_info), + stmt_da(&main_da), is_fatal_error(0), transaction_rollback_request(0), is_fatal_sub_stmt_error(0), @@ -680,13 +569,15 @@ THD::THD() bootstrap(0), derived_tables_processing(FALSE), spcont(NULL), - m_parser_state(NULL) + m_parser_state(NULL), #if defined(ENABLED_DEBUG_SYNC) - , debug_sync_control(0) + debug_sync_control(0), #endif /* defined(ENABLED_DEBUG_SYNC) */ + main_warning_info(0) { ulong tmp; + mdl_context.init(this); /* Pass nominal parameters to init_alloc_root only to ensure that the destructor works OK in case of an error. The main_mem_root @@ -700,22 +591,20 @@ THD::THD() catalog= (char*)"std"; // the only catalog we have for now main_security_ctx.init(); security_ctx= &main_security_ctx; - locked=some_tables_deleted=no_errors=password= 0; + no_errors=password= 0; query_start_used= 0; count_cuted_fields= CHECK_FIELD_IGNORE; killed= NOT_KILLED; col_access=0; is_slave_error= thread_specific_used= FALSE; - hash_clear(&handler_tables_hash); + my_hash_clear(&handler_tables_hash); tmp_table=0; used_tables=0; - cuted_fields= sent_row_count= row_count= 0L; + cuted_fields= 0L; + sent_row_count= 0L; limit_found_rows= 0; - row_count_func= -1; + m_row_count_func= -1; statement_id_counter= 0UL; -#ifdef ERROR_INJECT_SUPPORT - error_inject_value= 0UL; -#endif // Must be reset to handle error with THD's created for init of mysqld lex->current_select= 0; start_time=(time_t) 0; @@ -729,7 +618,6 @@ THD::THD() file_id = 0; query_id= 0; query_name_consts= 0; - warn_id= 0; db_charset= global_system_variables.collation_database; bzero(ha_data, sizeof(ha_data)); mysys_var=0; @@ -740,12 +628,10 @@ THD::THD() dbug_sentry=THD_SENTRY_MAGIC; #endif #ifndef EMBEDDED_LIBRARY + mysql_audit_init_thd(this); net.vio=0; #endif client_capabilities= 0; // minimalistic client -#ifdef HAVE_QUERY_CACHE - query_cache_init_query(&net); // If error on boot -#endif ull=0; system_thread= NON_SYSTEM_THREAD; cleanup_done= abort_on_warning= no_warnings_for_error= 0; @@ -759,7 +645,7 @@ THD::THD() #ifdef SIGNAL_WITH_VIO_CLOSE active_vio = 0; #endif - pthread_mutex_init(&LOCK_thd_data, MY_MUTEX_INIT_FAST); + mysql_mutex_init(key_LOCK_thd_data, &LOCK_thd_data, MY_MUTEX_INIT_FAST); /* Variables with default values */ proc_info="login"; @@ -769,16 +655,17 @@ THD::THD() command=COM_CONNECT; *scramble= '\0'; + /* Call to init() below requires fully initialized Open_tables_state. */ + reset_open_tables_state(this); + init(); - /* Initialize sub structures */ - init_sql_alloc(&warn_root, WARN_ALLOC_BLOCK_SIZE, WARN_ALLOC_PREALLOC_SIZE); -#if defined(ENABLED_PROFILING) && defined(COMMUNITY_SERVER) +#if defined(ENABLED_PROFILING) profiling.set_thd(this); #endif user_connect=(USER_CONN *)0; - hash_init(&user_vars, system_charset_info, USER_VARS_HASH_SIZE, 0, 0, - (hash_get_key) get_var_key, - (hash_free_key) free_user_var, 0); + my_hash_init(&user_vars, system_charset_info, USER_VARS_HASH_SIZE, 0, 0, + (my_hash_get_key) get_var_key, + (my_hash_free_key) free_user_var, 0); sp_proc_cache= NULL; sp_func_cache= NULL; @@ -800,7 +687,6 @@ THD::THD() my_rnd_init(&rand, tmp + (ulong) &rand, tmp + (ulong) ::global_query_id); substitute_null_with_insert_id = FALSE; thr_lock_info_init(&lock_info); /* safety: will be reset after start */ - thr_lock_owner_init(&main_lock_id, &lock_info); m_internal_handler= NULL; arena_for_cached_items= 0; @@ -825,16 +711,27 @@ void THD::push_internal_handler(Internal_error_handler *handler) DBUG_VOID_RETURN; } - -bool THD::handle_error(uint sql_errno, const char *message, - MYSQL_ERROR::enum_warning_level level) +bool THD::handle_condition(uint sql_errno, + const char* sqlstate, + MYSQL_ERROR::enum_warning_level level, + const char* msg, + MYSQL_ERROR ** cond_hdl) { + if (!m_internal_handler) + { + *cond_hdl= NULL; + return FALSE; + } + for (Internal_error_handler *error_handler= m_internal_handler; error_handler; error_handler= error_handler->m_prev_internal_handler) { - if (error_handler->handle_error(sql_errno, message, level, this)) + if (error_handler->handle_condition(this, sql_errno, sqlstate, level, msg, + cond_hdl)) + { return TRUE; + } } return FALSE; } @@ -849,6 +746,185 @@ Internal_error_handler *THD::pop_internal_handler() DBUG_RETURN(popped_handler); } + +void THD::raise_error(uint sql_errno) +{ + const char* msg= ER(sql_errno); + (void) raise_condition(sql_errno, + NULL, + MYSQL_ERROR::WARN_LEVEL_ERROR, + msg); +} + +void THD::raise_error_printf(uint sql_errno, ...) +{ + va_list args; + char ebuff[MYSQL_ERRMSG_SIZE]; + DBUG_ENTER("THD::raise_error_printf"); + DBUG_PRINT("my", ("nr: %d errno: %d", sql_errno, errno)); + const char* format= ER(sql_errno); + va_start(args, sql_errno); + my_vsnprintf(ebuff, sizeof(ebuff), format, args); + va_end(args); + (void) raise_condition(sql_errno, + NULL, + MYSQL_ERROR::WARN_LEVEL_ERROR, + ebuff); + DBUG_VOID_RETURN; +} + +void THD::raise_warning(uint sql_errno) +{ + const char* msg= ER(sql_errno); + (void) raise_condition(sql_errno, + NULL, + MYSQL_ERROR::WARN_LEVEL_WARN, + msg); +} + +void THD::raise_warning_printf(uint sql_errno, ...) +{ + va_list args; + char ebuff[MYSQL_ERRMSG_SIZE]; + DBUG_ENTER("THD::raise_warning_printf"); + DBUG_PRINT("enter", ("warning: %u", sql_errno)); + const char* format= ER(sql_errno); + va_start(args, sql_errno); + my_vsnprintf(ebuff, sizeof(ebuff), format, args); + va_end(args); + (void) raise_condition(sql_errno, + NULL, + MYSQL_ERROR::WARN_LEVEL_WARN, + ebuff); + DBUG_VOID_RETURN; +} + +void THD::raise_note(uint sql_errno) +{ + DBUG_ENTER("THD::raise_note"); + DBUG_PRINT("enter", ("code: %d", sql_errno)); + if (!(variables.option_bits & OPTION_SQL_NOTES)) + DBUG_VOID_RETURN; + const char* msg= ER(sql_errno); + (void) raise_condition(sql_errno, + NULL, + MYSQL_ERROR::WARN_LEVEL_NOTE, + msg); + DBUG_VOID_RETURN; +} + +void THD::raise_note_printf(uint sql_errno, ...) +{ + va_list args; + char ebuff[MYSQL_ERRMSG_SIZE]; + DBUG_ENTER("THD::raise_note_printf"); + DBUG_PRINT("enter",("code: %u", sql_errno)); + if (!(variables.option_bits & OPTION_SQL_NOTES)) + DBUG_VOID_RETURN; + const char* format= ER(sql_errno); + va_start(args, sql_errno); + my_vsnprintf(ebuff, sizeof(ebuff), format, args); + va_end(args); + (void) raise_condition(sql_errno, + NULL, + MYSQL_ERROR::WARN_LEVEL_NOTE, + ebuff); + DBUG_VOID_RETURN; +} + +MYSQL_ERROR* THD::raise_condition(uint sql_errno, + const char* sqlstate, + MYSQL_ERROR::enum_warning_level level, + const char* msg) +{ + MYSQL_ERROR *cond= NULL; + DBUG_ENTER("THD::raise_condition"); + + if (!(variables.option_bits & OPTION_SQL_NOTES) && + (level == MYSQL_ERROR::WARN_LEVEL_NOTE)) + DBUG_RETURN(NULL); + + warning_info->opt_clear_warning_info(query_id); + + /* + TODO: replace by DBUG_ASSERT(sql_errno != 0) once all bugs similar to + Bug#36768 are fixed: a SQL condition must have a real (!=0) error number + so that it can be caught by handlers. + */ + if (sql_errno == 0) + sql_errno= ER_UNKNOWN_ERROR; + if (msg == NULL) + msg= ER(sql_errno); + if (sqlstate == NULL) + sqlstate= mysql_errno_to_sqlstate(sql_errno); + + if ((level == MYSQL_ERROR::WARN_LEVEL_WARN) && + really_abort_on_warning()) + { + /* + FIXME: + push_warning and strict SQL_MODE case. + */ + level= MYSQL_ERROR::WARN_LEVEL_ERROR; + killed= THD::KILL_BAD_DATA; + } + + switch (level) + { + case MYSQL_ERROR::WARN_LEVEL_NOTE: + case MYSQL_ERROR::WARN_LEVEL_WARN: + got_warning= 1; + break; + case MYSQL_ERROR::WARN_LEVEL_ERROR: + break; + default: + DBUG_ASSERT(FALSE); + } + + if (handle_condition(sql_errno, sqlstate, level, msg, &cond)) + DBUG_RETURN(cond); + + if (level == MYSQL_ERROR::WARN_LEVEL_ERROR) + { + is_slave_error= 1; // needed to catch query errors during replication + + /* + thd->lex->current_select == 0 if lex structure is not inited + (not query command (COM_QUERY)) + */ + if (lex->current_select && + lex->current_select->no_error && !is_fatal_error) + { + DBUG_PRINT("error", + ("Error converted to warning: current_select: no_error %d " + "fatal_error: %d", + (lex->current_select ? + lex->current_select->no_error : 0), + (int) is_fatal_error)); + } + else + { + if (! stmt_da->is_error()) + { + set_row_count_func(-1); + stmt_da->set_error_status(this, sql_errno, msg, sqlstate); + } + } + } + + query_cache_abort(&query_cache_tls); + + /* FIXME: broken special case */ + if (no_warnings_for_error && (level == MYSQL_ERROR::WARN_LEVEL_ERROR)) + DBUG_RETURN(NULL); + + /* When simulating OOM, skip writing to error log to avoid mtr errors */ + DBUG_EXECUTE_IF("simulate_out_of_memory", DBUG_RETURN(NULL);); + + cond= warning_info->push_warning(this, sql_errno, sqlstate, level, msg); + DBUG_RETURN(cond); +} + extern "C" void *thd_alloc(MYSQL_THD thd, unsigned int size) { @@ -906,45 +982,36 @@ extern "C" THD *_current_thd_noinline(void) void THD::init(void) { - pthread_mutex_lock(&LOCK_global_system_variables); + mysql_mutex_lock(&LOCK_global_system_variables); plugin_thdvar_init(this); - variables.time_format= date_time_format_copy((THD*) 0, - variables.time_format); - variables.date_format= date_time_format_copy((THD*) 0, - variables.date_format); - variables.datetime_format= date_time_format_copy((THD*) 0, - variables.datetime_format); /* variables= global_system_variables above has reset variables.pseudo_thread_id to 0. We need to correct it here to avoid temporary tables replication failure. */ variables.pseudo_thread_id= thread_id; - pthread_mutex_unlock(&LOCK_global_system_variables); + mysql_mutex_unlock(&LOCK_global_system_variables); server_status= SERVER_STATUS_AUTOCOMMIT; if (variables.sql_mode & MODE_NO_BACKSLASH_ESCAPES) server_status|= SERVER_STATUS_NO_BACKSLASH_ESCAPES; - options= thd_startup_options; - if (variables.max_join_size == HA_POS_ERROR) - options |= OPTION_BIG_SELECTS; - else - options &= ~OPTION_BIG_SELECTS; - - transaction.all.modified_non_trans_table= transaction.stmt.modified_non_trans_table= FALSE; + transaction.all.modified_non_trans_table= + transaction.stmt.modified_non_trans_table= FALSE; open_options=ha_open_options; update_lock_default= (variables.low_priority_updates ? TL_WRITE_LOW_PRIORITY : TL_WRITE); - session_tx_isolation= (enum_tx_isolation) variables.tx_isolation; - warn_list.empty(); - bzero((char*) warn_count, sizeof(warn_count)); - total_warn_count= 0; + tx_isolation= (enum_tx_isolation) variables.tx_isolation; update_charset(); - reset_current_stmt_binlog_row_based(); + reset_current_stmt_binlog_format_row(); bzero((char *) &status_var, sizeof(status_var)); bzero((char *) &org_status_var, sizeof(org_status_var)); - sql_log_bin_toplevel= options & OPTION_BIN_LOG; + + if (variables.sql_log_bin) + variables.option_bits|= OPTION_BIN_LOG; + else + variables.option_bits&= ~OPTION_BIN_LOG; + select_commands= update_commands= other_commands= 0; /* Set to handle counting of aborted connections */ userstat_running= opt_userstat_running; @@ -1016,11 +1083,9 @@ void THD::init_for_queries() reset_root_defaults(mem_root, variables.query_alloc_block_size, variables.query_prealloc_size); -#ifdef USING_TRANSACTIONS reset_root_defaults(&transaction.mem_root, variables.trans_alloc_block_size, variables.trans_prealloc_size); -#endif transaction.xid_state.xid.null(); transaction.xid_state.in_thd=1; } @@ -1039,18 +1104,18 @@ void THD::init_for_queries() void THD::change_user(void) { - pthread_mutex_lock(&LOCK_status); + mysql_mutex_lock(&LOCK_status); add_to_status(&global_status_var, &status_var); - pthread_mutex_unlock(&LOCK_status); + mysql_mutex_unlock(&LOCK_status); cleanup(); killed= NOT_KILLED; cleanup_done= 0; init(); stmt_map.reset(); - hash_init(&user_vars, system_charset_info, USER_VARS_HASH_SIZE, 0, 0, - (hash_get_key) get_var_key, - (hash_free_key) free_user_var, 0); + my_hash_init(&user_vars, system_charset_info, USER_VARS_HASH_SIZE, 0, 0, + (my_hash_get_key) get_var_key, + (my_hash_free_key) free_user_var, 0); sp_cache_clear(&sp_proc_cache); sp_cache_clear(&sp_func_cache); } @@ -1071,14 +1136,29 @@ void THD::cleanup(void) } #endif { - ha_rollback(this); + transaction.xid_state.xa_state= XA_NOTR; + trans_rollback(this); xid_cache_delete(&transaction.xid_state); } - if (locked_tables) - { - lock=locked_tables; locked_tables=0; - close_thread_tables(this); - } + + locked_tables_list.unlock_locked_tables(this); + mysql_ha_cleanup(this); + + DBUG_ASSERT(open_tables == NULL); + /* + If the thread was in the middle of an ongoing transaction (rolled + back a few lines above) or under LOCK TABLES (unlocked the tables + and left the mode a few lines above), there will be outstanding + metadata locks. Release them. + */ + mdl_context.release_transactional_locks(); + + /* Release the global read lock, if acquired. */ + if (global_read_lock.is_acquired()) + global_read_lock.unlock_global_read_lock(this); + + /* All metadata locks must have been released by now. */ + DBUG_ASSERT(!mdl_context.has_locks()); wt_thd_destroy(&transaction.wt); #if defined(ENABLED_DEBUG_SYNC) @@ -1086,24 +1166,17 @@ void THD::cleanup(void) debug_sync_end_thread(this); #endif /* defined(ENABLED_DEBUG_SYNC) */ - mysql_ha_cleanup(this); delete_dynamic(&user_var_events); - hash_free(&user_vars); + my_hash_free(&user_vars); close_temporary_tables(this); - my_free((char*) variables.time_format, MYF(MY_ALLOW_ZERO_PTR)); - my_free((char*) variables.date_format, MYF(MY_ALLOW_ZERO_PTR)); - my_free((char*) variables.datetime_format, MYF(MY_ALLOW_ZERO_PTR)); - sp_cache_clear(&sp_proc_cache); sp_cache_clear(&sp_func_cache); - if (global_read_lock) - unlock_global_read_lock(this); if (ull) { - pthread_mutex_lock(&LOCK_user_locks); + mysql_mutex_lock(&LOCK_user_locks); item_user_lock_release(ull); - pthread_mutex_unlock(&LOCK_user_locks); + mysql_mutex_unlock(&LOCK_user_locks); ull= NULL; } @@ -1117,8 +1190,9 @@ THD::~THD() THD_CHECK_SENTRY(this); DBUG_ENTER("~THD()"); /* Ensure that no one is using THD */ - pthread_mutex_lock(&LOCK_thd_data); - pthread_mutex_unlock(&LOCK_thd_data); + mysql_mutex_lock(&LOCK_thd_data); + mysys_var=0; // Safety (shouldn't be needed) + mysql_mutex_unlock(&LOCK_thd_data); add_to_status(&global_status_var, &status_var); /* Close connection */ @@ -1130,23 +1204,20 @@ THD::~THD() } #endif stmt_map.reset(); /* close all prepared statements */ - DBUG_ASSERT(lock_info.n_cursors == 0); - if (!cleanup_done) cleanup(); + mdl_context.destroy(); ha_close_connection(this); + mysql_audit_release(this); plugin_thdvar_cleanup(this); DBUG_PRINT("info", ("freeing security context")); main_security_ctx.destroy(); - safeFree(db); - free_root(&warn_root,MYF(0)); -#ifdef USING_TRANSACTIONS + my_free(db); + db= NULL; free_root(&transaction.mem_root,MYF(0)); -#endif - mysys_var=0; // Safety (shouldn't be needed) - pthread_mutex_destroy(&LOCK_thd_data); + mysql_mutex_destroy(&LOCK_thd_data); #ifndef DBUG_OFF dbug_sentry= THD_SENTRY_GONE; #endif @@ -1156,6 +1227,8 @@ THD::~THD() delete rli_fake; rli_fake= NULL; } + + mysql_audit_free_thd(this); #endif free_root(&main_mem_root, MYF(0)); @@ -1242,14 +1315,14 @@ void THD::awake(THD::killed_state state_to_set) DBUG_ENTER("THD::awake"); DBUG_PRINT("enter", ("this: 0x%lx", (long) this)); THD_CHECK_SENTRY(this); - safe_mutex_assert_owner(&LOCK_thd_data); + mysql_mutex_assert_owner(&LOCK_thd_data); killed= state_to_set; if (state_to_set != THD::KILL_QUERY) { thr_alarm_kill(thread_id); if (!slave_thread) - thread_scheduler.post_kill_notification(this); + MYSQL_CALLBACK(thread_scheduler, post_kill_notification, (this)); #ifdef SIGNAL_WITH_VIO_CLOSE if (this != current_thd) { @@ -1269,7 +1342,7 @@ void THD::awake(THD::killed_state state_to_set) } if (mysys_var) { - pthread_mutex_lock(&mysys_var->mutex); + mysql_mutex_lock(&mysys_var->mutex); if (!system_thread) // Don't abort locks mysys_var->abort=1; /* @@ -1290,37 +1363,14 @@ void THD::awake(THD::killed_state state_to_set) we issue a second KILL or the status it's waiting for happens). It's true that we have set its thd->killed but it may not see it immediately and so may have time to reach the cond_wait(). - - We have to do the loop with trylock, because if we would use - pthread_mutex_lock(), we can cause a deadlock as we are here locking - the mysys_var->mutex and mysys_var->current_mutex in a different order - than in the thread we are trying to kill. - We only sleep for 2 seconds as we don't want to have LOCK_thd_data - locked too long time. - - There is a small change we may not succeed in aborting a thread that - is not yet waiting for a mutex, but as this happens only for a - thread that was doing something else when the kill was issued and - which should detect the kill flag before it starts to wait, this - should be good enough. */ if (mysys_var->current_cond && mysys_var->current_mutex) { - uint i; - for (i= 0; i < WAIT_FOR_KILL_TRY_TIMES * SECONDS_TO_WAIT_FOR_KILL; i++) - { - int ret= pthread_mutex_trylock(mysys_var->current_mutex); - pthread_cond_broadcast(mysys_var->current_cond); - if (!ret) - { - /* Signal is sure to get through */ - pthread_mutex_unlock(mysys_var->current_mutex); - break; - } - } - my_sleep(1000000L / WAIT_FOR_KILL_TRY_TIMES); + mysql_mutex_lock(mysys_var->current_mutex); + mysql_cond_broadcast(mysys_var->current_cond); + mysql_mutex_unlock(mysys_var->current_mutex); } - pthread_mutex_unlock(&mysys_var->mutex); + mysql_mutex_unlock(&mysys_var->mutex); } DBUG_VOID_RETURN; } @@ -1341,6 +1391,15 @@ bool THD::store_globals() if (my_pthread_setspecific_ptr(THR_THD, this) || my_pthread_setspecific_ptr(THR_MALLOC, &mem_root)) return 1; + /* + mysys_var is concurrently readable by a killer thread. + It is protected by LOCK_thd_data, it is not needed to lock while the + pointer is changing from NULL not non-NULL. If the kill thread reads + NULL it doesn't refer to anything, but if it is non-NULL we need to + ensure that the thread doesn't proceed to assign another thread to + have the mysys_var reference (which in fact refers to the worker + threads local storage with key THR_KEY_mysys. + */ mysys_var=my_thread_var; /* Let mysqld define the thread id (not mysys) @@ -1357,14 +1416,7 @@ bool THD::store_globals() */ thr_lock_info_init(&lock_info); -#ifdef SAFE_MUTEX - /* Register order of mutex for wrong mutex deadlock detector */ - pthread_mutex_lock(&LOCK_thd_data); - pthread_mutex_lock(&mysys_var->mutex); - - pthread_mutex_unlock(&mysys_var->mutex); - pthread_mutex_unlock(&LOCK_thd_data); -#endif +#warning add registration of mutex order if needed return 0; } @@ -1537,15 +1589,21 @@ bool THD::convert_string(String *s, CHARSET_INFO *from_cs, CHARSET_INFO *to_cs) void THD::update_charset() { uint32 not_used; - charset_is_system_charset= !String::needs_conversion(0,charset(), - system_charset_info, - ¬_used); + charset_is_system_charset= + !String::needs_conversion(0, + variables.character_set_client, + system_charset_info, + ¬_used); charset_is_collation_connection= - !String::needs_conversion(0,charset(),variables.collation_connection, + !String::needs_conversion(0, + variables.character_set_client, + variables.collation_connection, ¬_used); charset_is_character_set_filesystem= - !String::needs_conversion(0, charset(), - variables.character_set_filesystem, ¬_used); + !String::needs_conversion(0, + variables.character_set_client, + variables.character_set_filesystem, + ¬_used); } @@ -1568,8 +1626,7 @@ void THD::add_changed_table(TABLE *table) { DBUG_ENTER("THD::add_changed_table(table)"); - DBUG_ASSERT((options & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)) && - table->file->has_transactions()); + DBUG_ASSERT(in_multi_stmt_transaction_mode() && table->file->has_transactions()); add_changed_table(table->s->table_cache_key.str, (long) table->s->table_cache_key.length); DBUG_VOID_RETURN; @@ -1679,15 +1736,15 @@ int THD::send_explain_fields(select_result *result) } item->maybe_null= 1; field_list.push_back(new Item_empty_string("Extra", 255, cs)); - return (result->send_fields(field_list, - Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF)); + return (result->send_result_set_metadata(field_list, + Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF)); } #ifdef SIGNAL_WITH_VIO_CLOSE void THD::close_active_vio() { DBUG_ENTER("close_active_vio"); - safe_mutex_assert_owner(&LOCK_thd_data); + mysql_mutex_assert_owner(&LOCK_thd_data); #ifndef EMBEDDED_LIBRARY if (active_vio) { @@ -1763,7 +1820,6 @@ void THD::rollback_item_tree_changes() select_result::select_result() { thd=current_thd; - nest_level= (uint) -1; } void select_result::send_error(uint errcode,const char *err) @@ -1787,13 +1843,17 @@ bool select_result::check_simple_select() const static String default_line_term("\n",default_charset_info); static String default_escaped("\\",default_charset_info); static String default_field_term("\t",default_charset_info); +static String default_xml_row_term("<row>", default_charset_info); -sql_exchange::sql_exchange(char *name,bool flag) +sql_exchange::sql_exchange(char *name, bool flag, + enum enum_filetype filetype_arg) :file_name(name), opt_enclosed(0), dumpfile(flag), skip_lines(0) { + filetype= filetype_arg; field_term= &default_field_term; enclosed= line_start= &my_empty_string; - line_term= &default_line_term; + line_term= filetype == FILETYPE_CSV ? + &default_line_term : &default_xml_row_term; escaped= &default_escaped; cs= NULL; } @@ -1804,32 +1864,30 @@ bool sql_exchange::escaped_given(void) } -bool select_send::send_fields(List<Item> &list, uint flags) +bool select_send::send_result_set_metadata(List<Item> &list, uint flags) { bool res; - if (!(res= thd->protocol->send_fields(&list, flags))) + if (!(res= thd->protocol->send_result_set_metadata(&list, flags))) is_result_set_started= 1; return res; } -void select_send::abort() +void select_send::abort_result_set() { - DBUG_ENTER("select_send::abort"); - if (is_result_set_started && thd->spcont && - thd->spcont->find_handler(thd, thd->main_da.sql_errno(), - MYSQL_ERROR::WARN_LEVEL_ERROR)) + DBUG_ENTER("select_send::abort_result_set"); + + if (is_result_set_started && thd->spcont) { /* We're executing a stored procedure, have an open result - set, an SQL exception condition and a handler for it. - In this situation we must abort the current statement, - silence the error and start executing the continue/exit - handler. + set and an SQL exception condition. In this situation we + must abort the current statement, silence the error and + start executing the continue/exit handler if one is found. Before aborting the statement, let's end the open result set, as otherwise the client will hang due to the violation of the client/server protocol. */ - thd->protocol->end_partial_result_set(thd); + thd->spcont->end_partial_result_set= TRUE; } DBUG_VOID_RETURN; } @@ -1850,10 +1908,13 @@ void select_send::cleanup() bool select_send::send_data(List<Item> &items) { + Protocol *protocol= thd->protocol; + DBUG_ENTER("select_send::send_data"); + if (unit->offset_limit_cnt) { // using limit offset,count unit->offset_limit_cnt--; - return 0; + DBUG_RETURN(FALSE); } /* @@ -1863,36 +1924,18 @@ bool select_send::send_data(List<Item> &items) */ ha_release_temporary_latches(thd); - List_iterator_fast<Item> li(items); - Protocol *protocol= thd->protocol; - char buff[MAX_FIELD_WIDTH]; - String buffer(buff, sizeof(buff), &my_charset_bin); - DBUG_ENTER("select_send::send_data"); - protocol->prepare_for_resend(); - Item *item; - while ((item=li++)) - { - if (item->send(protocol, &buffer)) - { - protocol->free(); // Free used buffer - my_message(ER_OUT_OF_RESOURCES, ER(ER_OUT_OF_RESOURCES), MYF(0)); - break; - } - /* - Reset buffer to its original state, as it may have been altered in - Item::send(). - */ - buffer.set(buff, sizeof(buff), &my_charset_bin); - } - thd->sent_row_count++; - if (thd->is_error()) + if (protocol->send_result_set_row(&items)) { protocol->remove_last_row(); - DBUG_RETURN(1); + DBUG_RETURN(TRUE); } + + thd->sent_row_count++; + if (thd->vio_ok()) DBUG_RETURN(protocol->write()); + DBUG_RETURN(0); } @@ -1905,12 +1948,6 @@ bool select_send::send_eof() */ ha_release_temporary_latches(thd); - /* Unlock tables before sending packet to gain some speed */ - if (thd->lock) - { - mysql_unlock_tables(thd, thd->lock); - thd->lock=0; - } /* Don't send EOF if we're in error condition (which implies we've already sent or are sending an error) @@ -1933,8 +1970,9 @@ void select_to_file::send_error(uint errcode,const char *err) if (file > 0) { (void) end_io_cache(&cache); - (void) my_close(file,MYF(0)); - (void) my_delete(path,MYF(0)); // Delete file on error + mysql_file_close(file, MYF(0)); + /* Delete file on error */ + mysql_file_delete(key_select_to_file, path, MYF(0)); file= -1; } } @@ -1943,15 +1981,10 @@ void select_to_file::send_error(uint errcode,const char *err) bool select_to_file::send_eof() { int error= test(end_io_cache(&cache)); - if (my_close(file,MYF(MY_WME))) + if (mysql_file_close(file, MYF(MY_WME))) error= 1; if (!error) { - /* - In order to remember the value of affected rows for ROW_COUNT() - function, SELECT INTO has to have an own SQLCOM. - TODO: split from SQLCOM_SELECT - */ ::my_ok(thd,row_count); } file= -1; @@ -1965,7 +1998,7 @@ void select_to_file::cleanup() if (file >= 0) { (void) end_io_cache(&cache); - (void) my_close(file,MYF(0)); + mysql_file_close(file, MYF(0)); file= -1; } path[0]= '\0'; @@ -1978,7 +2011,7 @@ select_to_file::~select_to_file() if (file >= 0) { // This only happens in case of error (void) end_io_cache(&cache); - (void) my_close(file,MYF(0)); + mysql_file_close(file, MYF(0)); file= -1; } } @@ -2041,7 +2074,8 @@ static File create_file(THD *thd, char *path, sql_exchange *exchange, return -1; } /* Create the file world readable */ - if ((file= my_create(path, 0666, O_WRONLY|O_EXCL, MYF(MY_WME))) < 0) + if ((file= mysql_file_create(key_select_to_file, + path, 0666, O_WRONLY|O_EXCL, MYF(MY_WME))) < 0) return file; #ifdef HAVE_FCHMOD (void) fchmod(file, 0666); // Because of umask() @@ -2050,8 +2084,9 @@ static File create_file(THD *thd, char *path, sql_exchange *exchange, #endif if (init_io_cache(cache, file, 0L, WRITE_CACHE, 0L, 1, MYF(MY_WME))) { - my_close(file, MYF(0)); - my_delete(path, MYF(0)); // Delete file on error, it was just created + mysql_file_close(file, MYF(0)); + /* Delete file on error, it was just created */ + mysql_file_delete(key_select_to_file, path, MYF(0)); return -1; } return file; @@ -2686,7 +2721,6 @@ Statement::Statement(LEX *lex_arg, MEM_ROOT *mem_root_arg, id(id_arg), mark_used_columns(MARK_COLUMNS_READ), lex(lex_arg), - cursor(0), db(NULL), db_length(0) { @@ -2708,7 +2742,6 @@ void Statement::set_statement(Statement *stmt) mark_used_columns= stmt->mark_used_columns; lex= stmt->lex; query_string= stmt->query_string; - cursor= stmt->cursor; } @@ -2818,12 +2851,12 @@ Statement_map::Statement_map() : START_STMT_HASH_SIZE = 16, START_NAME_HASH_SIZE = 16 }; - hash_init(&st_hash, &my_charset_bin, START_STMT_HASH_SIZE, 0, 0, - get_statement_id_as_hash_key, - delete_statement_as_hash_key, MYF(0)); - hash_init(&names_hash, system_charset_info, START_NAME_HASH_SIZE, 0, 0, - (hash_get_key) get_stmt_name_hash_key, - NULL,MYF(0)); + my_hash_init(&st_hash, &my_charset_bin, START_STMT_HASH_SIZE, 0, 0, + get_statement_id_as_hash_key, + delete_statement_as_hash_key, MYF(0)); + my_hash_init(&names_hash, system_charset_info, START_NAME_HASH_SIZE, 0, 0, + (my_hash_get_key) get_stmt_name_hash_key, + NULL,MYF(0)); } @@ -2865,7 +2898,7 @@ int Statement_map::insert(THD *thd, Statement *statement) my_error(ER_OUT_OF_RESOURCES, MYF(0)); goto err_names_hash; } - pthread_mutex_lock(&LOCK_prepared_stmt_count); + mysql_mutex_lock(&LOCK_prepared_stmt_count); /* We don't check that prepared_stmt_count is <= max_prepared_stmt_count because we would like to allow to lower the total limit @@ -2875,22 +2908,22 @@ int Statement_map::insert(THD *thd, Statement *statement) */ if (prepared_stmt_count >= max_prepared_stmt_count) { - pthread_mutex_unlock(&LOCK_prepared_stmt_count); + mysql_mutex_unlock(&LOCK_prepared_stmt_count); my_error(ER_MAX_PREPARED_STMT_COUNT_REACHED, MYF(0), max_prepared_stmt_count); goto err_max; } prepared_stmt_count++; - pthread_mutex_unlock(&LOCK_prepared_stmt_count); + mysql_mutex_unlock(&LOCK_prepared_stmt_count); last_found_statement= statement; return 0; err_max: if (statement->name.str) - hash_delete(&names_hash, (uchar*) statement); + my_hash_delete(&names_hash, (uchar*) statement); err_names_hash: - hash_delete(&st_hash, (uchar*) statement); + my_hash_delete(&st_hash, (uchar*) statement); err_st_hash: return 1; } @@ -2911,23 +2944,23 @@ void Statement_map::erase(Statement *statement) if (statement == last_found_statement) last_found_statement= 0; if (statement->name.str) - hash_delete(&names_hash, (uchar *) statement); + my_hash_delete(&names_hash, (uchar *) statement); - hash_delete(&st_hash, (uchar *) statement); - pthread_mutex_lock(&LOCK_prepared_stmt_count); + my_hash_delete(&st_hash, (uchar *) statement); + mysql_mutex_lock(&LOCK_prepared_stmt_count); DBUG_ASSERT(prepared_stmt_count > 0); prepared_stmt_count--; - pthread_mutex_unlock(&LOCK_prepared_stmt_count); + mysql_mutex_unlock(&LOCK_prepared_stmt_count); } void Statement_map::reset() { /* Must be first, hash_free will reset st_hash.records */ - pthread_mutex_lock(&LOCK_prepared_stmt_count); + mysql_mutex_lock(&LOCK_prepared_stmt_count); DBUG_ASSERT(prepared_stmt_count >= st_hash.records); prepared_stmt_count-= st_hash.records; - pthread_mutex_unlock(&LOCK_prepared_stmt_count); + mysql_mutex_unlock(&LOCK_prepared_stmt_count); my_hash_reset(&names_hash); my_hash_reset(&st_hash); @@ -2938,13 +2971,13 @@ void Statement_map::reset() Statement_map::~Statement_map() { /* Must go first, hash_free will reset st_hash.records */ - pthread_mutex_lock(&LOCK_prepared_stmt_count); + mysql_mutex_lock(&LOCK_prepared_stmt_count); DBUG_ASSERT(prepared_stmt_count >= st_hash.records); prepared_stmt_count-= st_hash.records; - pthread_mutex_unlock(&LOCK_prepared_stmt_count); + mysql_mutex_unlock(&LOCK_prepared_stmt_count); - hash_free(&names_hash); - hash_free(&st_hash); + my_hash_free(&names_hash); + my_hash_free(&st_hash); } bool select_dumpvar::send_data(List<Item> &items) @@ -2990,11 +3023,6 @@ bool select_dumpvar::send_eof() if (! row_count) push_warning(thd, MYSQL_ERROR::WARN_LEVEL_WARN, ER_SP_FETCH_NO_DATA, ER(ER_SP_FETCH_NO_DATA)); - /* - In order to remember the value of affected rows for ROW_COUNT() - function, SELECT INTO has to have an own SQLCOM. - TODO: split from SQLCOM_SELECT - */ ::my_ok(thd,row_count); return 0; } @@ -3142,10 +3170,18 @@ void Security_context::destroy() { // If not pointer to constant if (host != my_localhost) - safeFree(host); + { + my_free(host); + host= NULL; + } if (user != delayed_user) - safeFree(user); - safeFree(ip); + { + my_free(user); + user= NULL; + } + + my_free(ip); + ip= NULL; } @@ -3160,7 +3196,7 @@ void Security_context::skip_grants() bool Security_context::set_user(char *user_arg) { - safeFree(user); + my_free(user); user= my_strdup(user_arg, MYF(0)); return user == 0; } @@ -3275,28 +3311,31 @@ bool Security_context::user_matches(Security_context *them) access to mysql.proc table to find definitions of stored routines. ****************************************************************************/ -void THD::reset_n_backup_open_tables_state(Open_tables_state *backup) +void THD::reset_n_backup_open_tables_state(Open_tables_backup *backup) { DBUG_ENTER("reset_n_backup_open_tables_state"); backup->set_open_tables_state(this); - reset_open_tables_state(); + backup->mdl_system_tables_svp= mdl_context.mdl_savepoint(); + reset_open_tables_state(this); state_flags|= Open_tables_state::BACKUPS_AVAIL; DBUG_VOID_RETURN; } -void THD::restore_backup_open_tables_state(Open_tables_state *backup) +void THD::restore_backup_open_tables_state(Open_tables_backup *backup) { DBUG_ENTER("restore_backup_open_tables_state"); + mdl_context.rollback_to_savepoint(backup->mdl_system_tables_svp); /* Before we will throw away current open tables state we want to be sure that it was properly cleaned up. */ DBUG_ASSERT(open_tables == 0 && temporary_tables == 0 && - handler_tables == 0 && derived_tables == 0 && - lock == 0 && locked_tables == 0 && - prelocked_mode == NON_PRELOCKED && + derived_tables == 0 && + lock == 0 && + locked_tables_mode == LTM_NONE && m_reprepare_observer == NULL); + set_open_tables_state(backup); DBUG_VOID_RETURN; } @@ -3361,7 +3400,7 @@ extern "C" int thd_non_transactional_update(const MYSQL_THD thd) extern "C" int thd_binlog_format(const MYSQL_THD thd) { - if (mysql_bin_log.is_open() && (thd->options & OPTION_BIN_LOG)) + if (mysql_bin_log.is_open() && (thd->variables.option_bits & OPTION_BIN_LOG)) return (int) thd->variables.binlog_format; else return BINLOG_FORMAT_UNSPEC; @@ -3376,6 +3415,65 @@ extern "C" bool thd_binlog_filter_ok(const MYSQL_THD thd) { return binlog_filter->db_ok(thd->db); } + +extern "C" bool thd_sqlcom_can_generate_row_events(const MYSQL_THD thd) +{ + return sqlcom_can_generate_row_events(thd); +} + +#ifndef EMBEDDED_LIBRARY +extern "C" void thd_pool_wait_begin(MYSQL_THD thd, int wait_type); +extern "C" void thd_pool_wait_end(MYSQL_THD thd); + +/* + Interface for MySQL Server, plugins and storage engines to report + when they are going to sleep/stall. + + SYNOPSIS + thd_wait_begin() + thd Thread object + wait_type Type of wait + 1 -- short wait (e.g. for mutex) + 2 -- medium wait (e.g. for disk io) + 3 -- large wait (e.g. for locked row/table) + NOTES + This is used by the threadpool to have better knowledge of which + threads that currently are actively running on CPUs. When a thread + reports that it's going to sleep/stall, the threadpool scheduler is + free to start another thread in the pool most likely. The expected wait + time is simply an indication of how long the wait is expected to + become, the real wait time could be very different. + + thd_wait_end MUST be called immediately after waking up again. +*/ +extern "C" void thd_wait_begin(MYSQL_THD thd, thd_wait_type wait_type) +{ + MYSQL_CALLBACK(thread_scheduler, thd_wait_begin, (thd, wait_type)); +} + +/** + Interface for MySQL Server, plugins and storage engines to report + when they waking up from a sleep/stall. + + @param thd Thread handle +*/ +extern "C" void thd_wait_end(MYSQL_THD thd) +{ + MYSQL_CALLBACK(thread_scheduler, thd_wait_end, (thd)); +} +#else +extern "C" void thd_wait_begin(MYSQL_THD thd, thd_wait_type wait_type) +{ + /* do NOTHING for the embedded library */ + return; +} + +extern "C" void thd_wait_end(MYSQL_THD thd) +{ + /* do NOTHING for the embedded library */ + return; +} +#endif #endif // INNODB_COMPATIBILITY_HOOKS */ /**************************************************************************** @@ -3422,8 +3520,8 @@ void THD::reset_sub_statement_state(Sub_statement_state *backup, } #endif + backup->option_bits= variables.option_bits; backup->count_cuted_fields= count_cuted_fields; - backup->options= options; backup->in_sub_stmt= in_sub_stmt; backup->enable_slow_log= enable_slow_log; backup->query_plan_flags= query_plan_flags; @@ -3439,13 +3537,14 @@ void THD::reset_sub_statement_state(Sub_statement_state *backup, first_successful_insert_id_in_cur_stmt; if ((!lex->requires_prelocking() || is_update_query(lex->sql_command)) && - !current_stmt_binlog_row_based) + !is_current_stmt_binlog_format_row()) { - options&= ~OPTION_BIN_LOG; + variables.option_bits&= ~OPTION_BIN_LOG; } - if ((backup->options & OPTION_BIN_LOG) && is_update_query(lex->sql_command)&& - !current_stmt_binlog_row_based) + if ((backup->option_bits & OPTION_BIN_LOG) && + is_update_query(lex->sql_command) && + !is_current_stmt_binlog_format_row()) mysql_bin_log.start_union_events(this, this->query_id); /* Disable result sets */ @@ -3490,7 +3589,7 @@ void THD::restore_sub_statement_state(Sub_statement_state *backup) } count_cuted_fields= backup->count_cuted_fields; transaction.savepoints= backup->savepoints; - options= backup->options; + variables.option_bits= backup->option_bits; in_sub_stmt= backup->in_sub_stmt; enable_slow_log= backup->enable_slow_log; query_plan_flags= backup->query_plan_flags; @@ -3509,8 +3608,8 @@ void THD::restore_sub_statement_state(Sub_statement_state *backup) if (!in_sub_stmt) is_fatal_sub_stmt_error= FALSE; - if ((options & OPTION_BIN_LOG) && is_update_query(lex->sql_command) && - !current_stmt_binlog_row_based) + if ((variables.option_bits & OPTION_BIN_LOG) && is_update_query(lex->sql_command) && + !is_current_stmt_binlog_format_row()) mysql_bin_log.stop_union_events(this); /* @@ -3525,9 +3624,9 @@ void THD::restore_sub_statement_state(Sub_statement_state *backup) void THD::set_statement(Statement *stmt) { - pthread_mutex_lock(&LOCK_thd_data); + mysql_mutex_lock(&LOCK_thd_data); Statement::set_statement(stmt); - pthread_mutex_unlock(&LOCK_thd_data); + mysql_mutex_unlock(&LOCK_thd_data); } @@ -3535,9 +3634,52 @@ void THD::set_statement(Statement *stmt) void THD::set_query(char *query_arg, uint32 query_length_arg) { - pthread_mutex_lock(&LOCK_thd_data); + mysql_mutex_lock(&LOCK_thd_data); set_query_inner(query_arg, query_length_arg); - pthread_mutex_unlock(&LOCK_thd_data); + mysql_mutex_unlock(&LOCK_thd_data); +} + +/** Assign a new value to thd->query and thd->query_id. */ + +void THD::set_query_and_id(char *query_arg, uint32 query_length_arg, + query_id_t new_query_id) +{ + mysql_mutex_lock(&LOCK_thd_data); + set_query_inner(query_arg, query_length_arg); + query_id= new_query_id; + mysql_mutex_unlock(&LOCK_thd_data); +} + +/** Assign a new value to thd->query_id. */ + +void THD::set_query_id(query_id_t new_query_id) +{ + mysql_mutex_lock(&LOCK_thd_data); + query_id= new_query_id; + mysql_mutex_unlock(&LOCK_thd_data); +} + +/** Assign a new value to thd->mysys_var. */ +void THD::set_mysys_var(struct st_my_thread_var *new_mysys_var) +{ + mysql_mutex_lock(&LOCK_thd_data); + mysys_var= new_mysys_var; + mysql_mutex_unlock(&LOCK_thd_data); +} + +/** + Leave explicit LOCK TABLES or prelocked mode and restore value of + transaction sentinel in MDL subsystem. +*/ + +void THD::leave_locked_tables_mode() +{ + locked_tables_mode= LTM_NONE; + /* Make sure we don't release the global read lock when leaving LTM. */ + mdl_context.reset_trans_sentinel(global_read_lock.global_shared_lock()); + /* Also ensure that we don't release metadata locks for open HANDLERs. */ + if (handler_tables_hash.records) + mysql_ha_move_tickets_after_trans_sentinel(this); } void THD::get_definer(LEX_USER *definer) @@ -3587,7 +3729,7 @@ void mark_transaction_to_rollback(THD *thd, bool all) Handling of XA id cacheing ***************************************************************************/ -pthread_mutex_t LOCK_xid_cache; +mysql_mutex_t LOCK_xid_cache; HASH xid_cache; extern "C" uchar *xid_get_hash_key(const uchar *, size_t *, my_bool); @@ -3603,30 +3745,56 @@ uchar *xid_get_hash_key(const uchar *ptr, size_t *length, void xid_free_hash(void *ptr) { if (!((XID_STATE*)ptr)->in_thd) - my_free((uchar*)ptr, MYF(0)); + my_free(ptr); +} + +#ifdef HAVE_PSI_INTERFACE +static PSI_mutex_key key_LOCK_xid_cache; + +static PSI_mutex_info all_xid_mutexes[]= +{ + { &key_LOCK_xid_cache, "LOCK_xid_cache", PSI_FLAG_GLOBAL} +}; + +static void init_xid_psi_keys(void) +{ + const char* category= "sql"; + int count; + + if (PSI_server == NULL) + return; + + count= array_elements(all_xid_mutexes); + PSI_server->register_mutex(category, all_xid_mutexes, count); } +#endif /* HAVE_PSI_INTERFACE */ bool xid_cache_init() { - pthread_mutex_init(&LOCK_xid_cache, MY_MUTEX_INIT_FAST); - return hash_init(&xid_cache, &my_charset_bin, 100, 0, 0, - xid_get_hash_key, xid_free_hash, 0) != 0; +#ifdef HAVE_PSI_INTERFACE + init_xid_psi_keys(); +#endif + + mysql_mutex_init(key_LOCK_xid_cache, &LOCK_xid_cache, MY_MUTEX_INIT_FAST); + return my_hash_init(&xid_cache, &my_charset_bin, 100, 0, 0, + xid_get_hash_key, xid_free_hash, 0) != 0; } void xid_cache_free() { - if (hash_inited(&xid_cache)) + if (my_hash_inited(&xid_cache)) { - hash_free(&xid_cache); - pthread_mutex_destroy(&LOCK_xid_cache); + my_hash_free(&xid_cache); + mysql_mutex_destroy(&LOCK_xid_cache); } } XID_STATE *xid_cache_search(XID *xid) { - pthread_mutex_lock(&LOCK_xid_cache); - XID_STATE *res=(XID_STATE *)hash_search(&xid_cache, xid->key(), xid->key_length()); - pthread_mutex_unlock(&LOCK_xid_cache); + mysql_mutex_lock(&LOCK_xid_cache); + XID_STATE *res=(XID_STATE *)my_hash_search(&xid_cache, xid->key(), + xid->key_length()); + mysql_mutex_unlock(&LOCK_xid_cache); return res; } @@ -3635,8 +3803,8 @@ bool xid_cache_insert(XID *xid, enum xa_states xa_state) { XID_STATE *xs; my_bool res; - pthread_mutex_lock(&LOCK_xid_cache); - if (hash_search(&xid_cache, xid->key(), xid->key_length())) + mysql_mutex_lock(&LOCK_xid_cache); + if (my_hash_search(&xid_cache, xid->key(), xid->key_length())) res=0; else if (!(xs=(XID_STATE *)my_malloc(sizeof(*xs), MYF(MY_WME)))) res=1; @@ -3647,33 +3815,409 @@ bool xid_cache_insert(XID *xid, enum xa_states xa_state) xs->in_thd=0; res=my_hash_insert(&xid_cache, (uchar*)xs); } - pthread_mutex_unlock(&LOCK_xid_cache); + mysql_mutex_unlock(&LOCK_xid_cache); return res; } bool xid_cache_insert(XID_STATE *xid_state) { - pthread_mutex_lock(&LOCK_xid_cache); - if (hash_search(&xid_cache, xid_state->xid.key(), xid_state->xid.key_length())) - { - pthread_mutex_unlock(&LOCK_xid_cache); - my_error(ER_XAER_DUPID, MYF(0)); - return TRUE; - } - my_bool res= my_hash_insert(&xid_cache, (uchar*)xid_state); - pthread_mutex_unlock(&LOCK_xid_cache); + mysql_mutex_lock(&LOCK_xid_cache); + DBUG_ASSERT(my_hash_search(&xid_cache, xid_state->xid.key(), + xid_state->xid.key_length())==0); + my_bool res=my_hash_insert(&xid_cache, (uchar*)xid_state); + mysql_mutex_unlock(&LOCK_xid_cache); return res; } void xid_cache_delete(XID_STATE *xid_state) { - pthread_mutex_lock(&LOCK_xid_cache); - hash_delete(&xid_cache, (uchar *)xid_state); - pthread_mutex_unlock(&LOCK_xid_cache); + mysql_mutex_lock(&LOCK_xid_cache); + my_hash_delete(&xid_cache, (uchar *)xid_state); + mysql_mutex_unlock(&LOCK_xid_cache); } + +/** + Decide on logging format to use for the statement and issue errors + or warnings as needed. The decision depends on the following + parameters: + + - The logging mode, i.e., the value of binlog_format. Can be + statement, mixed, or row. + + - The type of statement. There are three types of statements: + "normal" safe statements; unsafe statements; and row injections. + An unsafe statement is one that, if logged in statement format, + might produce different results when replayed on the slave (e.g., + INSERT DELAYED). A row injection is either a BINLOG statement, or + a row event executed by the slave's SQL thread. + + - The capabilities of tables modified by the statement. The + *capabilities vector* for a table is a set of flags associated + with the table. Currently, it only includes two flags: *row + capability flag* and *statement capability flag*. + + The row capability flag is set if and only if the engine can + handle row-based logging. The statement capability flag is set if + and only if the table can handle statement-based logging. + + Decision table for logging format + --------------------------------- + + The following table summarizes how the format and generated + warning/error depends on the tables' capabilities, the statement + type, and the current binlog_format. + + Row capable N NNNNNNNNN YYYYYYYYY YYYYYYYYY + Statement capable N YYYYYYYYY NNNNNNNNN YYYYYYYYY + + Statement type * SSSUUUIII SSSUUUIII SSSUUUIII + + binlog_format * SMRSMRSMR SMRSMRSMR SMRSMRSMR + + Logged format - SS-S----- -RR-RR-RR SRRSRR-RR + Warning/Error 1 --2732444 5--5--6-- ---7--6-- + + Legend + ------ + + Row capable: N - Some table not row-capable, Y - All tables row-capable + Stmt capable: N - Some table not stmt-capable, Y - All tables stmt-capable + Statement type: (S)afe, (U)nsafe, or Row (I)njection + binlog_format: (S)TATEMENT, (M)IXED, or (R)OW + Logged format: (S)tatement or (R)ow + Warning/Error: Warnings and error messages are as follows: + + 1. Error: Cannot execute statement: binlogging impossible since both + row-incapable engines and statement-incapable engines are + involved. + + 2. Error: Cannot execute statement: binlogging impossible since + BINLOG_FORMAT = ROW and at least one table uses a storage engine + limited to statement-logging. + + 3. Error: Cannot execute statement: binlogging of unsafe statement + is impossible when storage engine is limited to statement-logging + and BINLOG_FORMAT = MIXED. + + 4. Error: Cannot execute row injection: binlogging impossible since + at least one table uses a storage engine limited to + statement-logging. + + 5. Error: Cannot execute statement: binlogging impossible since + BINLOG_FORMAT = STATEMENT and at least one table uses a storage + engine limited to row-logging. + + 6. Error: Cannot execute row injection: binlogging impossible since + BINLOG_FORMAT = STATEMENT. + + 7. Warning: Unsafe statement binlogged in statement format since + BINLOG_FORMAT = STATEMENT. + + In addition, we can produce the following error (not depending on + the variables of the decision diagram): + + 8. Error: Cannot execute statement: binlogging impossible since more + than one engine is involved and at least one engine is + self-logging. + + For each error case above, the statement is prevented from being + logged, we report an error, and roll back the statement. For + warnings, we set the thd->binlog_flags variable: the warning will be + printed only if the statement is successfully logged. + + @see THD::binlog_query + + @param[in] thd Client thread + @param[in] tables Tables involved in the query + + @retval 0 No error; statement can be logged. + @retval -1 One of the error conditions above applies (1, 2, 4, 5, or 6). +*/ + +int THD::decide_logging_format(TABLE_LIST *tables) +{ + DBUG_ENTER("THD::decide_logging_format"); + DBUG_PRINT("info", ("query: %s", query())); + DBUG_PRINT("info", ("variables.binlog_format: %lu", + variables.binlog_format)); + DBUG_PRINT("info", ("lex->get_stmt_unsafe_flags(): 0x%x", + lex->get_stmt_unsafe_flags())); + + /* + We should not decide logging format if the binlog is closed or + binlogging is off, or if the statement is filtered out from the + binlog by filtering rules. + */ + if (mysql_bin_log.is_open() && (variables.option_bits & OPTION_BIN_LOG) && + !(variables.binlog_format == BINLOG_FORMAT_STMT && + !binlog_filter->db_ok(db))) + { + /* + Compute one bit field with the union of all the engine + capabilities, and one with the intersection of all the engine + capabilities. + */ + handler::Table_flags flags_write_some_set= 0; + handler::Table_flags flags_access_some_set= 0; + handler::Table_flags flags_write_all_set= + HA_BINLOG_ROW_CAPABLE | HA_BINLOG_STMT_CAPABLE; + + /* + If different types of engines are about to be updated. + For example: Innodb and Falcon; Innodb and MyIsam. + */ + my_bool multi_write_engine= FALSE; + /* + If different types of engines are about to be accessed + and any of them is about to be updated. For example: + Innodb and Falcon; Innodb and MyIsam. + */ + my_bool multi_access_engine= FALSE; + /* + Identifies if a table is changed. + */ + my_bool is_write= FALSE; + /* + A pointer to a previous table that was changed. + */ + TABLE* prev_write_table= NULL; + /* + A pointer to a previous table that was accessed. + */ + TABLE* prev_access_table= NULL; + +#ifndef DBUG_OFF + { + static const char *prelocked_mode_name[] = { + "NON_PRELOCKED", + "PRELOCKED", + "PRELOCKED_UNDER_LOCK_TABLES", + }; + DBUG_PRINT("debug", ("prelocked_mode: %s", + prelocked_mode_name[locked_tables_mode])); + } +#endif + + /* + Get the capabilities vector for all involved storage engines and + mask out the flags for the binary log. + */ + for (TABLE_LIST *table= tables; table; table= table->next_global) + { + if (table->placeholder()) + continue; + + if (table->table->s->table_category == TABLE_CATEGORY_PERFORMANCE || + table->table->s->table_category == TABLE_CATEGORY_LOG) + lex->set_stmt_unsafe(LEX::BINLOG_STMT_UNSAFE_SYSTEM_TABLE); + + handler::Table_flags const flags= table->table->file->ha_table_flags(); + + DBUG_PRINT("info", ("table: %s; ha_table_flags: 0x%llx", + table->table_name, flags)); + if (table->lock_type >= TL_WRITE_ALLOW_WRITE) + { + if (prev_write_table && prev_write_table->file->ht != + table->table->file->ht) + multi_write_engine= TRUE; + + my_bool trans= table->table->file->has_transactions(); + + if (table->table->s->tmp_table) + lex->set_stmt_accessed_table(trans ? LEX::STMT_WRITES_TEMP_TRANS_TABLE : + LEX::STMT_WRITES_TEMP_NON_TRANS_TABLE); + else + lex->set_stmt_accessed_table(trans ? LEX::STMT_WRITES_TRANS_TABLE : + LEX::STMT_WRITES_NON_TRANS_TABLE); + + flags_write_all_set &= flags; + flags_write_some_set |= flags; + is_write= TRUE; + + prev_write_table= table->table; + + } + flags_access_some_set |= flags; + + if (lex->sql_command != SQLCOM_CREATE_TABLE || + (lex->sql_command == SQLCOM_CREATE_TABLE && + (lex->create_info.options & HA_LEX_CREATE_TMP_TABLE))) + { + my_bool trans= table->table->file->has_transactions(); + + if (table->table->s->tmp_table) + lex->set_stmt_accessed_table(trans ? LEX::STMT_READS_TEMP_TRANS_TABLE : + LEX::STMT_READS_TEMP_NON_TRANS_TABLE); + else + lex->set_stmt_accessed_table(trans ? LEX::STMT_READS_TRANS_TABLE : + LEX::STMT_READS_NON_TRANS_TABLE); + } + + if (prev_access_table && prev_access_table->file->ht != + table->table->file->ht) + multi_access_engine= TRUE; + + prev_access_table= table->table; + } + + DBUG_PRINT("info", ("flags_write_all_set: 0x%llx", flags_write_all_set)); + DBUG_PRINT("info", ("flags_write_some_set: 0x%llx", flags_write_some_set)); + DBUG_PRINT("info", ("flags_access_some_set: 0x%llx", flags_access_some_set)); + DBUG_PRINT("info", ("multi_write_engine: %d", multi_write_engine)); + DBUG_PRINT("info", ("multi_access_engine: %d", multi_access_engine)); + + int error= 0; + int unsafe_flags; + + bool multi_stmt_trans= in_multi_stmt_transaction_mode(); + bool trans_table= trans_has_updated_trans_table(this); + bool binlog_direct= variables.binlog_direct_non_trans_update; + + if (lex->is_mixed_stmt_unsafe(multi_stmt_trans, binlog_direct, + trans_table, tx_isolation)) + lex->set_stmt_unsafe(LEX::BINLOG_STMT_UNSAFE_MIXED_STATEMENT); + else if (multi_stmt_trans && trans_table && !binlog_direct && + lex->stmt_accessed_table(LEX::STMT_WRITES_NON_TRANS_TABLE)) + lex->set_stmt_unsafe(LEX::BINLOG_STMT_UNSAFE_NONTRANS_AFTER_TRANS); + + /* + If more than one engine is involved in the statement and at + least one is doing it's own logging (is *self-logging*), the + statement cannot be logged atomically, so we generate an error + rather than allowing the binlog to become corrupt. + */ + if (multi_write_engine && + (flags_write_some_set & HA_HAS_OWN_BINLOGGING)) + my_error((error= ER_BINLOG_MULTIPLE_ENGINES_AND_SELF_LOGGING_ENGINE), + MYF(0)); + else if (multi_access_engine && flags_access_some_set & HA_HAS_OWN_BINLOGGING) + lex->set_stmt_unsafe(LEX::BINLOG_STMT_UNSAFE_MULTIPLE_ENGINES_AND_SELF_LOGGING_ENGINE); + + /* both statement-only and row-only engines involved */ + if ((flags_write_all_set & (HA_BINLOG_STMT_CAPABLE | HA_BINLOG_ROW_CAPABLE)) == 0) + { + /* + 1. Error: Binary logging impossible since both row-incapable + engines and statement-incapable engines are involved + */ + my_error((error= ER_BINLOG_ROW_ENGINE_AND_STMT_ENGINE), MYF(0)); + } + /* statement-only engines involved */ + else if ((flags_write_all_set & HA_BINLOG_ROW_CAPABLE) == 0) + { + if (lex->is_stmt_row_injection()) + { + /* + 4. Error: Cannot execute row injection since table uses + storage engine limited to statement-logging + */ + my_error((error= ER_BINLOG_ROW_INJECTION_AND_STMT_ENGINE), MYF(0)); + } + else if (variables.binlog_format == BINLOG_FORMAT_ROW && + sqlcom_can_generate_row_events(this)) + { + /* + 2. Error: Cannot modify table that uses a storage engine + limited to statement-logging when BINLOG_FORMAT = ROW + */ + my_error((error= ER_BINLOG_ROW_MODE_AND_STMT_ENGINE), MYF(0)); + } + else if ((unsafe_flags= lex->get_stmt_unsafe_flags()) != 0) + { + /* + 3. Error: Cannot execute statement: binlogging of unsafe + statement is impossible when storage engine is limited to + statement-logging and BINLOG_FORMAT = MIXED. + */ + for (int unsafe_type= 0; + unsafe_type < LEX::BINLOG_STMT_UNSAFE_COUNT; + unsafe_type++) + if (unsafe_flags & (1 << unsafe_type)) + my_error((error= ER_BINLOG_UNSAFE_AND_STMT_ENGINE), MYF(0), + ER(LEX::binlog_stmt_unsafe_errcode[unsafe_type])); + } + /* log in statement format! */ + } + /* no statement-only engines */ + else + { + /* binlog_format = STATEMENT */ + if (variables.binlog_format == BINLOG_FORMAT_STMT) + { + if (lex->is_stmt_row_injection()) + { + /* + 6. Error: Cannot execute row injection since + BINLOG_FORMAT = STATEMENT + */ + my_error((error= ER_BINLOG_ROW_INJECTION_AND_STMT_MODE), MYF(0)); + } + else if ((flags_write_all_set & HA_BINLOG_STMT_CAPABLE) == 0 && + sqlcom_can_generate_row_events(this)) + { + /* + 5. Error: Cannot modify table that uses a storage engine + limited to row-logging when binlog_format = STATEMENT + */ + my_error((error= ER_BINLOG_STMT_MODE_AND_ROW_ENGINE), MYF(0), ""); + } + else if (is_write && (unsafe_flags= lex->get_stmt_unsafe_flags()) != 0) + { + /* + 7. Warning: Unsafe statement logged as statement due to + binlog_format = STATEMENT + */ + binlog_unsafe_warning_flags|= unsafe_flags; + + DBUG_PRINT("info", ("Scheduling warning to be issued by " + "binlog_query: '%s'", + ER(ER_BINLOG_UNSAFE_STATEMENT))); + DBUG_PRINT("info", ("binlog_unsafe_warning_flags: 0x%x", + binlog_unsafe_warning_flags)); + } + /* log in statement format! */ + } + /* No statement-only engines and binlog_format != STATEMENT. + I.e., nothing prevents us from row logging if needed. */ + else + { + if (lex->is_stmt_unsafe() || lex->is_stmt_row_injection() + || (flags_write_all_set & HA_BINLOG_STMT_CAPABLE) == 0) + { + /* log in row format! */ + set_current_stmt_binlog_format_row_if_mixed(); + } + } + } + + if (error) { + DBUG_PRINT("info", ("decision: no logging since an error was generated")); + DBUG_RETURN(-1); + } + DBUG_PRINT("info", ("decision: logging in %s format", + is_current_stmt_binlog_format_row() ? + "ROW" : "STATEMENT")); + } +#ifndef DBUG_OFF + else + DBUG_PRINT("info", ("decision: no logging since " + "mysql_bin_log.is_open() = %d " + "and (options & OPTION_BIN_LOG) = 0x%llx " + "and binlog_format = %lu " + "and binlog_filter->db_ok(db) = %d", + mysql_bin_log.is_open(), + (variables.option_bits & OPTION_BIN_LOG), + variables.binlog_format, + binlog_filter->db_ok(db))); +#endif + + DBUG_RETURN(0); +} + + /* Implementation of interface to write rows to the binary log through the thread. The thread is responsible for writing the rows it has @@ -3725,7 +4269,7 @@ THD::binlog_prepare_pending_rows_event(TABLE* table, uint32 serv_id, if (binlog_setup_trx_data()) DBUG_RETURN(NULL); - Rows_log_event* pending= binlog_get_pending_rows_event(); + Rows_log_event* pending= binlog_get_pending_rows_event(is_transactional); if (unlikely(pending && !pending->is_valid())) DBUG_RETURN(NULL); @@ -3759,7 +4303,9 @@ THD::binlog_prepare_pending_rows_event(TABLE* table, uint32 serv_id, flush the pending event and replace it with the newly created event... */ - if (unlikely(mysql_bin_log.flush_and_set_pending_rows_event(this, ev))) + if (unlikely( + mysql_bin_log.flush_and_set_pending_rows_event(this, ev, + is_transactional))) { delete ev; DBUG_RETURN(NULL); @@ -3791,8 +4337,8 @@ THD::binlog_prepare_pending_rows_event(TABLE*, uint32, MY_BITMAP const*, Update_rows_log_event *); #endif - -namespace { +/* Declare in unnamed namespace. */ +CPP_UNNAMED_NS_START /** Class to handle temporary allocation of memory for row data. @@ -3845,7 +4391,7 @@ namespace { ~Row_data_memory() { if (m_memory != 0 && m_release_memory_on_destruction) - my_free((uchar*) m_memory, MYF(MY_WME)); + my_free(m_memory); } /** @@ -3911,14 +4457,14 @@ namespace { uchar *m_memory; uchar *m_ptr[2]; }; -} +CPP_UNNAMED_NS_END int THD::binlog_write_row(TABLE* table, bool is_trans, MY_BITMAP const* cols, size_t colcnt, uchar const *record) { - DBUG_ASSERT(current_stmt_binlog_row_based && mysql_bin_log.is_open()); + DBUG_ASSERT(is_current_stmt_binlog_format_row() && mysql_bin_log.is_open()); /* Pack records into format for transfer. We are allocating more @@ -3948,7 +4494,7 @@ int THD::binlog_update_row(TABLE* table, bool is_trans, const uchar *before_record, const uchar *after_record) { - DBUG_ASSERT(current_stmt_binlog_row_based && mysql_bin_log.is_open()); + DBUG_ASSERT(is_current_stmt_binlog_format_row() && mysql_bin_log.is_open()); size_t const before_maxlen = max_row_length(table, before_record); size_t const after_maxlen = max_row_length(table, after_record); @@ -3993,7 +4539,7 @@ int THD::binlog_delete_row(TABLE* table, bool is_trans, MY_BITMAP const* cols, size_t colcnt, uchar const *record) { - DBUG_ASSERT(current_stmt_binlog_row_based && mysql_bin_log.is_open()); + DBUG_ASSERT(is_current_stmt_binlog_format_row() && mysql_bin_log.is_open()); /* Pack records into format for transfer. We are allocating more @@ -4019,14 +4565,15 @@ int THD::binlog_delete_row(TABLE* table, bool is_trans, } -int THD::binlog_remove_pending_rows_event(bool clear_maps) +int THD::binlog_remove_pending_rows_event(bool clear_maps, + bool is_transactional) { DBUG_ENTER("THD::binlog_remove_pending_rows_event"); if (!mysql_bin_log.is_open()) DBUG_RETURN(0); - mysql_bin_log.remove_pending_rows_event(this); + mysql_bin_log.remove_pending_rows_event(this, is_transactional); if (clear_maps) binlog_table_maps= 0; @@ -4034,7 +4581,7 @@ int THD::binlog_remove_pending_rows_event(bool clear_maps) DBUG_RETURN(0); } -int THD::binlog_flush_pending_rows_event(bool stmt_end) +int THD::binlog_flush_pending_rows_event(bool stmt_end, bool is_transactional) { DBUG_ENTER("THD::binlog_flush_pending_rows_event"); /* @@ -4050,7 +4597,7 @@ int THD::binlog_flush_pending_rows_event(bool stmt_end) flag is set. */ int error= 0; - if (Rows_log_event *pending= binlog_get_pending_rows_event()) + if (Rows_log_event *pending= binlog_get_pending_rows_event(is_transactional)) { if (stmt_end) { @@ -4058,7 +4605,8 @@ int THD::binlog_flush_pending_rows_event(bool stmt_end) binlog_table_maps= 0; } - error= mysql_bin_log.flush_and_set_pending_rows_event(this, 0); + error= mysql_bin_log.flush_and_set_pending_rows_event(this, 0, + is_transactional); } DBUG_RETURN(error); @@ -4074,8 +4622,6 @@ show_query_type(THD::enum_binlog_query_type qtype) return "ROW"; case THD::STMT_QUERY_TYPE: return "STMT"; - case THD::MYSQL_QUERY_TYPE: - return "MYSQL"; case THD::QUERY_TYPE_COUNT: default: DBUG_ASSERT(0 <= qtype && qtype < THD::QUERY_TYPE_COUNT); @@ -4087,32 +4633,78 @@ show_query_type(THD::enum_binlog_query_type qtype) #endif -/* - Member function that will log query, either row-based or - statement-based depending on the value of the 'current_stmt_binlog_row_based' - the value of the 'qtype' flag. +/** + Auxiliary method used by @c binlog_query() to raise warnings. + + The type of warning and the type of unsafeness is stored in + THD::binlog_unsafe_warning_flags. +*/ +void THD::issue_unsafe_warnings() +{ + DBUG_ENTER("issue_unsafe_warnings"); + /* + Ensure that binlog_unsafe_warning_flags is big enough to hold all + bits. This is actually a constant expression. + */ + DBUG_ASSERT(LEX::BINLOG_STMT_UNSAFE_COUNT <= + sizeof(binlog_unsafe_warning_flags) * CHAR_BIT); + + uint32 unsafe_type_flags= binlog_unsafe_warning_flags; + /* + For each unsafe_type, check if the statement is unsafe in this way + and issue a warning. + */ + for (int unsafe_type=0; + unsafe_type < LEX::BINLOG_STMT_UNSAFE_COUNT; + unsafe_type++) + { + if ((unsafe_type_flags & (1 << unsafe_type)) != 0) + { + push_warning_printf(this, MYSQL_ERROR::WARN_LEVEL_NOTE, + ER_BINLOG_UNSAFE_STATEMENT, + ER(ER_BINLOG_UNSAFE_STATEMENT), + ER(LEX::binlog_stmt_unsafe_errcode[unsafe_type])); + if (global_system_variables.log_warnings) + { + char buf[MYSQL_ERRMSG_SIZE * 2]; + sprintf(buf, ER(ER_BINLOG_UNSAFE_STATEMENT), + ER(LEX::binlog_stmt_unsafe_errcode[unsafe_type])); + sql_print_warning(ER(ER_MESSAGE_AND_STATEMENT), buf, query()); + } + } + } + DBUG_VOID_RETURN; +} - This function should be called after the all calls to ha_*_row() - functions have been issued, but before tables are unlocked and - closed. - OBSERVE - There shall be no writes to any system table after calling - binlog_query(), so these writes has to be moved to before the call - of binlog_query() for correct functioning. +/** + Log the current query. - This is necessesary not only for RBR, but the master might crash - after binlogging the query but before changing the system tables. - This means that the slave and the master are not in the same state - (after the master has restarted), so therefore we have to - eliminate this problem. + The query will be logged in either row format or statement format + depending on the value of @c current_stmt_binlog_format_row field and + the value of the @c qtype parameter. - RETURN VALUE - Error code, or 0 if no error. + This function must be called: + + - After the all calls to ha_*_row() functions have been issued. + + - After any writes to system tables. Rationale: if system tables + were written after a call to this function, and the master crashes + after the call to this function and before writing the system + tables, then the master and slave get out of sync. + + - Before tables are unlocked and closed. + + @see decide_logging_format + + @retval 0 Success + + @retval nonzero If there is a failure when writing the query (e.g., + write failure), then the error code is returned. */ int THD::binlog_query(THD::enum_binlog_query_type qtype, char const *query_arg, - ulong query_len, bool is_trans, bool suppress_use, - int errcode) + ulong query_len, bool is_trans, bool direct, + bool suppress_use, int errcode) { DBUG_ENTER("THD::binlog_query"); DBUG_PRINT("enter", ("qtype: %s query: '%s'", @@ -4128,60 +4720,72 @@ int THD::binlog_query(THD::enum_binlog_query_type qtype, char const *query_arg, If we are in prelocked mode, the flushing will be done inside the top-most close_thread_tables(). */ - if (this->prelocked_mode == NON_PRELOCKED) - if (int error= binlog_flush_pending_rows_event(TRUE)) + if (this->locked_tables_mode <= LTM_LOCK_TABLES) + if (int error= binlog_flush_pending_rows_event(TRUE, is_trans)) DBUG_RETURN(error); /* - If we are in statement mode and trying to log an unsafe statement, - we should print a warning. + Warnings for unsafe statements logged in statement format are + printed in three places instead of in decide_logging_format(). + This is because the warnings should be printed only if the statement + is actually logged. When executing decide_logging_format(), we cannot + know for sure if the statement will be logged: + + 1 - sp_head::execute_procedure which prints out warnings for calls to + stored procedures. + + 2 - sp_head::execute_function which prints out warnings for calls + involving functions. + + 3 - THD::binlog_query (here) which prints warning for top level + statements not covered by the two cases above: i.e., if not insided a + procedure and a function. + + Besides, we should not try to print these warnings if it is not + possible to write statements to the binary log as it happens when + the execution is inside a function, or generaly speaking, when + the variables.option_bits & OPTION_BIN_LOG is false. + */ - if (sql_log_bin_toplevel && lex->is_stmt_unsafe() && - variables.binlog_format == BINLOG_FORMAT_STMT && - binlog_filter->db_ok(this->db)) - { - /* - A warning can be elevated a error when STRICT sql mode. - But we don't want to elevate binlog warning to error here. - */ - push_warning(this, MYSQL_ERROR::WARN_LEVEL_NOTE, - ER_BINLOG_UNSAFE_STATEMENT, - ER(ER_BINLOG_UNSAFE_STATEMENT)); - if (global_system_variables.log_warnings && - !(binlog_flags & BINLOG_FLAG_UNSAFE_STMT_PRINTED)) - { - sql_print_warning("%s Statement: %.*s", - ER(ER_BINLOG_UNSAFE_STATEMENT), - MYSQL_ERRMSG_SIZE, query_arg); - binlog_flags|= BINLOG_FLAG_UNSAFE_STMT_PRINTED; - } - } + if ((variables.option_bits & OPTION_BIN_LOG) && + spcont == NULL && !binlog_evt_union.do_union) + issue_unsafe_warnings(); + switch (qtype) { + /* + ROW_QUERY_TYPE means that the statement may be logged either in + row format or in statement format. If + current_stmt_binlog_format is row, it means that the + statement has already been logged in row format and hence shall + not be logged again. + */ case THD::ROW_QUERY_TYPE: DBUG_PRINT("debug", - ("current_stmt_binlog_row_based: %d", - current_stmt_binlog_row_based)); - if (current_stmt_binlog_row_based) + ("is_current_stmt_binlog_format_row: %d", + is_current_stmt_binlog_format_row())); + if (is_current_stmt_binlog_format_row()) DBUG_RETURN(0); - /* Otherwise, we fall through */ - case THD::MYSQL_QUERY_TYPE: - /* - Using this query type is a conveniece hack, since we have been - moving back and forth between using RBR for replication of - system tables and not using it. + /* Fall through */ - Make sure to change in check_table_binlog_row_based() according - to how you treat this. + /* + STMT_QUERY_TYPE means that the query must be logged in statement + format; it cannot be logged in row format. This is typically + used by DDL statements. It is an error to use this query type + if current_stmt_binlog_format_row is row. + + @todo Currently there are places that call this method with + STMT_QUERY_TYPE and current_stmt_binlog_format is row. Fix those + places and add assert to ensure correct behavior. /Sven */ case THD::STMT_QUERY_TYPE: /* The MYSQL_LOG::write() function will set the STMT_END_F flag and flush the pending rows event if necessary. - */ + */ { - Query_log_event qinfo(this, query_arg, query_len, is_trans, suppress_use, - errcode); + Query_log_event qinfo(this, query_arg, query_len, is_trans, direct, + suppress_use, errcode); /* Binlog table maps will be irrelevant after a Query_log_event (they are just removed on the slave side) so after the query |