diff options
Diffstat (limited to 'sql/sql_class.cc')
-rw-r--r-- | sql/sql_class.cc | 1087 |
1 files changed, 544 insertions, 543 deletions
diff --git a/sql/sql_class.cc b/sql/sql_class.cc index 6897a26bda1..38770a24dec 100644 --- a/sql/sql_class.cc +++ b/sql/sql_class.cc @@ -1,6 +1,6 @@ /* Copyright (c) 2000, 2015, Oracle and/or its affiliates. - Copyright (c) 2008, 2017, MariaDB + Copyright (c) 2008, 2020, MariaDB Corporation. This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -37,7 +37,7 @@ #include "tztime.h" // MYSQL_TIME <-> my_time_t #include "sql_acl.h" // NO_ACCESS, // acl_getroot_no_password -#include "sql_base.h" // close_temporary_tables +#include "sql_base.h" #include "sql_handler.h" // mysql_ha_cleanup #include "rpl_rli.h" #include "rpl_filter.h" @@ -268,73 +268,15 @@ bool Foreign_key::validate(List<Create_field> &table_fields) /**************************************************************************** ** Thread specific functions ****************************************************************************/ -#ifdef ONLY_FOR_MYSQL_CLOSED_SOURCE_SCHEDULED -/** - Get reference to scheduler data object - - @param thd THD object - - @retval Scheduler data object on THD -*/ -void *thd_get_scheduler_data(THD *thd) -{ - return thd->scheduler.data; -} - -/** - Set reference to Scheduler data object for THD object - - @param thd THD object - @param psi Scheduler data object to set on THD -*/ -void thd_set_scheduler_data(THD *thd, void *data) -{ - thd->scheduler.data= data; -} - -/** - Get reference to Performance Schema object for THD object - - @param thd THD object - - @retval Performance schema object for thread on THD -*/ -PSI_thread *thd_get_psi(THD *thd) -{ - return thd->scheduler.m_psi; -} - -/** - Get net_wait_timeout for THD object - - @param thd THD object - - @retval net_wait_timeout value for thread on THD -*/ -ulong thd_get_net_wait_timeout(THD* thd) -{ - return thd->variables.net_wait_timeout; -} - -/** - Set reference to Performance Schema object for THD object - - @param thd THD object - @param psi Performance schema object for thread -*/ -void thd_set_psi(THD *thd, PSI_thread *psi) -{ - thd->scheduler.m_psi= psi; -} /** - Set the state on connection to killed + Get current THD object from thread local data - @param thd THD object + @retval The THD object for the thread, NULL if not connection thread */ -void thd_set_killed(THD *thd) +THD *thd_get_current_thd() { - thd->set_killed(KILL_CONNECTION); + return current_thd; } /** @@ -348,116 +290,6 @@ void thd_clear_errors(THD *thd) thd->mysys_var->abort= 0; } -/** - Set thread stack in THD object - - @param thd Thread object - @param stack_start Start of stack to set in THD object -*/ -void thd_set_thread_stack(THD *thd, char *stack_start) -{ - thd->thread_stack= stack_start; -} - -/** - Close the socket used by this connection - - @param thd THD object -*/ -void thd_close_connection(THD *thd) -{ - if (thd->net.vio) - vio_close(thd->net.vio); -} - -/** - Lock data that needs protection in THD object - - @param thd THD object -*/ -void thd_lock_data(THD *thd) -{ - mysql_mutex_lock(&thd->LOCK_thd_data); -} - -/** - Unlock data that needs protection in THD object - - @param thd THD object -*/ -void thd_unlock_data(THD *thd) -{ - mysql_mutex_unlock(&thd->LOCK_thd_data); -} - -/** - Support method to check if connection has already started transcaction - - @param client_cntx Low level client context - - @retval TRUE if connection already started transaction -*/ -bool thd_is_transaction_active(THD *thd) -{ - return thd->transaction.is_active(); -} - -/** - Check if there is buffered data on the socket representing the connection - - @param thd THD object -*/ -int thd_connection_has_data(THD *thd) -{ - Vio *vio= thd->net.vio; - return vio->has_data(vio); -} - -/** - Set reading/writing on socket, used by SHOW PROCESSLIST - - @param thd THD object - @param val Value to set it to (0 or 1) -*/ -void thd_set_net_read_write(THD *thd, uint val) -{ - thd->net.reading_or_writing= val; -} - -/** - Get reading/writing on socket from THD object - @param thd THD object - - @retval net.reading_or_writing value for thread on THD. -*/ -uint thd_get_net_read_write(THD *thd) -{ - return thd->net.reading_or_writing; -} - -/** - Set reference to mysys variable in THD object - - @param thd THD object - @param mysys_var Reference to set -*/ -void thd_set_mysys_var(THD *thd, st_my_thread_var *mysys_var) -{ - thd->set_mysys_var(mysys_var); -} - -/** - Get socket file descriptor for this connection - - @param thd THD object - - @retval Socket of the connection -*/ -my_socket thd_get_fd(THD *thd) -{ - return mysql_socket_getfd(thd->net.vio->mysql_socket); -} -#endif /** Get thread attributes for connection threads @@ -530,13 +362,13 @@ const char *set_thd_proc_info(THD *thd_arg, const char *info, PSI_stage_info old_stage; PSI_stage_info new_stage; - old_stage.m_key= 0; - old_stage.m_name= info; + new_stage.m_key= 0; + new_stage.m_name= info; - set_thd_stage_info(thd_arg, & old_stage, & new_stage, + set_thd_stage_info(thd_arg, & new_stage, & old_stage, calling_function, calling_file, calling_line); - return new_stage.m_name; + return old_stage.m_name; } extern "C" @@ -716,7 +548,7 @@ char *thd_security_context(THD *thd, bool Drop_table_error_handler::handle_condition(THD *thd, uint sql_errno, const char* sqlstate, - Sql_condition::enum_warning_level level, + Sql_condition::enum_warning_level *level, const char* msg, Sql_condition ** cond_hdl) { @@ -727,6 +559,28 @@ bool Drop_table_error_handler::handle_condition(THD *thd, /** + Handle an error from MDL_context::upgrade_lock() and mysql_lock_tables(). + Ignore ER_LOCK_ABORTED and ER_LOCK_DEADLOCK errors. +*/ + +bool +MDL_deadlock_and_lock_abort_error_handler:: +handle_condition(THD *thd, + uint sql_errno, + const char *sqlstate, + Sql_condition::enum_warning_level *level, + const char* msg, + Sql_condition **cond_hdl) +{ + *cond_hdl= NULL; + if (sql_errno == ER_LOCK_ABORTED || sql_errno == ER_LOCK_DEADLOCK) + m_need_reopen= true; + + return m_need_reopen; +} + + +/** Send timeout to thread. Note that this is always safe as the thread will always remove it's @@ -743,26 +597,25 @@ extern "C" void thd_kill_timeout(THD* thd) } -THD::THD(bool is_wsrep_applier) +THD::THD(my_thread_id id, bool is_wsrep_applier) :Statement(&main_lex, &main_mem_root, STMT_CONVENTIONAL_EXECUTION, /* statement id */ 0), rli_fake(0), rgi_fake(0), rgi_slave(NULL), protocol_text(this), protocol_binary(this), + m_current_stage_key(0), in_sub_stmt(0), log_all_errors(0), binlog_unsafe_warning_flags(0), + current_stmt_binlog_format(BINLOG_FORMAT_MIXED), binlog_table_maps(0), + bulk_param(0), table_map_for_update(0), - arg_of_last_insert_id_function(FALSE), - first_successful_insert_id_in_prev_stmt(0), - first_successful_insert_id_in_prev_stmt_for_binlog(0), - first_successful_insert_id_in_cur_stmt(0), - stmt_depends_on_first_successful_insert_id_in_prev_stmt(FALSE), m_examined_row_count(0), accessed_rows_and_keys(0), m_digest(NULL), m_statement_psi(NULL), m_idle_psi(NULL), - thread_id(0), + thread_id(id), + thread_dbug_id(id), os_thread_id(0), global_disable_checkpoint(0), failed_com_change_user(0), @@ -788,7 +641,8 @@ THD::THD(bool is_wsrep_applier) main_da(0, false, false), m_stmt_da(&main_da), tdc_hash_pins(0), - xid_hash_pins(0) + xid_hash_pins(0), + m_tmp_tables_locked(false) #ifdef WITH_WSREP , wsrep_applier(is_wsrep_applier), @@ -812,6 +666,7 @@ THD::THD(bool is_wsrep_applier) set_current_thd(this); status_var.local_memory_used= sizeof(THD); status_var.global_memory_used= 0; + variables.pseudo_thread_id= thread_id; variables.max_mem_used= global_system_variables.max_mem_used; main_da.init(); @@ -825,6 +680,12 @@ THD::THD(bool is_wsrep_applier) init_sql_alloc(&main_mem_root, ALLOC_ROOT_MIN_BLOCK_SIZE, 0, MYF(MY_THREAD_SPECIFIC)); + /* + Allocation of user variables for binary logging is always done with main + mem root + */ + user_var_events_alloc= mem_root; + stmt_arena= this; thread_stack= 0; scheduler= thread_scheduler; // Will be fixed later @@ -853,8 +714,7 @@ THD::THD(bool is_wsrep_applier) statement_id_counter= 0UL; // Must be reset to handle error with THD's created for init of mysqld lex->current_select= 0; - user_time.val= start_time= start_time_sec_part= 0; - start_utime= utime_after_query= prior_thr_create_utime= 0L; + start_utime= utime_after_query= 0; utime_after_lock= 0L; progress.arena= 0; progress.report_to_client= 0; @@ -869,7 +729,7 @@ THD::THD(bool is_wsrep_applier) query_name_consts= 0; semisync_info= 0; db_charset= global_system_variables.collation_database; - bzero(ha_data, sizeof(ha_data)); + bzero((void*) ha_data, sizeof(ha_data)); mysys_var=0; binlog_evt_union.do_union= FALSE; enable_slow_log= 0; @@ -878,14 +738,13 @@ THD::THD(bool is_wsrep_applier) #ifndef DBUG_OFF dbug_sentry=THD_SENTRY_MAGIC; #endif -#ifndef EMBEDDED_LIBRARY mysql_audit_init_thd(this); -#endif net.vio=0; net.buff= 0; + net.reading_or_writing= 0; client_capabilities= 0; // minimalistic client system_thread= NON_SYSTEM_THREAD; - cleanup_done= abort_on_warning= 0; + cleanup_done= free_connection_done= abort_on_warning= 0; peer_port= 0; // For SHOW PROCESSLIST transaction.m_pending_rows_event= 0; transaction.on= 1; @@ -909,7 +768,6 @@ THD::THD(bool is_wsrep_applier) /* Variables with default values */ proc_info="login"; where= THD::DEFAULT_WHERE; - variables.server_id = global_system_variables.server_id; slave_net = 0; m_command=COM_CONNECT; *scramble= '\0'; @@ -931,6 +789,7 @@ THD::THD(bool is_wsrep_applier) wsrep_affected_rows = 0; wsrep_replicate_GTID = false; wsrep_skip_wsrep_GTID = false; + wsrep_split_flag = false; #endif /* Call to init() below requires fully initialized Open_tables_state. */ reset_open_tables_state(this); @@ -970,9 +829,8 @@ THD::THD(bool is_wsrep_applier) by adding the address of the stack. */ tmp= (ulong) (my_rnd(&sql_rand) * 0xffffffff); - my_rnd_init(&rand, tmp + (ulong) &rand, tmp + (ulong) ::global_query_id); + my_rnd_init(&rand, tmp + (ulong)((size_t) &rand), tmp + (ulong) ::global_query_id); substitute_null_with_insert_id = FALSE; - thr_lock_info_init(&lock_info); /* safety: will be reset after start */ lock_info.mysql_thd= (void *)this; m_token_array= NULL; @@ -988,8 +846,10 @@ THD::THD(bool is_wsrep_applier) prepare_derived_at_open= FALSE; create_tmp_table_for_derived= FALSE; save_prep_leaf_list= FALSE; + org_charset= 0; /* Restore THR_THD */ set_current_thd(old_THR_THD); + inc_thread_count(); } @@ -1010,7 +870,7 @@ void THD::push_internal_handler(Internal_error_handler *handler) bool THD::handle_condition(uint sql_errno, const char* sqlstate, - Sql_condition::enum_warning_level level, + Sql_condition::enum_warning_level *level, const char* msg, Sql_condition ** cond_hdl) { @@ -1137,6 +997,7 @@ Sql_condition* THD::raise_condition(uint sql_errno, Diagnostics_area *da= get_stmt_da(); Sql_condition *cond= NULL; DBUG_ENTER("THD::raise_condition"); + DBUG_ASSERT(level < Sql_condition::WARN_LEVEL_END); if (!(variables.option_bits & OPTION_SQL_NOTES) && (level == Sql_condition::WARN_LEVEL_NOTE)) @@ -1164,24 +1025,23 @@ Sql_condition* THD::raise_condition(uint sql_errno, push_warning and strict SQL_MODE case. */ level= Sql_condition::WARN_LEVEL_ERROR; - set_killed(KILL_BAD_DATA); } - switch (level) - { + if (handle_condition(sql_errno, sqlstate, &level, msg, &cond)) + DBUG_RETURN(cond); + + switch (level) { case Sql_condition::WARN_LEVEL_NOTE: case Sql_condition::WARN_LEVEL_WARN: got_warning= 1; break; case Sql_condition::WARN_LEVEL_ERROR: break; - default: - DBUG_ASSERT(FALSE); + case Sql_condition::WARN_LEVEL_END: + /* Impossible */ + break; } - if (handle_condition(sql_errno, sqlstate, level, msg, &cond)) - DBUG_RETURN(cond); - if (level == Sql_condition::WARN_LEVEL_ERROR) { mysql_audit_general(this, MYSQL_AUDIT_GENERAL_ERROR, sql_errno, msg); @@ -1211,13 +1071,13 @@ Sql_condition* THD::raise_condition(uint sql_errno, } extern "C" -void *thd_alloc(MYSQL_THD thd, unsigned int size) +void *thd_alloc(MYSQL_THD thd, size_t size) { return thd->alloc(size); } extern "C" -void *thd_calloc(MYSQL_THD thd, unsigned int size) +void *thd_calloc(MYSQL_THD thd, size_t size) { return thd->calloc(size); } @@ -1229,14 +1089,14 @@ char *thd_strdup(MYSQL_THD thd, const char *str) } extern "C" -char *thd_strmake(MYSQL_THD thd, const char *str, unsigned int size) +char *thd_strmake(MYSQL_THD thd, const char *str, size_t size) { return thd->strmake(str, size); } extern "C" LEX_STRING *thd_make_lex_string(THD *thd, LEX_STRING *lex_str, - const char *str, unsigned int size, + const char *str, size_t size, int allocate_lex_string) { return allocate_lex_string ? thd->make_lex_string(str, size) @@ -1244,7 +1104,7 @@ LEX_STRING *thd_make_lex_string(THD *thd, LEX_STRING *lex_str, } extern "C" -void *thd_memdup(MYSQL_THD thd, const void* str, unsigned int size) +void *thd_memdup(MYSQL_THD thd, const void* str, size_t size) { return thd->memdup(str, size); } @@ -1280,6 +1140,12 @@ extern "C" THD *_current_thd_noinline(void) { return my_pthread_getspecific_ptr(THD*,THR_THD); } + +extern "C" my_thread_id next_thread_id_noinline() +{ +#undef next_thread_id + return next_thread_id(); +} #endif /* @@ -1305,6 +1171,8 @@ void THD::init(void) mysql_mutex_unlock(&LOCK_global_system_variables); + user_time.val= start_time= start_time_sec_part= 0; + server_status= SERVER_STATUS_AUTOCOMMIT; if (variables.sql_mode & MODE_NO_BACKSLASH_ESCAPES) server_status|= SERVER_STATUS_NO_BACKSLASH_ESCAPES; @@ -1322,18 +1190,25 @@ void THD::init(void) TL_WRITE); tx_isolation= (enum_tx_isolation) variables.tx_isolation; tx_read_only= variables.tx_read_only; - update_charset(); + update_charset(); // plugin_thd_var() changed character sets reset_current_stmt_binlog_format_row(); reset_binlog_local_stmt_filter(); set_status_var_init(); bzero((char *) &org_status_var, sizeof(org_status_var)); - start_bytes_received= 0; - last_commit_gtid.seq_no= 0; status_in_global= 0; + start_bytes_received= 0; + m_last_commit_gtid.seq_no= 0; + last_stmt= NULL; + /* Reset status of last insert id */ + arg_of_last_insert_id_function= FALSE; + stmt_depends_on_first_successful_insert_id_in_prev_stmt= FALSE; + first_successful_insert_id_in_prev_stmt= 0; + first_successful_insert_id_in_prev_stmt_for_binlog= 0; + first_successful_insert_id_in_cur_stmt= 0; #ifdef WITH_WSREP wsrep_exec_mode= wsrep_applier ? REPL_RECV : LOCAL_STATE; wsrep_conflict_state= NO_CONFLICT; - wsrep_query_state= QUERY_IDLE; + wsrep_thd_set_query_state(this, QUERY_IDLE); wsrep_last_query_id= 0; wsrep_trx_meta.gtid= WSREP_GTID_UNDEFINED; wsrep_trx_meta.depends_on= WSREP_SEQNO_UNDEFINED; @@ -1349,6 +1224,7 @@ void THD::init(void) wsrep_affected_rows = 0; wsrep_replicate_GTID = false; wsrep_skip_wsrep_GTID = false; + wsrep_split_flag = false; #endif /* WITH_WSREP */ if (variables.sql_log_bin) @@ -1366,6 +1242,11 @@ void THD::init(void) /* Initialize the Debug Sync Facility. See debug_sync.cc. */ debug_sync_init_thread(this); #endif /* defined(ENABLED_DEBUG_SYNC) */ + +#ifndef EMBEDDED_LIBRARY + session_tracker.enable(this); +#endif //EMBEDDED_LIBRARY + apc_target.init(&LOCK_thd_data); DBUG_VOID_RETURN; } @@ -1451,12 +1332,19 @@ void THD::init_for_queries() void THD::change_user(void) { - add_status_to_global(); + if (!status_in_global) // Reset in init() + add_status_to_global(); - cleanup(); - reset_killed(); + if (!cleanup_done) + cleanup(); cleanup_done= 0; - status_in_global= 0; + reset_killed(); + thd_clear_errors(this); + + /* Clear warnings. */ + if (!get_stmt_da()->is_warning_info_empty()) + get_stmt_da()->clear_warning_info(0); + init(); stmt_map.reset(); my_hash_init(&user_vars, system_charset_info, USER_VARS_HASH_SIZE, 0, 0, @@ -1486,9 +1374,10 @@ void THD::cleanup(void) locked_tables_list.unlock_locked_tables(this); delete_dynamic(&user_var_events); - close_temporary_tables(this); + close_temporary_tables(); transaction.xid_state.xa_state= XA_NOTR; + transaction.xid_state.rm_error= 0; trans_rollback(this); xid_cache_delete(this, &transaction.xid_state); @@ -1520,6 +1409,8 @@ void THD::cleanup(void) my_hash_free(&user_vars); sp_cache_clear(&sp_proc_cache); sp_cache_clear(&sp_func_cache); + auto_inc_intervals_forced.empty(); + auto_inc_intervals_in_cur_stmt_for_binlog.empty(); mysql_ull_cleanup(this); stmt_map.reset(); @@ -1527,16 +1418,87 @@ void THD::cleanup(void) DBUG_ASSERT(!mdl_context.has_locks()); apc_target.destroy(); +#ifdef HAVE_REPLICATION + unregister_slave(this, true, true); +#endif + cleanup_done=1; DBUG_VOID_RETURN; } +/* + Free all connection related resources associated with a THD. + This is used when we put a thread into the thread cache. + After this call should either call ~THD or reset_for_reuse() depending on + circumstances. +*/ + +void THD::free_connection() +{ + DBUG_ASSERT(free_connection_done == 0); + my_free(db); + db= NULL; +#ifndef EMBEDDED_LIBRARY + if (net.vio) + vio_delete(net.vio); + net.vio= 0; + net_end(&net); +#endif + if (!cleanup_done) + cleanup(); + ha_close_connection(this); + plugin_thdvar_cleanup(this); + mysql_audit_free_thd(this); + main_security_ctx.destroy(); + /* close all prepared statements, to save memory */ + stmt_map.reset(); + free_connection_done= 1; +#if defined(ENABLED_PROFILING) + profiling.restart(); // Reset profiling +#endif +} + +/* + Reset thd for reuse by another connection + This is only used for user connections, so the following variables doesn't + have to be reset: + - Replication (slave) variables. + - Variables not reset between each statements. See reset_for_next_command. +*/ + +void THD::reset_for_reuse() +{ + mysql_audit_init_thd(this); + change_user(); // Calls cleanup() & init() + get_stmt_da()->reset_diagnostics_area(); + main_security_ctx.init(); + failed_com_change_user= 0; + is_fatal_error= 0; + client_capabilities= 0; + peer_port= 0; + query_name_consts= 0; // Safety + abort_on_warning= 0; + free_connection_done= 0; + m_command= COM_CONNECT; +#if defined(ENABLED_PROFILING) + profiling.reset(); +#endif +#ifdef SIGNAL_WITH_VIO_CLOSE + active_vio = 0; +#endif +} + + THD::~THD() { THD *orig_thd= current_thd; THD_CHECK_SENTRY(this); DBUG_ENTER("~THD()"); + /* Check that we have already called thd->unlink() */ + DBUG_ASSERT(prev == 0 && next == 0); + /* This takes a long time so we should not do this under LOCK_thread_count */ + mysql_mutex_assert_not_owner(&LOCK_thread_count); /* In error cases, thd may not be current thd. We have to fix this so @@ -1551,25 +1513,13 @@ THD::~THD() mysql_mutex_unlock(&LOCK_thd_data); #ifdef WITH_WSREP - if (wsrep_rgi) delete wsrep_rgi; + delete wsrep_rgi; #endif - /* Close connection */ -#ifndef EMBEDDED_LIBRARY - if (net.vio) - vio_delete(net.vio); - net_end(&net); -#endif - stmt_map.reset(); /* close all prepared statements */ - if (!cleanup_done) - cleanup(); + if (!free_connection_done) + free_connection(); mdl_context.destroy(); - ha_close_connection(this); - plugin_thdvar_cleanup(this); - main_security_ctx.destroy(); - my_free(db); - db= NULL; free_root(&transaction.mem_root,MYF(0)); mysql_cond_destroy(&COND_wakeup_ready); mysql_mutex_destroy(&LOCK_wakeup_ready); @@ -1590,11 +1540,9 @@ THD::~THD() rli_fake= NULL; } - mysql_audit_free_thd(this); if (rgi_slave) rgi_slave->cleanup_after_session(); my_free(semisync_info); - unregister_slave(this, true, true); #endif main_lex.free_set_stmt_mem_root(); free_root(&main_mem_root, MYF(0)); @@ -1606,14 +1554,22 @@ THD::~THD() lf_hash_put_pins(xid_hash_pins); /* Ensure everything is freed */ status_var.local_memory_used-= sizeof(THD); + + /* trick to make happy memory accounting system */ +#ifndef EMBEDDED_LIBRARY + session_tracker.deinit(); +#endif //EMBEDDED_LIBRARY + if (status_var.local_memory_used != 0) { DBUG_PRINT("error", ("memory_used: %lld", status_var.local_memory_used)); - SAFEMALLOC_REPORT_MEMORY(my_thread_dbug_id()); - DBUG_ASSERT(status_var.local_memory_used == 0); + SAFEMALLOC_REPORT_MEMORY(thread_id); + DBUG_ASSERT(status_var.local_memory_used == 0 || + !debug_assert_on_not_freed_memory); } update_global_memory_status(status_var.global_memory_used); set_current_thd(orig_thd == this ? 0 : orig_thd); + dec_thread_count(); DBUG_VOID_RETURN; } @@ -1628,10 +1584,9 @@ THD::~THD() NOTES This function assumes that all variables at start are long/ulong and - other types are handled explicitely + other types are handled explicitly */ - void add_to_status(STATUS_VAR *to_var, STATUS_VAR *from_var) { ulong *end= (ulong*) ((uchar*) to_var + @@ -1661,11 +1616,10 @@ void add_to_status(STATUS_VAR *to_var, STATUS_VAR *from_var) DBUG_PRINT("info", ("global memory_used: %lld size: %lld", (longlong) global_status_var.global_memory_used, (longlong) from_var->global_memory_used)); + update_global_memory_status(from_var->global_memory_used); } - // workaround for gcc 4.2.4-1ubuntu4 -fPIE (from DEB_BUILD_HARDENING=1) - int64 volatile * volatile ptr= &to_var->global_memory_used; - my_atomic_add64_explicit(ptr, from_var->global_memory_used, - MY_MEMORY_ORDER_RELAXED); + else + to_var->global_memory_used+= from_var->global_memory_used; } /* @@ -1679,7 +1633,7 @@ void add_to_status(STATUS_VAR *to_var, STATUS_VAR *from_var) NOTE This function assumes that all variables at start are long/ulong and - other types are handled explicitely + other types are handled explicitly */ void add_diff_to_status(STATUS_VAR *to_var, STATUS_VAR *from_var, @@ -1981,6 +1935,11 @@ int THD::killed_errno() case KILL_SERVER: case KILL_SERVER_HARD: DBUG_RETURN(ER_SERVER_SHUTDOWN); + case KILL_SLAVE_SAME_ID: + DBUG_RETURN(ER_SLAVE_SAME_ID); + case KILL_WAIT_TIMEOUT: + case KILL_WAIT_TIMEOUT_HARD: + DBUG_RETURN(ER_NET_READ_INTERRUPTED); } DBUG_RETURN(0); // Keep compiler happy } @@ -1988,7 +1947,7 @@ int THD::killed_errno() /* Remember the location of thread info, the structure needed for - sql_alloc() and the structure for the net buffer + the structure for the net buffer */ bool THD::store_globals() @@ -1999,8 +1958,7 @@ bool THD::store_globals() */ DBUG_ASSERT(thread_stack); - if (set_current_thd(this) || - my_pthread_setspecific_ptr(THR_MALLOC, &mem_root)) + if (set_current_thd(this)) return 1; /* mysys_var is concurrently readable by a killer thread. @@ -2016,7 +1974,16 @@ bool THD::store_globals() Let mysqld define the thread id (not mysys) This allows us to move THD to different threads if needed. */ - mysys_var->id= thread_id; + mysys_var->id= thread_id; + + /* thread_dbug_id should not change for a THD */ + if (!thread_dbug_id) + thread_dbug_id= mysys_var->dbug_id; + else + { + /* This only changes if we are using pool-of-threads */ + mysys_var->dbug_id= thread_dbug_id; + } #ifdef __NR_gettid os_thread_id= (uint32)syscall(__NR_gettid); #else @@ -2027,14 +1994,13 @@ bool THD::store_globals() STACK_DIRECTION * (long)my_thread_stack_size; if (net.vio) { - vio_set_thread_id(net.vio, real_id); net.thd= this; } /* We have to call thr_lock_info_init() again here as THD may have been created in another thread */ - thr_lock_info_init(&lock_info); + thr_lock_info_init(&lock_info, mysys_var); return 0; } @@ -2053,7 +2019,6 @@ void THD::reset_globals() /* Undocking the thread specific data. */ set_current_thd(0); - my_pthread_setspecific_ptr(THR_MALLOC, NULL); net.thd= 0; } @@ -2181,12 +2146,19 @@ bool THD::convert_string(LEX_STRING *to, CHARSET_INFO *to_cs, { DBUG_ENTER("THD::convert_string"); size_t new_length= to_cs->mbmaxlen * from_length; - uint dummy_errors; + uint errors; if (alloc_lex_string(to, new_length + 1)) DBUG_RETURN(true); // EOM to->length= copy_and_convert((char*) to->str, new_length, to_cs, - from, from_length, from_cs, &dummy_errors); + from, from_length, from_cs, &errors); to->str[to->length]= 0; // Safety + if (errors && lex->parse_vcol_expr) + { + my_error(ER_BAD_DATA, MYF(0), + ErrConvString(from, from_length, from_cs).ptr(), + to_cs->csname); + DBUG_RETURN(true); + } DBUG_RETURN(false); } @@ -2567,8 +2539,10 @@ struct Item_change_record: public ilink thd->mem_root (due to possible set_n_backup_active_arena called for thd). */ -void THD::nocheck_register_item_tree_change(Item **place, Item *old_value, - MEM_ROOT *runtime_memroot) +void +Item_change_list::nocheck_register_item_tree_change(Item **place, + Item *old_value, + MEM_ROOT *runtime_memroot) { Item_change_record *change; DBUG_ENTER("THD::nocheck_register_item_tree_change"); @@ -2609,8 +2583,10 @@ void THD::nocheck_register_item_tree_change(Item **place, Item *old_value, changes to substitute the same reference at both locations L1 and L2. */ -void THD::check_and_register_item_tree_change(Item **place, Item **new_value, - MEM_ROOT *runtime_memroot) +void +Item_change_list::check_and_register_item_tree_change(Item **place, + Item **new_value, + MEM_ROOT *runtime_memroot) { Item_change_record *change; DBUG_ENTER("THD::check_and_register_item_tree_change"); @@ -2629,7 +2605,7 @@ void THD::check_and_register_item_tree_change(Item **place, Item **new_value, } -void THD::rollback_item_tree_changes() +void Item_change_list::rollback_item_tree_changes() { DBUG_ENTER("THD::rollback_item_tree_changes"); I_List_iterator<Item_change_record> it(change_list); @@ -2751,13 +2727,6 @@ int select_send::send_data(List<Item> &items) if (thd->killed == ABORT_QUERY) DBUG_RETURN(FALSE); - /* - We may be passing the control from mysqld to the client: release the - InnoDB adaptive hash S-latch to avoid thread deadlocks if it was reserved - by thd - */ - ha_release_temporary_latches(thd); - protocol->prepare_for_resend(); if (protocol->send_result_set_row(&items)) { @@ -2777,13 +2746,6 @@ int select_send::send_data(List<Item> &items) bool select_send::send_eof() { /* - We may be passing the control from mysqld to the client: release the - InnoDB adaptive hash S-latch to avoid thread deadlocks if it was reserved - by thd - */ - ha_release_temporary_latches(thd); - - /* Don't send EOF if we're in error condition (which implies we've already sent or are sending an error) */ @@ -3070,6 +3032,10 @@ int select_export::send_data(List<Item> &items) error_pos= copier.most_important_error_pos(); if (error_pos) { + /* + TODO: + add new error message that will show user this printable_buff + char printable_buff[32]; convert_to_printable(printable_buff, sizeof(printable_buff), error_pos, res->ptr() + res->length() - error_pos, @@ -3079,6 +3045,11 @@ int select_export::send_data(List<Item> &items) ER_THD(thd, ER_TRUNCATED_WRONG_VALUE_FOR_FIELD), "string", printable_buff, item->name, static_cast<long>(row_count)); + */ + push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN, + ER_TRUNCATED_WRONG_VALUE_FOR_FIELD, + ER_THD(thd, WARN_DATA_TRUNCATED), + item->name, static_cast<long>(row_count)); } else if (copier.source_end_pos() < res->ptr() + res->length()) { @@ -3186,7 +3157,7 @@ int select_export::send_data(List<Item> &items) if ((NEED_ESCAPING(*pos) || (check_second_byte && - my_mbcharlen(character_set_client, (uchar) *pos) == 2 && + ((uchar) *pos) > 0x7F /* a potential MB2HEAD */ && pos + 1 < end && NEED_ESCAPING(pos[1]))) && /* @@ -3519,12 +3490,12 @@ void Query_arena::free_items() { Item *next; DBUG_ENTER("Query_arena::free_items"); - /* This works because items are allocated with sql_alloc() */ + /* This works because items are allocated on THD::mem_root */ for (; free_list; free_list= next) { next= free_list->next; DBUG_ASSERT(free_list != next); - DBUG_PRINT("info", ("free item: 0x%lx", (ulong) free_list)); + DBUG_PRINT("info", ("free item: %p", free_list)); free_list->delete_self(); } /* Postcondition: free_list is 0 */ @@ -3972,7 +3943,7 @@ int select_materialize_with_stats::send_data(List<Item> &items) void TMP_TABLE_PARAM::init() { DBUG_ENTER("TMP_TABLE_PARAM::init"); - DBUG_PRINT("enter", ("this: 0x%lx", (ulong)this)); + DBUG_PRINT("enter", ("this: %p", this)); field_count= sum_func_count= func_count= hidden_field_count= 0; group_parts= group_length= group_null_parts= 0; quick_group= 1; @@ -4039,7 +4010,7 @@ void Security_context::destroy() // If not pointer to constant if (host != my_localhost) { - my_free(host); + my_free((char*) host); host= NULL; } if (user != delayed_user) @@ -4204,7 +4175,8 @@ void THD::restore_backup_open_tables_state(Open_tables_backup *backup) Before we will throw away current open tables state we want to be sure that it was properly cleaned up. */ - DBUG_ASSERT(open_tables == 0 && temporary_tables == 0 && + DBUG_ASSERT(open_tables == 0 && + temporary_tables == 0 && derived_tables == 0 && lock == 0 && locked_tables_mode == LTM_NONE && @@ -4310,7 +4282,7 @@ extern "C" void thd_progress_init(MYSQL_THD thd, uint max_stage) is a high level command (like ALTER TABLE) and we are not in a stored procedure */ - thd->progress.report= ((thd->client_capabilities & CLIENT_PROGRESS) && + thd->progress.report= ((thd->client_capabilities & MARIADB_CLIENT_PROGRESS) && thd->progress.report_to_client && !thd->in_sub_stmt); thd->progress.next_report_time= 0; @@ -4432,31 +4404,149 @@ extern "C" void thd_create_random_password(MYSQL_THD thd, #ifdef INNODB_COMPATIBILITY_HOOKS -extern "C" const struct charset_info_st *thd_charset(MYSQL_THD thd) + +/** open a table and add it to thd->open_tables + + @note At the moment this is used in innodb background purge threads + *only*.There should be no table locks, because the background purge does not + change the table as far as LOCK TABLES is concerned. MDL locks are + still needed, though. + + To make sure no table stays open for long, this helper allows the thread to + have only one table open at any given time. +*/ +TABLE *open_purge_table(THD *thd, const char *db, size_t dblen, + const char *tb, size_t tblen) { - return(thd->charset()); + DBUG_ENTER("open_purge_table"); + DBUG_ASSERT(thd->open_tables == NULL); + DBUG_ASSERT(thd->locked_tables_mode < LTM_PRELOCKED); + + Open_table_context ot_ctx(thd, 0); + TABLE_LIST *tl= (TABLE_LIST*)thd->alloc(sizeof(TABLE_LIST)); + + tl->init_one_table(db, dblen, tb, tblen, tb, TL_READ); + tl->i_s_requested_object= OPEN_TABLE_ONLY; + + bool error= open_table(thd, tl, &ot_ctx); + + /* we don't recover here */ + DBUG_ASSERT(!error || !ot_ctx.can_recover_from_failed_open()); + + if (error) + close_thread_tables(thd); + + DBUG_RETURN(error ? NULL : tl->table); } -/** - OBSOLETE : there's no way to ensure the string is null terminated. - Use thd_query_string instead() + +/** Find an open table in the list of prelocked tabled + + Used for foreign key actions, for example, in UPDATE t1 SET a=1; + where a child table t2 has a KB on t1.a. + + But only when virtual columns are involved, otherwise InnoDB + does not need an open TABLE. */ -extern "C" char **thd_query(MYSQL_THD thd) +TABLE *find_fk_open_table(THD *thd, const char *db, size_t db_len, + const char *table, size_t table_len) +{ + for (TABLE *t= thd->open_tables; t; t= t->next) + { + if (t->s->db.length == db_len && t->s->table_name.length == table_len && + !strcmp(t->s->db.str, db) && !strcmp(t->s->table_name.str, table) && + t->pos_in_table_list->prelocking_placeholder == TABLE_LIST::FK) + return t; + } + return NULL; +} + +/* the following three functions are used in background purge threads */ + +MYSQL_THD create_thd() +{ + THD *thd= new THD(next_thread_id()); + thd->thread_stack= (char*) &thd; + thd->store_globals(); + thd->set_command(COM_DAEMON); + thd->system_thread= SYSTEM_THREAD_GENERIC; + thd->security_ctx->host_or_ip=""; + add_to_active_threads(thd); + return thd; +} + +void destroy_thd(MYSQL_THD thd) +{ + thd->add_status_to_global(); + unlink_not_visible_thd(thd); + delete thd; + dec_thread_running(); +} + +void reset_thd(MYSQL_THD thd) +{ + close_thread_tables(thd); + thd->mdl_context.release_transactional_locks(); + thd->free_items(); + free_root(thd->mem_root, MYF(MY_KEEP_PREALLOC)); +} + +unsigned long long thd_get_query_id(const MYSQL_THD thd) +{ + return((unsigned long long)thd->query_id); +} + +void thd_clear_error(MYSQL_THD thd) +{ + thd->clear_error(); +} + +extern "C" const struct charset_info_st *thd_charset(MYSQL_THD thd) { - return (&thd->query_string.string.str); + return(thd->charset()); } + /** Get the current query string for the thread. + This function is not thread safe and can be used only by thd owner thread. + @param The MySQL internal thread pointer @return query string and length. May be non-null-terminated. */ extern "C" LEX_STRING * thd_query_string (MYSQL_THD thd) { + DBUG_ASSERT(thd == current_thd); return(&thd->query_string.string); } + +/** + Get the current query string for the thread. + + @param thd The MySQL internal thread pointer + @param buf Buffer where the query string will be copied + @param buflen Length of the buffer + + @return Length of the query + + @note This function is thread safe as the query string is + accessed under mutex protection and the string is copied + into the provided buffer. @see thd_query_string(). +*/ + +extern "C" size_t thd_query_safe(MYSQL_THD thd, char *buf, size_t buflen) +{ + mysql_mutex_lock(&thd->LOCK_thd_data); + size_t len= MY_MIN(buflen - 1, thd->query_length()); + memcpy(buf, thd->query(), len); + mysql_mutex_unlock(&thd->LOCK_thd_data); + buf[len]= '\0'; + return len; +} + + extern "C" int thd_slave_thread(const MYSQL_THD thd) { return(thd->slave_thread); @@ -4478,19 +4568,20 @@ extern "C" unsigned long long thd_start_utime(const MYSQL_THD thd) /* - This function can optionally be called to check if thd_report_wait_for() + This function can optionally be called to check if thd_rpl_deadlock_check() needs to be called for waits done by a given transaction. - If this function returns false for a given thd, there is no need to do any - calls to thd_report_wait_for() on that thd. + If this function returns false for a given thd, there is no need to do + any calls to thd_rpl_deadlock_check() on that thd. - This call is optional; it is safe to call thd_report_wait_for() in any case. - This call can be used to save some redundant calls to thd_report_wait_for() - if desired. (This is unlikely to matter much unless there are _lots_ of - waits to report, as the overhead of thd_report_wait_for() is small). + This call is optional; it is safe to call thd_rpl_deadlock_check() in + any case. This call can be used to save some redundant calls to + thd_rpl_deadlock_check() if desired. (This is unlikely to matter much + unless there are _lots_ of waits to report, as the overhead of + thd_rpl_deadlock_check() is small). */ extern "C" int -thd_need_wait_for(const MYSQL_THD thd) +thd_need_wait_reports(const MYSQL_THD thd) { rpl_group_info *rgi; @@ -4505,75 +4596,9 @@ thd_need_wait_for(const MYSQL_THD thd) } /* - Used by InnoDB/XtraDB to report that one transaction THD is about to go to - wait for a transactional lock held by another transactions OTHER_THD. - - This is used for parallel replication, where transactions are required to - commit in the same order on the slave as they did on the master. If the - transactions on the slave encounters lock conflicts on the slave that did - not exist on the master, this can cause deadlocks. - - Normally, such conflicts will not occur, because the same conflict would - have prevented the two transactions from committing in parallel on the - master, thus preventing them from running in parallel on the slave in the - first place. However, it is possible in case when the optimizer chooses a - different plan on the slave than on the master (eg. table scan instead of - index scan). - - InnoDB/XtraDB reports lock waits using this call. If a lock wait causes a - deadlock with the pre-determined commit order, we kill the later transaction, - and later re-try it, to resolve the deadlock. - - This call need only receive reports about waits for locks that will remain - until the holding transaction commits. InnoDB/XtraDB auto-increment locks - are released earlier, and so need not be reported. (Such false positives are - not harmful, but could lead to unnecessary kill and retry, so best avoided). -*/ -extern "C" void -thd_report_wait_for(MYSQL_THD thd, MYSQL_THD other_thd) -{ - rpl_group_info *rgi; - rpl_group_info *other_rgi; - - if (!thd) - return; - DEBUG_SYNC(thd, "thd_report_wait_for"); - thd->transaction.stmt.mark_trans_did_wait(); - if (!other_thd) - return; - binlog_report_wait_for(thd, other_thd); - rgi= thd->rgi_slave; - other_rgi= other_thd->rgi_slave; - if (!rgi || !other_rgi) - return; - if (!rgi->is_parallel_exec) - return; - if (rgi->rli != other_rgi->rli) - return; - if (!rgi->gtid_sub_id || !other_rgi->gtid_sub_id) - return; - if (rgi->current_gtid.domain_id != other_rgi->current_gtid.domain_id) - return; - if (rgi->gtid_sub_id > other_rgi->gtid_sub_id) - return; - /* - This transaction is about to wait for another transaction that is required - by replication binlog order to commit after. This would cause a deadlock. - - So send a kill to the other transaction, with a temporary error; this will - cause replication to rollback (and later re-try) the other transaction, - releasing the lock for this transaction so replication can proceed. - */ - other_rgi->killed_for_retry= rpl_group_info::RETRY_KILL_KILLED; - mysql_mutex_lock(&other_thd->LOCK_thd_data); - other_thd->awake(KILL_CONNECTION); - mysql_mutex_unlock(&other_thd->LOCK_thd_data); -} - -/* - Used by storage engines (currently TokuDB) to report that one transaction - THD is about to go to wait for a transactional lock held by another - transactions OTHER_THD. + Used by storage engines (currently TokuDB and InnoDB/XtraDB) to report that + one transaction THD is about to go to wait for a transactional lock held by + another transactions OTHER_THD. This is used for parallel replication, where transactions are required to commit in the same order on the slave as they did on the master. If the @@ -4588,9 +4613,9 @@ thd_report_wait_for(MYSQL_THD thd, MYSQL_THD other_thd) chooses a different plan on the slave than on the master (eg. table scan instead of index scan). - InnoDB/XtraDB reports lock waits using this call. If a lock wait causes a - deadlock with the pre-determined commit order, we kill the later transaction, - and later re-try it, to resolve the deadlock. + Storage engines report lock waits using this call. If a lock wait causes a + deadlock with the pre-determined commit order, we kill the later + transaction, and later re-try it, to resolve the deadlock. This call need only receive reports about waits for locks that will remain until the holding transaction commits. InnoDB/XtraDB auto-increment locks, @@ -4681,8 +4706,8 @@ thd_rpl_deadlock_check(MYSQL_THD thd, MYSQL_THD other_thd) Calling this function is just an optimisation to avoid unnecessary deadlocks. If it was not used, a gap lock would be set that could eventually - cause a deadlock; the deadlock would be caught by thd_report_wait_for() and - the transaction T2 killed and rolled back (and later re-tried). + cause a deadlock; the deadlock would be caught by thd_rpl_deadlock_check() + and the transaction T2 killed and rolled back (and later re-tried). */ extern "C" int thd_need_ordering_with(const MYSQL_THD thd, const MYSQL_THD other_thd) @@ -5161,9 +5186,9 @@ void THD::inc_status_sort_range() void THD::inc_status_sort_rows(ha_rows count) { - statistic_add(status_var.filesort_rows_, count, &LOCK_status); + statistic_add(status_var.filesort_rows_, (ulong)count, &LOCK_status); #ifdef HAVE_PSI_STATEMENT_INTERFACE - PSI_STATEMENT_CALL(inc_statement_sort_rows)(m_statement_psi, count); + PSI_STATEMENT_CALL(inc_statement_sort_rows)(m_statement_psi, (ulong)count); #endif } @@ -5534,99 +5559,6 @@ int xid_cache_iterate(THD *thd, my_hash_walk_action action, void *arg) &argument); } -/* - Tells if two (or more) tables have auto_increment columns and we want to - lock those tables with a write lock. - - SYNOPSIS - has_two_write_locked_tables_with_auto_increment - tables Table list - - NOTES: - Call this function only when you have established the list of all tables - which you'll want to update (including stored functions, triggers, views - inside your statement). - - Ignore tables prelocked for foreign key cascading actions, as these - actions cannot generate new auto_increment values. -*/ - -static bool -has_write_table_with_auto_increment(TABLE_LIST *tables) -{ - for (TABLE_LIST *table= tables; table; table= table->next_global) - { - /* we must do preliminary checks as table->table may be NULL */ - if (!table->placeholder() && - table->prelocking_placeholder != TABLE_LIST::FK && - table->table->found_next_number_field && - (table->lock_type >= TL_WRITE_ALLOW_WRITE)) - return 1; - } - - return 0; -} - -/* - checks if we have select tables in the table list and write tables - with auto-increment column. - - SYNOPSIS - has_two_write_locked_tables_with_auto_increment_and_select - tables Table list - - RETURN VALUES - - -true if the table list has atleast one table with auto-increment column - - - and atleast one table to select from. - -false otherwise -*/ - -static bool -has_write_table_with_auto_increment_and_select(TABLE_LIST *tables) -{ - bool has_select= false; - bool has_auto_increment_tables = has_write_table_with_auto_increment(tables); - for(TABLE_LIST *table= tables; table; table= table->next_global) - { - if (!table->placeholder() && - table->lock_type <= TL_READ_NO_INSERT && - table->prelocking_placeholder != TABLE_LIST::FK) - { - has_select= true; - break; - } - } - return(has_select && has_auto_increment_tables); -} - -/* - Tells if there is a table whose auto_increment column is a part - of a compound primary key while is not the first column in - the table definition. - - @param tables Table list - - @return true if the table exists, fais if does not. -*/ - -static bool -has_write_table_auto_increment_not_first_in_pk(TABLE_LIST *tables) -{ - for (TABLE_LIST *table= tables; table; table= table->next_global) - { - /* we must do preliminary checks as table->table may be NULL */ - if (!table->placeholder() && - table->table->found_next_number_field && - (table->lock_type >= TL_WRITE_ALLOW_WRITE) - && table->table->s->next_number_keypart != 0) - return 1; - } - - return 0; -} /** Decide on logging format to use for the statement and issue errors @@ -5743,6 +5675,17 @@ int THD::decide_logging_format(TABLE_LIST *tables) !(wsrep_binlog_format() == BINLOG_FORMAT_STMT && !binlog_filter->db_ok(db))) { + + if (is_bulk_op()) + { + if (wsrep_binlog_format() == BINLOG_FORMAT_STMT) + { + my_error(ER_BINLOG_NON_SUPPORTED_BULK, MYF(0)); + DBUG_PRINT("info", + ("decision: no logging since an error was generated")); + DBUG_RETURN(-1); + } + } /* Compute one bit field with the union of all the engine capabilities, and one with the intersection of all the engine @@ -5757,17 +5700,25 @@ int THD::decide_logging_format(TABLE_LIST *tables) If different types of engines are about to be updated. For example: Innodb and Falcon; Innodb and MyIsam. */ - my_bool multi_write_engine= FALSE; + bool multi_write_engine= FALSE; /* If different types of engines are about to be accessed and any of them is about to be updated. For example: Innodb and Falcon; Innodb and MyIsam. */ - my_bool multi_access_engine= FALSE; + bool multi_access_engine= FALSE; /* Identifies if a table is changed. */ - my_bool is_write= FALSE; + bool is_write= FALSE; // If any write tables + bool has_read_tables= FALSE; // If any read only tables + bool has_auto_increment_write_tables= FALSE; // Write with auto-increment + /* If a write table that doesn't have auto increment part first */ + bool has_write_table_auto_increment_not_first_in_pk= FALSE; + bool has_auto_increment_write_tables_not_first= FALSE; + bool found_first_not_own_table= FALSE; + bool has_write_tables_with_unsafe_statements= FALSE; + /* A pointer to a previous table that was changed. */ @@ -5813,31 +5764,6 @@ int THD::decide_logging_format(TABLE_LIST *tables) } #endif - if (wsrep_binlog_format() != BINLOG_FORMAT_ROW && tables) - { - /* - DML statements that modify a table with an auto_increment column based on - rows selected from a table are unsafe as the order in which the rows are - fetched fron the select tables cannot be determined and may differ on - master and slave. - */ - if (has_write_table_with_auto_increment_and_select(tables)) - lex->set_stmt_unsafe(LEX::BINLOG_STMT_UNSAFE_WRITE_AUTOINC_SELECT); - - if (has_write_table_auto_increment_not_first_in_pk(tables)) - lex->set_stmt_unsafe(LEX::BINLOG_STMT_UNSAFE_AUTOINC_NOT_FIRST); - - /* - A query that modifies autoinc column in sub-statement can make the - master and slave inconsistent. - We can solve these problems in mixed mode by switching to binlogging - if at least one updated table is used by sub-statement - */ - if (lex->requires_prelocking() && - has_write_table_with_auto_increment(lex->first_not_own_table())) - lex->set_stmt_unsafe(LEX::BINLOG_STMT_UNSAFE_AUTOINC_COLUMNS); - } - /* Get the capabilities vector for all involved storage engines and mask out the flags for the binary log. @@ -5876,16 +5802,35 @@ int THD::decide_logging_format(TABLE_LIST *tables) continue; } } + if (table == lex->first_not_own_table()) + found_first_not_own_table= true; replicated_tables_count++; + if (table->prelocking_placeholder != TABLE_LIST::FK) + { + if (table->lock_type <= TL_READ_NO_INSERT) + has_read_tables= true; + else if (table->table->found_next_number_field && + (table->lock_type >= TL_WRITE_ALLOW_WRITE)) + { + has_auto_increment_write_tables= true; + has_auto_increment_write_tables_not_first= found_first_not_own_table; + if (table->table->s->next_number_keypart != 0) + has_write_table_auto_increment_not_first_in_pk= true; + } + } + if (table->lock_type >= TL_WRITE_ALLOW_WRITE) { + bool trans; if (prev_write_table && prev_write_table->file->ht != table->table->file->ht) multi_write_engine= TRUE; + if (table->table->s->non_determinstic_insert) + has_write_tables_with_unsafe_statements= true; - my_bool trans= table->table->file->has_transactions(); + trans= table->table->file->has_transactions(); if (table->table->s->tmp_table) lex->set_stmt_accessed_table(trans ? LEX::STMT_WRITES_TEMP_TRANS_TABLE : @@ -5923,6 +5868,34 @@ int THD::decide_logging_format(TABLE_LIST *tables) prev_access_table= table->table; } + if (wsrep_binlog_format() != BINLOG_FORMAT_ROW) + { + /* + DML statements that modify a table with an auto_increment + column based on rows selected from a table are unsafe as the + order in which the rows are fetched fron the select tables + cannot be determined and may differ on master and slave. + */ + if (has_auto_increment_write_tables && has_read_tables) + lex->set_stmt_unsafe(LEX::BINLOG_STMT_UNSAFE_WRITE_AUTOINC_SELECT); + + if (has_write_table_auto_increment_not_first_in_pk) + lex->set_stmt_unsafe(LEX::BINLOG_STMT_UNSAFE_AUTOINC_NOT_FIRST); + + if (has_write_tables_with_unsafe_statements) + lex->set_stmt_unsafe(LEX::BINLOG_STMT_UNSAFE_SYSTEM_FUNCTION); + + /* + A query that modifies autoinc column in sub-statement can make the + master and slave inconsistent. + We can solve these problems in mixed mode by switching to binlogging + if at least one updated table is used by sub-statement + */ + if (lex->requires_prelocking() && + has_auto_increment_write_tables_not_first) + lex->set_stmt_unsafe(LEX::BINLOG_STMT_UNSAFE_AUTOINC_COLUMNS); + } + DBUG_PRINT("info", ("flags_write_all_set: 0x%llx", flags_write_all_set)); DBUG_PRINT("info", ("flags_write_some_set: 0x%llx", flags_write_some_set)); DBUG_PRINT("info", ("flags_access_some_set: 0x%llx", flags_access_some_set)); @@ -5976,7 +5949,7 @@ int THD::decide_logging_format(TABLE_LIST *tables) */ my_error((error= ER_BINLOG_ROW_INJECTION_AND_STMT_ENGINE), MYF(0)); } - else if (wsrep_binlog_format() == BINLOG_FORMAT_ROW && + else if ((wsrep_binlog_format() == BINLOG_FORMAT_ROW || is_bulk_op()) && sqlcom_can_generate_row_events(this)) { /* @@ -6049,7 +6022,8 @@ int THD::decide_logging_format(TABLE_LIST *tables) else { if (lex->is_stmt_unsafe() || lex->is_stmt_row_injection() - || (flags_write_all_set & HA_BINLOG_STMT_CAPABLE) == 0) + || (flags_write_all_set & HA_BINLOG_STMT_CAPABLE) == 0 || + is_bulk_op()) { /* log in row format! */ set_current_stmt_binlog_format_row_if_mixed(); @@ -6420,7 +6394,8 @@ int THD::binlog_write_row(TABLE* table, bool is_trans, Pack records into format for transfer. We are allocating more memory than needed, but that doesn't matter. */ - Row_data_memory memory(table, max_row_length(table, record)); + Row_data_memory memory(table, max_row_length(table, table->rpl_write_set, + record)); if (!memory.has_memory()) return HA_ERR_OUT_OF_MEM; @@ -6432,7 +6407,14 @@ int THD::binlog_write_row(TABLE* table, bool is_trans, if (variables.option_bits & OPTION_GTID_BEGIN) is_trans= 1; - Rows_log_event* const ev= + Rows_log_event* ev; + if (binlog_should_compress(len)) + ev = + binlog_prepare_pending_rows_event(table, variables.server_id, + len, is_trans, + static_cast<Write_rows_compressed_log_event*>(0)); + else + ev = binlog_prepare_pending_rows_event(table, variables.server_id, len, is_trans, static_cast<Write_rows_log_event*>(0)); @@ -6466,8 +6448,10 @@ int THD::binlog_update_row(TABLE* table, bool is_trans, */ binlog_prepare_row_images(table); - size_t const before_maxlen = max_row_length(table, before_record); - size_t const after_maxlen = max_row_length(table, after_record); + size_t const before_maxlen= max_row_length(table, table->read_set, + before_record); + size_t const after_maxlen= max_row_length(table, table->rpl_write_set, + after_record); Row_data_memory row_data(table, before_maxlen, after_maxlen); if (!row_data.has_memory()) @@ -6496,8 +6480,15 @@ int THD::binlog_update_row(TABLE* table, bool is_trans, DBUG_DUMP("after_row", after_row, after_size); #endif - Rows_log_event* const ev= - binlog_prepare_pending_rows_event(table, variables.server_id, + Rows_log_event* ev; + if(binlog_should_compress(before_size + after_size)) + ev = + binlog_prepare_pending_rows_event(table, variables.server_id, + before_size + after_size, is_trans, + static_cast<Update_rows_compressed_log_event*>(0)); + else + ev = + binlog_prepare_pending_rows_event(table, variables.server_id, before_size + after_size, is_trans, static_cast<Update_rows_log_event*>(0)); @@ -6539,7 +6530,8 @@ int THD::binlog_delete_row(TABLE* table, bool is_trans, Pack records into format for transfer. We are allocating more memory than needed, but that doesn't matter. */ - Row_data_memory memory(table, max_row_length(table, record)); + Row_data_memory memory(table, max_row_length(table, table->read_set, + record)); if (unlikely(!memory.has_memory())) return HA_ERR_OUT_OF_MEM; @@ -6552,8 +6544,15 @@ int THD::binlog_delete_row(TABLE* table, bool is_trans, if (variables.option_bits & OPTION_GTID_BEGIN) is_trans= 1; - Rows_log_event* const ev= - binlog_prepare_pending_rows_event(table, variables.server_id, + Rows_log_event* ev; + if(binlog_should_compress(len)) + ev = + binlog_prepare_pending_rows_event(table, variables.server_id, + len, is_trans, + static_cast<Delete_rows_compressed_log_event*>(0)); + else + ev = + binlog_prepare_pending_rows_event(table, variables.server_id, len, is_trans, static_cast<Delete_rows_log_event*>(0)); @@ -6571,15 +6570,17 @@ int THD::binlog_delete_row(TABLE* table, bool is_trans, } +/** + Remove from read_set spurious columns. The write_set has been + handled before in table->mark_columns_needed_for_update. +*/ + void THD::binlog_prepare_row_images(TABLE *table) { DBUG_ENTER("THD::binlog_prepare_row_images"); - /** - Remove from read_set spurious columns. The write_set has been - handled before in table->mark_columns_needed_for_update. - */ - DBUG_PRINT_BITSET("debug", "table->read_set (before preparing): %s", table->read_set); + DBUG_PRINT_BITSET("debug", "table->read_set (before preparing): %s", + table->read_set); THD *thd= table->in_use; /** @@ -6597,21 +6598,19 @@ void THD::binlog_prepare_row_images(TABLE *table) */ DBUG_ASSERT(table->read_set != &table->tmp_set); - bitmap_clear_all(&table->tmp_set); - - switch(thd->variables.binlog_row_image) + switch (thd->variables.binlog_row_image) { case BINLOG_ROW_IMAGE_MINIMAL: /* MINIMAL: Mark only PK */ - table->mark_columns_used_by_index_no_reset(table->s->primary_key, - &table->tmp_set); + table->mark_columns_used_by_index(table->s->primary_key, + &table->tmp_set); break; case BINLOG_ROW_IMAGE_NOBLOB: /** NOBLOB: Remove unnecessary BLOB fields from read_set (the ones that are not part of PK). */ - bitmap_union(&table->tmp_set, table->read_set); + bitmap_copy(&table->tmp_set, table->read_set); for (Field **ptr=table->field ; *ptr ; ptr++) { Field *field= (*ptr); @@ -6629,7 +6628,8 @@ void THD::binlog_prepare_row_images(TABLE *table) table->write_set); } - DBUG_PRINT_BITSET("debug", "table->read_set (after preparing): %s", table->read_set); + DBUG_PRINT_BITSET("debug", "table->read_set (after preparing): %s", + table->read_set); DBUG_VOID_RETURN; } @@ -7018,15 +7018,27 @@ int THD::binlog_query(THD::enum_binlog_query_type qtype, char const *query_arg, flush the pending rows event if necessary. */ { - Query_log_event qinfo(this, query_arg, query_len, is_trans, direct, - suppress_use, errcode); + int error = 0; + /* Binlog table maps will be irrelevant after a Query_log_event (they are just removed on the slave side) so after the query log event is written to the binary log, we pretend that no table maps were written. - */ - int error= mysql_bin_log.write(&qinfo); + */ + if(binlog_should_compress(query_len)) + { + Query_compressed_log_event qinfo(this, query_arg, query_len, is_trans, direct, + suppress_use, errcode); + error= mysql_bin_log.write(&qinfo); + } + else + { + Query_log_event qinfo(this, query_arg, query_len, is_trans, direct, + suppress_use, errcode); + error= mysql_bin_log.write(&qinfo); + } + binlog_table_maps= 0; DBUG_RETURN(error); } @@ -7056,33 +7068,22 @@ THD::signal_wakeup_ready() mysql_cond_signal(&COND_wakeup_ready); } - -void THD::rgi_lock_temporary_tables() +void THD::set_last_commit_gtid(rpl_gtid >id) { - mysql_mutex_lock(&rgi_slave->rli->data_lock); - temporary_tables= rgi_slave->rli->save_temporary_tables; -} - -void THD::rgi_unlock_temporary_tables(bool clear) -{ - rgi_slave->rli->save_temporary_tables= temporary_tables; - mysql_mutex_unlock(&rgi_slave->rli->data_lock); - if (clear) +#ifndef EMBEDDED_LIBRARY + bool changed_gtid= (m_last_commit_gtid.seq_no != gtid.seq_no); +#endif + m_last_commit_gtid= gtid; +#ifndef EMBEDDED_LIBRARY + if (changed_gtid && + session_tracker.get_tracker(SESSION_SYSVARS_TRACKER)->is_enabled()) { - /* - Temporary tables are shared with other by sql execution threads. - As a safety messure, clear the pointer to the common area. - */ - temporary_tables= 0; - } -} - -bool THD::rgi_have_temporary_tables() -{ - return rgi_slave->rli->save_temporary_tables != 0; + session_tracker.get_tracker(SESSION_SYSVARS_TRACKER)-> + mark_as_changed(this, (LEX_CSTRING*)Sys_last_gtid_ptr); + } +#endif } - void wait_for_commit::reinit() { |