diff options
Diffstat (limited to 'sql/sql_class.cc')
-rw-r--r-- | sql/sql_class.cc | 772 |
1 files changed, 555 insertions, 217 deletions
diff --git a/sql/sql_class.cc b/sql/sql_class.cc index 28bf77c94e8..883e9c688ff 100644 --- a/sql/sql_class.cc +++ b/sql/sql_class.cc @@ -35,7 +35,7 @@ #include "sql_base.h" // close_thread_tables #include "sql_time.h" // date_time_format_copy #include "tztime.h" // MYSQL_TIME <-> my_time_t -#include "sql_acl.h" // NO_ACCESS, +#include "sql_acl.h" // NO_ACL, // acl_getroot_no_password #include "sql_base.h" #include "sql_handler.h" // mysql_ha_cleanup @@ -71,6 +71,7 @@ static inline bool wsrep_is_bf_aborted(THD* thd) { return false; } #endif /* WITH_WSREP */ #include "opt_trace.h" +#include <mysql/psi/mysql_transaction.h> #ifdef HAVE_SYS_SYSCALL_H #include <sys/syscall.h> @@ -128,6 +129,39 @@ bool Key_part_spec::operator==(const Key_part_spec& other) const &other.field_name); } + +bool Key_part_spec::check_key_for_blob(const handler *file) const +{ + if (!(file->ha_table_flags() & HA_CAN_INDEX_BLOBS)) + { + my_error(ER_BLOB_USED_AS_KEY, MYF(0), field_name.str, file->table_type()); + return true; + } + return false; +} + + +bool Key_part_spec::check_key_length_for_blob() const +{ + if (!length) + { + my_error(ER_BLOB_KEY_WITHOUT_LENGTH, MYF(0), field_name.str); + return true; + } + return false; +} + + +bool Key_part_spec::init_multiple_key_for_blob(const handler *file) +{ + if (check_key_for_blob(file)) + return true; + if (!length) + length= file->max_key_length() + 1; + return false; +} + + /** Construct an (almost) deep copy of this key. Only those elements that are known to never change are not copied. @@ -141,7 +175,8 @@ Key::Key(const Key &rhs, MEM_ROOT *mem_root) columns(rhs.columns, mem_root), name(rhs.name), option_list(rhs.option_list), - generated(rhs.generated), invisible(false) + generated(rhs.generated), invisible(false), + without_overlaps(rhs.without_overlaps), period(rhs.period) { list_copy_and_replace_each_value(columns, mem_root); } @@ -155,6 +190,7 @@ Key::Key(const Key &rhs, MEM_ROOT *mem_root) Foreign_key::Foreign_key(const Foreign_key &rhs, MEM_ROOT *mem_root) :Key(rhs,mem_root), + constraint_name(rhs.constraint_name), ref_db(rhs.ref_db), ref_table(rhs.ref_table), ref_columns(rhs.ref_columns,mem_root), @@ -300,6 +336,12 @@ THD *thd_get_current_thd() } +extern "C" unsigned long long thd_query_id(const MYSQL_THD thd) +{ + return((unsigned long long)thd->query_id); +} + + /** Get thread attributes for connection threads @@ -408,12 +450,6 @@ void thd_exit_cond(MYSQL_THD thd, const PSI_stage_info *stage, } extern "C" -void **thd_ha_data(const THD *thd, const struct handlerton *hton) -{ - return (void **) &thd->ha_data[hton->slot].ha_ptr; -} - -extern "C" void thd_storage_lock_wait(THD *thd, long long value) { thd->utime_after_lock+= value; @@ -425,7 +461,7 @@ void thd_storage_lock_wait(THD *thd, long long value) extern "C" void *thd_get_ha_data(const THD *thd, const struct handlerton *hton) { - return *thd_ha_data(thd, hton); + return thd->ha_data[hton->slot].ha_ptr; } @@ -438,7 +474,9 @@ void thd_set_ha_data(THD *thd, const struct handlerton *hton, const void *ha_data) { plugin_ref *lock= &thd->ha_data[hton->slot].lock; - DBUG_ASSERT(thd == current_thd); + mysql_mutex_lock(&thd->LOCK_thd_data); + thd->ha_data[hton->slot].ha_ptr= const_cast<void*>(ha_data); + mysql_mutex_unlock(&thd->LOCK_thd_data); if (ha_data && !*lock) *lock= ha_lock_engine(NULL, (handlerton*) hton); else if (!ha_data && *lock) @@ -446,9 +484,6 @@ void thd_set_ha_data(THD *thd, const struct handlerton *hton, plugin_unlock(NULL, *lock); *lock= NULL; } - mysql_mutex_lock(&thd->LOCK_thd_data); - *thd_ha_data(thd, hton)= (void*) ha_data; - mysql_mutex_unlock(&thd->LOCK_thd_data); } @@ -595,19 +630,20 @@ THD::THD(my_thread_id id, bool is_wsrep_applier) :Statement(&main_lex, &main_mem_root, STMT_CONVENTIONAL_EXECUTION, /* statement id */ 0), rli_fake(0), rgi_fake(0), rgi_slave(NULL), - protocol_text(this), protocol_binary(this), - m_current_stage_key(0), + protocol_text(this), protocol_binary(this), initial_status_var(0), + m_current_stage_key(0), m_psi(0), in_sub_stmt(0), log_all_errors(0), binlog_unsafe_warning_flags(0), current_stmt_binlog_format(BINLOG_FORMAT_MIXED), - binlog_table_maps(0), bulk_param(0), table_map_for_update(0), m_examined_row_count(0), accessed_rows_and_keys(0), m_digest(NULL), m_statement_psi(NULL), + m_transaction_psi(NULL), m_idle_psi(NULL), + col_access(NO_ACL), thread_id(id), thread_dbug_id(id), os_thread_id(0), @@ -661,9 +697,10 @@ THD::THD(my_thread_id id, bool is_wsrep_applier) wsrep_apply_format(0), wsrep_rbr_buf(NULL), wsrep_sync_wait_gtid(WSREP_GTID_UNDEFINED), + wsrep_last_written_gtid_seqno(0), + wsrep_current_gtid_seqno(0), wsrep_affected_rows(0), wsrep_has_ignored_error(false), - wsrep_replicate_GTID(false), wsrep_ignore_table(false), wsrep_aborter(0), @@ -706,8 +743,9 @@ THD::THD(my_thread_id id, bool is_wsrep_applier) the destructor works OK in case of an error. The main_mem_root will be re-initialized in init_for_queries(). */ - init_sql_alloc(&main_mem_root, "THD::main_mem_root", - ALLOC_ROOT_MIN_BLOCK_SIZE, 0, MYF(MY_THREAD_SPECIFIC)); + init_sql_alloc(key_memory_thd_main_mem_root, + &main_mem_root, ALLOC_ROOT_MIN_BLOCK_SIZE, 0, + MYF(MY_THREAD_SPECIFIC)); /* Allocation of user variables for binary logging is always done with main @@ -719,9 +757,7 @@ THD::THD(my_thread_id id, bool is_wsrep_applier) thread_stack= 0; scheduler= thread_scheduler; // Will be fixed later event_scheduler.data= 0; - event_scheduler.m_psi= 0; skip_wait_timeout= false; - extra_port= 0; catalog= (char*)"std"; // the only catalog we have for now main_security_ctx.init(); security_ctx= &main_security_ctx; @@ -731,7 +767,6 @@ THD::THD(my_thread_id id, bool is_wsrep_applier) count_cuted_fields= CHECK_FIELD_IGNORE; killed= NOT_KILLED; killed_err= 0; - col_access=0; is_slave_error= thread_specific_used= FALSE; my_hash_clear(&handler_tables_hash); my_hash_clear(&ull_hash); @@ -761,6 +796,7 @@ THD::THD(my_thread_id id, bool is_wsrep_applier) bzero((void*) ha_data, sizeof(ha_data)); mysys_var=0; binlog_evt_union.do_union= FALSE; + binlog_table_maps= FALSE; enable_slow_log= 0; durability_property= HA_REGULAR_DURABILITY; @@ -775,12 +811,14 @@ THD::THD(my_thread_id id, bool is_wsrep_applier) system_thread= NON_SYSTEM_THREAD; cleanup_done= free_connection_done= abort_on_warning= got_warning= 0; peer_port= 0; // For SHOW PROCESSLIST - transaction.m_pending_rows_event= 0; - transaction.on= 1; - wt_thd_lazy_init(&transaction.wt, &variables.wt_deadlock_search_depth_short, - &variables.wt_timeout_short, - &variables.wt_deadlock_search_depth_long, - &variables.wt_timeout_long); + transaction= &default_transaction; + transaction->m_pending_rows_event= 0; + transaction->on= 1; + wt_thd_lazy_init(&transaction->wt, + &variables.wt_deadlock_search_depth_short, + &variables.wt_timeout_short, + &variables.wt_deadlock_search_depth_long, + &variables.wt_timeout_long); #ifdef SIGNAL_WITH_VIO_CLOSE active_vio = 0; #endif @@ -805,16 +843,18 @@ THD::THD(my_thread_id id, bool is_wsrep_applier) reset_open_tables_state(this); init(); + debug_sync_init_thread(this); #if defined(ENABLED_PROFILING) profiling.set_thd(this); #endif user_connect=(USER_CONN *)0; - my_hash_init(&user_vars, system_charset_info, USER_VARS_HASH_SIZE, 0, 0, - (my_hash_get_key) get_var_key, + my_hash_init(key_memory_user_var_entry, &user_vars, system_charset_info, + USER_VARS_HASH_SIZE, 0, 0, (my_hash_get_key) get_var_key, (my_hash_free_key) free_user_var, HASH_THREAD_SPECIFIC); - my_hash_init(&sequences, system_charset_info, SEQUENCES_HASH_SIZE, 0, 0, - (my_hash_get_key) get_sequence_last_key, - (my_hash_free_key) free_sequence_last, HASH_THREAD_SPECIFIC); + my_hash_init(PSI_INSTRUMENT_ME, &sequences, system_charset_info, + SEQUENCES_HASH_SIZE, 0, 0, (my_hash_get_key) + get_sequence_last_key, (my_hash_free_key) free_sequence_last, + HASH_THREAD_SPECIFIC); sp_proc_cache= NULL; sp_func_cache= NULL; @@ -823,7 +863,7 @@ THD::THD(my_thread_id id, bool is_wsrep_applier) /* For user vars replication*/ if (opt_bin_log) - my_init_dynamic_array(&user_var_events, + my_init_dynamic_array(key_memory_user_var_entry, &user_var_events, sizeof(BINLOG_USER_VAR_EVENT *), 16, 16, MYF(0)); else bzero((char*) &user_var_events, sizeof(user_var_events)); @@ -851,7 +891,8 @@ THD::THD(my_thread_id id, bool is_wsrep_applier) m_token_array= NULL; if (max_digest_length > 0) { - m_token_array= (unsigned char*) my_malloc(max_digest_length, + m_token_array= (unsigned char*) my_malloc(PSI_INSTRUMENT_ME, + max_digest_length, MYF(MY_WME|MY_THREAD_SPECIFIC)); } @@ -1151,19 +1192,9 @@ void *thd_memdup(MYSQL_THD thd, const void* str, size_t size) extern "C" void thd_get_xid(const MYSQL_THD thd, MYSQL_XID *xid) { -#ifdef WITH_WSREP - if (!thd->wsrep_xid.is_null()) - { - *xid = *(MYSQL_XID *) &thd->wsrep_xid; - return; - } -#endif /* WITH_WSREP */ - *xid= thd->transaction.xid_state.is_explicit_XA() ? - *(MYSQL_XID *) thd->transaction.xid_state.get_xid() : - *(MYSQL_XID *) &thd->transaction.implicit_xid; + *xid = *(MYSQL_XID *) thd->get_xid(); } - extern "C" my_time_t thd_TIME_to_gmt_sec(MYSQL_THD thd, const MYSQL_TIME *ltime, unsigned int *errcode) @@ -1184,11 +1215,6 @@ void thd_gmt_sec_to_TIME(MYSQL_THD thd, MYSQL_TIME *ltime, my_time_t t) #ifdef _WIN32 -extern "C" THD *_current_thd_noinline(void) -{ - return my_pthread_getspecific_ptr(THD*,THR_THD); -} - extern "C" my_thread_id next_thread_id_noinline() { #undef next_thread_id @@ -1235,10 +1261,10 @@ void THD::init() if (variables.sql_mode & MODE_ANSI_QUOTES) server_status|= SERVER_STATUS_ANSI_QUOTES; - transaction.all.modified_non_trans_table= - transaction.stmt.modified_non_trans_table= FALSE; - transaction.all.m_unsafe_rollback_flags= - transaction.stmt.m_unsafe_rollback_flags= 0; + transaction->all.modified_non_trans_table= + transaction->stmt.modified_non_trans_table= FALSE; + transaction->all.m_unsafe_rollback_flags= + transaction->stmt.m_unsafe_rollback_flags= 0; open_options=ha_open_options; update_lock_default= (variables.low_priority_updates ? @@ -1279,7 +1305,6 @@ void THD::init() wsrep_rbr_buf = NULL; wsrep_affected_rows = 0; m_wsrep_next_trx_id = WSREP_UNDEFINED_TRX_ID; - wsrep_replicate_GTID = false; wsrep_aborter = 0; wsrep_desynced_backup_stage= false; #endif /* WITH_WSREP */ @@ -1289,22 +1314,16 @@ void THD::init() else variables.option_bits&= ~OPTION_BIN_LOG; - variables.sql_log_bin_off= 0; - select_commands= update_commands= other_commands= 0; /* Set to handle counting of aborted connections */ userstat_running= opt_userstat_running; last_global_update_time= current_connect_time= time(NULL); -#if defined(ENABLED_DEBUG_SYNC) - /* Initialize the Debug Sync Facility. See debug_sync.cc. */ - debug_sync_init_thread(this); -#endif /* defined(ENABLED_DEBUG_SYNC) */ - #ifndef EMBEDDED_LIBRARY session_tracker.enable(this); #endif //EMBEDDED_LIBRARY apc_target.init(&LOCK_thd_kill); + gap_tracker_data.init(); DBUG_VOID_RETURN; } @@ -1374,21 +1393,18 @@ void THD::update_all_stats() void THD::init_for_queries() { - set_time(); - /* - We don't need to call ha_enable_transaction() as we can't have - any active transactions that has to be committed - */ - DBUG_ASSERT(transaction.is_empty()); - transaction.on= TRUE; + DBUG_ASSERT(transaction->on); + DBUG_ASSERT(m_transaction_psi == NULL); + /* Set time for --init-file queries */ + set_time(); reset_root_defaults(mem_root, variables.query_alloc_block_size, variables.query_prealloc_size); - reset_root_defaults(&transaction.mem_root, + reset_root_defaults(&transaction->mem_root, variables.trans_alloc_block_size, variables.trans_prealloc_size); - DBUG_ASSERT(!transaction.xid_state.is_explicit_XA()); - DBUG_ASSERT(transaction.implicit_xid.is_null()); + DBUG_ASSERT(!transaction->xid_state.is_explicit_XA()); + DBUG_ASSERT(transaction->implicit_xid.is_null()); } @@ -1423,12 +1439,13 @@ void THD::change_user(void) init(); stmt_map.reset(); - my_hash_init(&user_vars, system_charset_info, USER_VARS_HASH_SIZE, 0, 0, - (my_hash_get_key) get_var_key, - (my_hash_free_key) free_user_var, 0); - my_hash_init(&sequences, system_charset_info, SEQUENCES_HASH_SIZE, 0, 0, - (my_hash_get_key) get_sequence_last_key, - (my_hash_free_key) free_sequence_last, HASH_THREAD_SPECIFIC); + my_hash_init(key_memory_user_var_entry, &user_vars, system_charset_info, + USER_VARS_HASH_SIZE, 0, 0, (my_hash_get_key) get_var_key, + (my_hash_free_key) free_user_var, HASH_THREAD_SPECIFIC); + my_hash_init(key_memory_user_var_entry, &sequences, system_charset_info, + SEQUENCES_HASH_SIZE, 0, 0, (my_hash_get_key) + get_sequence_last_key, (my_hash_free_key) free_sequence_last, + HASH_THREAD_SPECIFIC); sp_cache_clear(&sp_proc_cache); sp_cache_clear(&sp_func_cache); sp_cache_clear(&sp_package_spec_cache); @@ -1465,7 +1482,8 @@ bool THD::set_db(const LEX_CSTRING *new_db) const char *tmp= NULL; if (new_db->str) { - if (!(tmp= my_strndup(new_db->str, new_db->length, MYF(MY_WME | ME_FATAL)))) + if (!(tmp= my_strndup(key_memory_THD_db, new_db->str, new_db->length, + MYF(MY_WME | ME_FATAL)))) result= 1; } @@ -1526,12 +1544,14 @@ void THD::cleanup(void) delete_dynamic(&user_var_events); close_temporary_tables(); - if (transaction.xid_state.is_explicit_XA()) + if (transaction->xid_state.is_explicit_XA()) trans_xa_detach(this); else trans_rollback(this); DBUG_ASSERT(open_tables == NULL); + DBUG_ASSERT(m_transaction_psi == NULL); + /* If the thread was in the middle of an ongoing transaction (rolled back a few lines above) or under LOCK TABLES (unlocked the tables @@ -1552,12 +1572,7 @@ void THD::cleanup(void) decrease_user_connections(user_connect); user_connect= 0; // Safety } - wt_thd_destroy(&transaction.wt); - -#if defined(ENABLED_DEBUG_SYNC) - /* End the Debug Sync Facility. See debug_sync.cc. */ - debug_sync_end_thread(this); -#endif /* defined(ENABLED_DEBUG_SYNC) */ + wt_thd_destroy(&transaction->wt); my_hash_free(&user_vars); my_hash_free(&sequences); @@ -1612,6 +1627,7 @@ void THD::free_connection() #if defined(ENABLED_PROFILING) profiling.restart(); // Reset profiling #endif + debug_sync_reset_thread(this); } /* @@ -1636,7 +1652,7 @@ void THD::reset_for_reuse() abort_on_warning= 0; free_connection_done= 0; m_command= COM_CONNECT; - transaction.on= 1; + transaction->on= 1; #if defined(ENABLED_PROFILING) profiling.reset(); #endif @@ -1656,6 +1672,8 @@ THD::~THD() DBUG_ENTER("~THD()"); /* Make sure threads are not available via server_threads. */ assert_not_linked(); + if (m_psi) + PSI_CALL_set_thread_THD(m_psi, 0); /* In error cases, thd may not be current thd. We have to fix this so @@ -1684,7 +1702,7 @@ THD::~THD() #endif mdl_context.destroy(); - free_root(&transaction.mem_root,MYF(0)); + transaction->free(); mysql_cond_destroy(&COND_wakeup_ready); mysql_mutex_destroy(&LOCK_wakeup_ready); mysql_mutex_destroy(&LOCK_thd_data); @@ -1716,12 +1734,16 @@ THD::~THD() lf_hash_put_pins(tdc_hash_pins); if (xid_hash_pins) lf_hash_put_pins(xid_hash_pins); + debug_sync_end_thread(this); /* Ensure everything is freed */ status_var.local_memory_used-= sizeof(THD); /* trick to make happy memory accounting system */ #ifndef EMBEDDED_LIBRARY session_tracker.sysvars.deinit(); +#ifdef USER_VAR_TRACKING + session_tracker.user_variables.deinit(); +#endif // USER_VAR_TRACKING #endif //EMBEDDED_LIBRARY if (status_var.local_memory_used != 0) @@ -2136,7 +2158,7 @@ void THD::reset_killed() the structure for the net buffer */ -bool THD::store_globals() +void THD::store_globals() { /* Assert that thread_stack is initialized: it's necessary to be able @@ -2144,8 +2166,7 @@ bool THD::store_globals() */ DBUG_ASSERT(thread_stack); - if (set_current_thd(this)) - return 1; + set_current_thd(this); /* mysys_var is concurrently readable by a killer thread. It is protected by LOCK_thd_kill, it is not needed to lock while the @@ -2187,8 +2208,6 @@ bool THD::store_globals() created in another thread */ thr_lock_info_init(&lock_info, mysys_var); - - return 0; } /** @@ -2480,7 +2499,8 @@ bool THD::to_ident_sys_alloc(Lex_ident_sys_st *to, const Lex_ident_cli_st *ident Item_basic_constant * -THD::make_string_literal(const char *str, size_t length, uint repertoire) +THD::make_string_literal(const char *str, size_t length, + my_repertoire_t repertoire) { if (!length && (variables.sql_mode & MODE_EMPTY_STRING_IS_NULL)) return new (mem_root) Item_null(this, 0, variables.collation_connection); @@ -2521,8 +2541,7 @@ THD::make_string_literal_charset(const Lex_string_with_metadata_st &str, { if (!str.length && (variables.sql_mode & MODE_EMPTY_STRING_IS_NULL)) return new (mem_root) Item_null(this, 0, cs); - return new (mem_root) Item_string_with_introducer(this, - str.str, (uint)str.length, cs); + return new (mem_root) Item_string_with_introducer(this, str, cs); } @@ -2580,7 +2599,8 @@ void THD::add_changed_table(TABLE *table) { DBUG_ENTER("THD::add_changed_table(table)"); - DBUG_ASSERT(in_multi_stmt_transaction_mode() && table->file->has_transactions()); + DBUG_ASSERT(in_multi_stmt_transaction_mode() && + table->file->has_transactions()); add_changed_table(table->s->table_cache_key.str, (long) table->s->table_cache_key.length); DBUG_VOID_RETURN; @@ -2590,8 +2610,8 @@ void THD::add_changed_table(TABLE *table) void THD::add_changed_table(const char *key, size_t key_length) { DBUG_ENTER("THD::add_changed_table(key)"); - CHANGED_TABLE_LIST **prev_changed = &transaction.changed_tables; - CHANGED_TABLE_LIST *curr = transaction.changed_tables; + CHANGED_TABLE_LIST **prev_changed = &transaction->changed_tables; + CHANGED_TABLE_LIST *curr = transaction->changed_tables; for (; curr; prev_changed = &(curr->next), curr = curr->next) { @@ -3043,15 +3063,6 @@ int select_send::send_data(List<Item> &items) Protocol *protocol= thd->protocol; DBUG_ENTER("select_send::send_data"); - /* unit is not set when using 'delete ... returning' */ - if (unit && unit->offset_limit_cnt) - { // using limit offset,count - unit->offset_limit_cnt--; - DBUG_RETURN(FALSE); - } - if (thd->killed == ABORT_QUERY) - DBUG_RETURN(FALSE); - protocol->prepare_for_resend(); if (protocol->send_result_set_row(&items)) { @@ -3314,13 +3325,6 @@ int select_export::send_data(List<Item> &items) String tmp(buff,sizeof(buff),&my_charset_bin),*res; tmp.length(0); - if (unit->offset_limit_cnt) - { // using limit offset,count - unit->offset_limit_cnt--; - DBUG_RETURN(0); - } - if (thd->killed == ABORT_QUERY) - DBUG_RETURN(0); row_count++; Item *item; uint used_length=0,items_left=items.elements; @@ -3438,7 +3442,7 @@ int select_export::send_data(List<Item> &items) pos++) { #ifdef USE_MB - if (use_mb(res_charset)) + if (res_charset->use_mb()) { int l; if ((l=my_ismbchar(res_charset, pos, end))) @@ -3574,14 +3578,6 @@ int select_dump::send_data(List<Item> &items) Item *item; DBUG_ENTER("select_dump::send_data"); - if (unit->offset_limit_cnt) - { // using limit offset,count - unit->offset_limit_cnt--; - DBUG_RETURN(0); - } - if (thd->killed == ABORT_QUERY) - DBUG_RETURN(0); - if (row_count++ > 1) { my_message(ER_TOO_MANY_ROWS, ER_THD(thd, ER_TOO_MANY_ROWS), MYF(0)); @@ -3617,13 +3613,6 @@ int select_singlerow_subselect::send_data(List<Item> &items) MYF(current_thd->lex->ignore ? ME_WARNING : 0)); DBUG_RETURN(1); } - if (unit->offset_limit_cnt) - { // Using limit offset,count - unit->offset_limit_cnt--; - DBUG_RETURN(0); - } - if (thd->killed == ABORT_QUERY) - DBUG_RETURN(0); List_iterator_fast<Item> li(items); Item *val_item; for (uint i= 0; (val_item= li++); i++) @@ -3674,7 +3663,7 @@ int select_max_min_finder_subselect::send_data(List<Item> &items) break; case ROW_RESULT: case TIME_RESULT: - // This case should never be choosen + // This case should never be chosen DBUG_ASSERT(0); op= 0; } @@ -3758,13 +3747,6 @@ int select_exists_subselect::send_data(List<Item> &items) { DBUG_ENTER("select_exists_subselect::send_data"); Item_exists_subselect *it= (Item_exists_subselect *)item; - if (unit->offset_limit_cnt) - { // Using limit offset,count - unit->offset_limit_cnt--; - DBUG_RETURN(0); - } - if (thd->killed == ABORT_QUERY) - DBUG_RETURN(0); it->value= 1; it->assigned(1); DBUG_RETURN(0); @@ -3993,12 +3975,12 @@ Statement_map::Statement_map() : START_STMT_HASH_SIZE = 16, START_NAME_HASH_SIZE = 16 }; - my_hash_init(&st_hash, &my_charset_bin, START_STMT_HASH_SIZE, 0, 0, - get_statement_id_as_hash_key, + my_hash_init(key_memory_prepared_statement_map, &st_hash, &my_charset_bin, + START_STMT_HASH_SIZE, 0, 0, get_statement_id_as_hash_key, delete_statement_as_hash_key, MYF(0)); - my_hash_init(&names_hash, system_charset_info, START_NAME_HASH_SIZE, 0, 0, + my_hash_init(key_memory_prepared_statement_map, &names_hash, system_charset_info, START_NAME_HASH_SIZE, 0, 0, (my_hash_get_key) get_stmt_name_hash_key, - NULL,MYF(0)); + NULL, MYF(0)); } @@ -4166,12 +4148,7 @@ int select_dumpvar::send_data(List<Item> &items) { DBUG_ENTER("select_dumpvar::send_data"); - if (unit->offset_limit_cnt) - { // using limit offset,count - unit->offset_limit_cnt--; - DBUG_RETURN(0); - } - if (row_count++) + if (row_count++) { my_message(ER_TOO_MANY_ROWS, ER_THD(thd, ER_TOO_MANY_ROWS), MYF(0)); DBUG_RETURN(1); @@ -4361,10 +4338,10 @@ void Security_context::init() host= user= ip= external_user= 0; host_or_ip= "connecting host"; priv_user[0]= priv_host[0]= proxy_user[0]= priv_role[0]= '\0'; - master_access= 0; + master_access= NO_ACL; password_expired= false; #ifndef NO_EMBEDDED_ACCESS_CHECKS - db_access= NO_ACCESS; + db_access= NO_ACL; #endif } @@ -4399,7 +4376,7 @@ void Security_context::skip_grants() { /* privileges for the user are unknown everything is allowed */ host_or_ip= (char *)""; - master_access= ~NO_ACCESS; + master_access= ALL_KNOWN_ACL; *priv_user= *priv_host= '\0'; password_expired= false; } @@ -4407,15 +4384,16 @@ void Security_context::skip_grants() bool Security_context::set_user(char *user_arg) { - my_free((char*) user); - user= my_strdup(user_arg, MYF(0)); + my_free(const_cast<char*>(user)); + user= my_strdup(key_memory_MPVIO_EXT_auth_info, user_arg, MYF(0)); return user == 0; } -bool Security_context::check_access(ulong want_access, bool match_any) +bool Security_context::check_access(const privilege_t want_access, + bool match_any) { DBUG_ENTER("Security_context::check_access"); - DBUG_RETURN((match_any ? (master_access & want_access) + DBUG_RETURN((match_any ? (master_access & want_access) != NO_ACL : ((master_access & want_access) == want_access))); } @@ -4682,7 +4660,13 @@ extern "C" void thd_progress_report(MYSQL_THD thd, return; if (thd->progress.max_counter != max_progress) // Simple optimization { - mysql_mutex_lock(&thd->LOCK_thd_data); + /* + Better to not wait in the unlikely event that LOCK_thd_data is locked + as Galera can potentially have this locked for a long time. + Progress counters will fix themselves after the next call. + */ + if (mysql_mutex_trylock(&thd->LOCK_thd_data)) + return; thd->progress.counter= progress; thd->progress.max_counter= max_progress; mysql_mutex_unlock(&thd->LOCK_thd_data); @@ -4783,6 +4767,32 @@ extern "C" void thd_create_random_password(MYSQL_THD thd, } +extern "C" const char *thd_priv_host(MYSQL_THD thd, size_t *length) +{ + const Security_context *sctx= thd->security_ctx; + if (!sctx) + { + *length= 0; + return NULL; + } + *length= strlen(sctx->priv_host); + return sctx->priv_host; +} + + +extern "C" const char *thd_priv_user(MYSQL_THD thd, size_t *length) +{ + const Security_context *sctx= thd->security_ctx; + if (!sctx) + { + *length= 0; + return NULL; + } + *length= strlen(sctx->priv_user); + return sctx->priv_user; +} + + #ifdef INNODB_COMPATIBILITY_HOOKS /** open a table and add it to thd->open_tables @@ -4802,7 +4812,8 @@ TABLE *open_purge_table(THD *thd, const char *db, size_t dblen, DBUG_ASSERT(thd->open_tables == NULL); DBUG_ASSERT(thd->locked_tables_mode < LTM_PRELOCKED); - Open_table_context ot_ctx(thd, MYSQL_OPEN_IGNORE_FLUSH); + /* Purge already hold the MDL for the table */ + Open_table_context ot_ctx(thd, MYSQL_OPEN_HAS_MDL_LOCK); TABLE_LIST *tl= (TABLE_LIST*)thd->alloc(sizeof(TABLE_LIST)); LEX_CSTRING db_name= {db, dblen }; LEX_CSTRING table_name= { tb, tblen }; @@ -4821,6 +4832,12 @@ TABLE *open_purge_table(THD *thd, const char *db, size_t dblen, DBUG_RETURN(error ? NULL : tl->table); } +TABLE *get_purge_table(THD *thd) +{ + /* see above, at most one table can be opened */ + DBUG_ASSERT(thd->open_tables == NULL || thd->open_tables->next == NULL); + return thd->open_tables; +} /** Find an open table in the list of prelocked tabled @@ -4864,6 +4881,121 @@ void destroy_thd(MYSQL_THD thd) delete thd; } +/** + Create a THD that only has auxilliary functions + It will never be added to the global connection list + server_threads. It does not represent any client connection. + + It should never be counted, because it will stall the + shutdown. It is solely for engine's internal use, + like for example, evaluation of virtual function in innodb + purge. +*/ +extern "C" pthread_key(struct st_my_thread_var *, THR_KEY_mysys); +MYSQL_THD create_background_thd() +{ + auto save_thd = current_thd; + set_current_thd(nullptr); + + auto save_mysysvar= pthread_getspecific(THR_KEY_mysys); + + /* + Allocate new mysys_var specifically new THD, + so that e.g safemalloc, DBUG etc are happy. + */ + pthread_setspecific(THR_KEY_mysys, 0); + my_thread_init(); + auto thd_mysysvar= pthread_getspecific(THR_KEY_mysys); + auto thd= new THD(0); + pthread_setspecific(THR_KEY_mysys, save_mysysvar); + thd->set_psi(nullptr); + set_current_thd(save_thd); + + /* + Workaround the adverse effect of incrementing thread_count + in THD constructor. We do not want these THDs to be counted, + or waited for on shutdown. + */ + THD_count::count--; + + thd->mysys_var= (st_my_thread_var *) thd_mysysvar; + thd->set_command(COM_DAEMON); + thd->system_thread= SYSTEM_THREAD_GENERIC; + thd->security_ctx->host_or_ip= ""; + thd->real_id= 0; + thd->thread_id= 0; + thd->query_id= 0; + return thd; +} + + +/* + Attach a background THD. + + Changes current value THR_KEY_mysys TLS variable, + and returns the original value. +*/ +void *thd_attach_thd(MYSQL_THD thd) +{ + DBUG_ASSERT(!current_thd); + DBUG_ASSERT(thd && thd->mysys_var); + + auto save_mysysvar= pthread_getspecific(THR_KEY_mysys); + pthread_setspecific(THR_KEY_mysys, thd->mysys_var); + thd->thread_stack= (char *) &thd; + thd->store_globals(); + return save_mysysvar; +} + +/* + Restore THR_KEY_mysys TLS variable, + which was changed thd_attach_thd(). +*/ +void thd_detach_thd(void *mysysvar) +{ + /* Restore mysys_var that is changed when THD was attached.*/ + pthread_setspecific(THR_KEY_mysys, mysysvar); + /* Restore the THD (we assume it was NULL during attach).*/ + set_current_thd(0); +} + +/* + Destroy a THD that was previously created by + create_background_thd() +*/ +void destroy_background_thd(MYSQL_THD thd) +{ + DBUG_ASSERT(!current_thd); + auto thd_mysys_var= thd->mysys_var; + auto save_mysys_var= thd_attach_thd(thd); + DBUG_ASSERT(thd_mysys_var != save_mysys_var); + /* + Workaround the adverse effect decrementing thread_count on THD() + destructor. + As we decremented it in create_background_thd(), in order for it + not to go negative, we have to increment it before destructor. + */ + THD_count::count++; + delete thd; + + thd_detach_thd(save_mysys_var); + /* + Delete THD-specific my_thread_var, that was + allocated in create_background_thd(). + Also preserve current PSI context, since my_thread_end() + would kill it, if we're not careful. + */ +#ifdef HAVE_PSI_THREAD_INTERFACE + auto save_psi_thread= PSI_CALL_get_thread(); +#endif + PSI_CALL_set_thread(0); + pthread_setspecific(THR_KEY_mysys, thd_mysys_var); + my_thread_end(); + pthread_setspecific(THR_KEY_mysys, save_mysys_var); + PSI_CALL_set_thread(save_psi_thread); +} + + void reset_thd(MYSQL_THD thd) { close_thread_tables(thd); @@ -4939,6 +5071,55 @@ extern "C" size_t thd_query_safe(MYSQL_THD thd, char *buf, size_t buflen) } +extern "C" const char *thd_user_name(MYSQL_THD thd) +{ + if (!thd->security_ctx) + return 0; + + return thd->security_ctx->user; +} + + +extern "C" const char *thd_client_host(MYSQL_THD thd) +{ + if (!thd->security_ctx) + return 0; + + return thd->security_ctx->host; +} + + +extern "C" const char *thd_client_ip(MYSQL_THD thd) +{ + if (!thd->security_ctx) + return 0; + + return thd->security_ctx->ip; +} + + +extern "C" LEX_CSTRING *thd_current_db(MYSQL_THD thd) +{ + return &thd->db; +} + + +extern "C" int thd_current_status(MYSQL_THD thd) +{ + Diagnostics_area *da= thd->get_stmt_da(); + if (!da) + return 0; + + return da->is_error() ? da->sql_errno() : 0; +} + + +extern "C" enum enum_server_command thd_current_command(MYSQL_THD thd) +{ + return thd->get_command(); +} + + extern "C" int thd_slave_thread(const MYSQL_THD thd) { return(thd->slave_thread); @@ -5026,7 +5207,7 @@ thd_rpl_deadlock_check(MYSQL_THD thd, MYSQL_THD other_thd) if (!thd) return 0; DEBUG_SYNC(thd, "thd_report_wait_for"); - thd->transaction.stmt.mark_trans_did_wait(); + thd->transaction->stmt.mark_trans_did_wait(); if (!other_thd) return 0; binlog_report_wait_for(thd, other_thd); @@ -5141,7 +5322,7 @@ thd_need_ordering_with(const MYSQL_THD thd, const MYSQL_THD other_thd) extern "C" int thd_non_transactional_update(const MYSQL_THD thd) { - return(thd->transaction.all.modified_non_trans_table); + return(thd->transaction->all.modified_non_trans_table); } extern "C" int thd_binlog_format(const MYSQL_THD thd) @@ -5287,6 +5468,18 @@ extern "C" void thd_wait_end(MYSQL_THD thd) #endif // INNODB_COMPATIBILITY_HOOKS */ + +/** + MDL_context accessor + @param thd the current session + @return pointer to thd->mdl_context +*/ +extern "C" void *thd_mdl_context(MYSQL_THD thd) +{ + return &thd->mdl_context; +} + + /**************************************************************************** Handling of statement states in functions and triggers. @@ -5338,7 +5531,7 @@ void THD::reset_sub_statement_state(Sub_statement_state *backup, backup->limit_found_rows= limit_found_rows; backup->cuted_fields= cuted_fields; backup->client_capabilities= client_capabilities; - backup->savepoints= transaction.savepoints; + backup->savepoints= transaction->savepoints; backup->first_successful_insert_id_in_prev_stmt= first_successful_insert_id_in_prev_stmt; backup->first_successful_insert_id_in_cur_stmt= @@ -5360,7 +5553,7 @@ void THD::reset_sub_statement_state(Sub_statement_state *backup, client_capabilities &= ~CLIENT_MULTI_RESULTS; in_sub_stmt|= new_state; cuted_fields= 0; - transaction.savepoints= 0; + transaction->savepoints= 0; first_successful_insert_id_in_cur_stmt= 0; reset_slow_query_state(); } @@ -5386,16 +5579,16 @@ void THD::restore_sub_statement_state(Sub_statement_state *backup) level. It is enough to release first savepoint set on this level since all later savepoints will be released automatically. */ - if (transaction.savepoints) + if (transaction->savepoints) { SAVEPOINT *sv; - for (sv= transaction.savepoints; sv->prev; sv= sv->prev) + for (sv= transaction->savepoints; sv->prev; sv= sv->prev) {} /* ha_release_savepoint() never returns error. */ (void)ha_release_savepoint(this, sv); } count_cuted_fields= backup->count_cuted_fields; - transaction.savepoints= backup->savepoints; + transaction->savepoints= backup->savepoints; variables.option_bits= backup->option_bits; in_sub_stmt= backup->in_sub_stmt; enable_slow_log= backup->enable_slow_log; @@ -5717,6 +5910,96 @@ void THD::mark_transaction_to_rollback(bool all) /** + Commit the whole transaction (both statment and all) + + This is used mainly to commit an independent transaction, + like reading system tables. + + @return 0 0k + @return <>0 error code. my_error() has been called() +*/ + +int THD::commit_whole_transaction_and_close_tables() +{ + int error, error2; + DBUG_ENTER("THD::commit_whole_transaction_and_close_tables"); + + /* + This can only happened if we failed to open any table in the + new transaction + */ + DBUG_ASSERT(open_tables); + + if (!open_tables) // Safety for production usage + DBUG_RETURN(0); + + /* + Ensure table was locked (opened with open_and_lock_tables()). If not + the THD can't be part of any transactions and doesn't have to call + this function. + */ + DBUG_ASSERT(lock); + + error= ha_commit_trans(this, FALSE); + /* This will call external_lock to unlock all tables */ + if ((error2= mysql_unlock_tables(this, lock))) + { + my_error(ER_ERROR_DURING_COMMIT, MYF(0), error2); + error= error2; + } + lock= 0; + if ((error2= ha_commit_trans(this, TRUE))) + error= error2; + close_thread_tables(this); + DBUG_RETURN(error); +} + +/** + Start a new independent transaction +*/ + +start_new_trans::start_new_trans(THD *thd) +{ + org_thd= thd; + mdl_savepoint= thd->mdl_context.mdl_savepoint(); + memcpy(old_ha_data, thd->ha_data, sizeof(old_ha_data)); + thd->reset_n_backup_open_tables_state(&open_tables_state_backup); + for (auto &data : thd->ha_data) + data.reset(); + old_transaction= thd->transaction; + thd->transaction= &new_transaction; + new_transaction.on= 1; + in_sub_stmt= thd->in_sub_stmt; + thd->in_sub_stmt= 0; + server_status= thd->server_status; + m_transaction_psi= thd->m_transaction_psi; + thd->m_transaction_psi= 0; + wsrep_on= thd->variables.wsrep_on; + thd->variables.wsrep_on= 0; + thd->server_status&= ~(SERVER_STATUS_IN_TRANS | + SERVER_STATUS_IN_TRANS_READONLY); + thd->server_status|= SERVER_STATUS_AUTOCOMMIT; +} + + +void start_new_trans::restore_old_transaction() +{ + org_thd->transaction= old_transaction; + org_thd->restore_backup_open_tables_state(&open_tables_state_backup); + ha_close_connection(org_thd); + memcpy(org_thd->ha_data, old_ha_data, sizeof(old_ha_data)); + org_thd->mdl_context.rollback_to_savepoint(mdl_savepoint); + org_thd->in_sub_stmt= in_sub_stmt; + org_thd->server_status= server_status; + if (org_thd->m_transaction_psi) + MYSQL_COMMIT_TRANSACTION(org_thd->m_transaction_psi); + org_thd->m_transaction_psi= m_transaction_psi; + org_thd->variables.wsrep_on= wsrep_on; + org_thd= 0; +} + + +/** Decide on logging format to use for the statement and issue errors or warnings as needed. The decision depends on the following parameters: @@ -5815,8 +6098,9 @@ int THD::decide_logging_format(TABLE_LIST *tables) { DBUG_ENTER("THD::decide_logging_format"); DBUG_PRINT("info", ("Query: %.*s", (uint) query_length(), query())); - DBUG_PRINT("info", ("variables.binlog_format: %lu", - variables.binlog_format)); + DBUG_PRINT("info", ("binlog_format: %lu", (ulong) variables.binlog_format)); + DBUG_PRINT("info", ("current_stmt_binlog_format: %lu", + (ulong) current_stmt_binlog_format)); DBUG_PRINT("info", ("lex->get_stmt_unsafe_flags(): 0x%x", lex->get_stmt_unsafe_flags())); @@ -5841,18 +6125,11 @@ int THD::decide_logging_format(TABLE_LIST *tables) DBUG_RETURN(-1); } } - - if ((WSREP_EMULATE_BINLOG_NNULL(this) || - (mysql_bin_log.is_open() && (variables.option_bits & OPTION_BIN_LOG))) && - !(wsrep_binlog_format() == BINLOG_FORMAT_STMT && - !binlog_filter->db_ok(db.str))) -#else - if (mysql_bin_log.is_open() && (variables.option_bits & OPTION_BIN_LOG) && - !(wsrep_binlog_format() == BINLOG_FORMAT_STMT && - !binlog_filter->db_ok(db.str))) #endif /* WITH_WSREP */ - { + if (WSREP_EMULATE_BINLOG_NNULL(this) || + binlog_table_should_be_logged(&db)) + { if (is_bulk_op()) { if (wsrep_binlog_format() == BINLOG_FORMAT_STMT) @@ -5895,6 +6172,7 @@ int THD::decide_logging_format(TABLE_LIST *tables) bool has_auto_increment_write_tables_not_first= FALSE; bool found_first_not_own_table= FALSE; bool has_write_tables_with_unsafe_statements= FALSE; + bool blackhole_table_found= 0; /* A pointer to a previous table that was changed. @@ -6020,6 +6298,10 @@ int THD::decide_logging_format(TABLE_LIST *tables) if (prev_write_table && prev_write_table->file->ht != table->file->ht) multi_write_engine= TRUE; + + if (table->file->ht->db_type == DB_TYPE_BLACKHOLE_DB) + blackhole_table_found= 1; + if (share->non_determinstic_insert && !(sql_command_flags[lex->sql_command] & CF_SCHEMA_CHANGE)) has_write_tables_with_unsafe_statements= true; @@ -6265,7 +6547,8 @@ int THD::decide_logging_format(TABLE_LIST *tables) is_current_stmt_binlog_format_row() ? "ROW" : "STATEMENT")); - if (variables.binlog_format == BINLOG_FORMAT_ROW && + if (blackhole_table_found && + variables.binlog_format == BINLOG_FORMAT_ROW && (sql_command_flags[lex->sql_command] & (CF_UPDATES_DATA | CF_DELETES_DATA))) { @@ -6281,8 +6564,8 @@ int THD::decide_logging_format(TABLE_LIST *tables) if (table->table->file->ht->db_type == DB_TYPE_BLACKHOLE_DB && table->lock_type >= TL_WRITE_ALLOW_WRITE) { - table_names.append(&table->table_name); - table_names.append(","); + table_names.append(&table->table_name); + table_names.append(","); } } if (!table_names.is_empty()) @@ -6302,9 +6585,12 @@ int THD::decide_logging_format(TABLE_LIST *tables) table_names.c_ptr()); } } + + if (is_write && is_current_stmt_binlog_format_row()) + binlog_prepare_for_row_logging(); } -#ifndef DBUG_OFF else + { DBUG_PRINT("info", ("decision: no logging since " "mysql_bin_log.is_open() = %d " "and (options & OPTION_BIN_LOG) = 0x%llx " @@ -6314,22 +6600,23 @@ int THD::decide_logging_format(TABLE_LIST *tables) (variables.option_bits & OPTION_BIN_LOG), (uint) wsrep_binlog_format(), binlog_filter->db_ok(db.str))); -#endif - + if (WSREP_NNULL(this) && is_current_stmt_binlog_format_row()) + binlog_prepare_for_row_logging(); + } DBUG_RETURN(0); } int THD::decide_logging_format_low(TABLE *table) { + DBUG_ENTER("decide_logging_format_low"); /* - INSERT...ON DUPLICATE KEY UPDATE on a table with more than one unique keys - can be unsafe. - */ - if(wsrep_binlog_format() <= BINLOG_FORMAT_STMT && - !is_current_stmt_binlog_format_row() && - !lex->is_stmt_unsafe() && - lex->sql_command == SQLCOM_INSERT && - lex->duplicates == DUP_UPDATE) + INSERT...ON DUPLICATE KEY UPDATE on a table with more than one unique keys + can be unsafe. + */ + if (wsrep_binlog_format() <= BINLOG_FORMAT_STMT && + !is_current_stmt_binlog_format_row() && + !lex->is_stmt_unsafe() && + lex->duplicates == DUP_UPDATE) { uint unique_keys= 0; uint keys= table->s->keys, i= 0; @@ -6356,19 +6643,29 @@ exit:; lex->set_stmt_unsafe(LEX::BINLOG_STMT_UNSAFE_INSERT_TWO_KEYS); binlog_unsafe_warning_flags|= lex->get_stmt_unsafe_flags(); set_current_stmt_binlog_format_row_if_mixed(); - return 1; + if (is_current_stmt_binlog_format_row()) + binlog_prepare_for_row_logging(); + DBUG_RETURN(1); } } - return 0; + DBUG_RETURN(0); } -/* - Implementation of interface to write rows to the binary log through the - thread. The thread is responsible for writing the rows it has - inserted/updated/deleted. +#ifndef MYSQL_CLIENT +/** + Check if we should log a table DDL to the binlog + + @retval true yes + @retval false no */ -#ifndef MYSQL_CLIENT +bool THD::binlog_table_should_be_logged(const LEX_CSTRING *db) +{ + return (mysql_bin_log.is_open() && + (variables.option_bits & OPTION_BIN_LOG) && + (wsrep_binlog_format() != BINLOG_FORMAT_STMT || + binlog_filter->db_ok(db->str))); +} /* Template member function for ensuring that there is an rows log @@ -6376,7 +6673,7 @@ exit:; PRE CONDITION: - Events of type 'RowEventT' have the type code 'type_code'. - + POST CONDITION: If a non-NULL pointer is returned, the pending event for thread 'thd' will be an event of type 'RowEventT' (which have the type code 'type_code') @@ -6569,7 +6866,8 @@ CPP_UNNAMED_NS_START } else { - m_memory= (uchar *) my_malloc(total_length, MYF(MY_WME)); + m_memory= (uchar *) my_malloc(key_memory_Row_data_memory_memory, + total_length, MYF(MY_WME)); m_release_memory_on_destruction= TRUE; } } @@ -6788,12 +7086,13 @@ void THD::binlog_prepare_row_images(TABLE *table) /** if there is a primary key in the table (ie, user declared PK or a - non-null unique index) and we dont want to ship the entire image, + non-null unique index) and we don't want to ship the entire image, and the handler involved supports this. */ if (table->s->primary_key < MAX_KEY && (thd->variables.binlog_row_image < BINLOG_ROW_IMAGE_FULL) && - !ha_check_storage_engine_flag(table->s->db_type(), HTON_NO_BINLOG_ROW_OPT)) + !ha_check_storage_engine_flag(table->s->db_type(), + HTON_NO_BINLOG_ROW_OPT)) { /** Just to be sure that tmp_set is currently not in use as @@ -6838,7 +7137,7 @@ void THD::binlog_prepare_row_images(TABLE *table) -int THD::binlog_remove_pending_rows_event(bool clear_maps, +int THD::binlog_remove_pending_rows_event(bool reset_stmt, bool is_transactional) { DBUG_ENTER("THD::binlog_remove_pending_rows_event"); @@ -6852,12 +7151,12 @@ int THD::binlog_remove_pending_rows_event(bool clear_maps, mysql_bin_log.remove_pending_rows_event(this, is_transactional); - if (clear_maps) - binlog_table_maps= 0; - + if (reset_stmt) + reset_binlog_for_next_statement(); DBUG_RETURN(0); } + int THD::binlog_flush_pending_rows_event(bool stmt_end, bool is_transactional) { DBUG_ENTER("THD::binlog_flush_pending_rows_event"); @@ -6883,9 +7182,8 @@ int THD::binlog_flush_pending_rows_event(bool stmt_end, bool is_transactional) if (stmt_end) { pending->set_flags(Rows_log_event::STMT_END_F); - binlog_table_maps= 0; + reset_binlog_for_next_statement(); } - error= mysql_bin_log.flush_and_set_pending_rows_event(this, 0, is_transactional); } @@ -7245,20 +7543,23 @@ int THD::binlog_query(THD::enum_binlog_query_type qtype, char const *query_arg, log event is written to the binary log, we pretend that no table maps were written. */ - if(binlog_should_compress(query_len)) + if (binlog_should_compress(query_len)) { - Query_compressed_log_event qinfo(this, query_arg, query_len, is_trans, direct, - suppress_use, errcode); + Query_compressed_log_event qinfo(this, query_arg, query_len, is_trans, + direct, suppress_use, errcode); error= mysql_bin_log.write(&qinfo); } else { Query_log_event qinfo(this, query_arg, query_len, is_trans, direct, - suppress_use, errcode); + suppress_use, errcode); error= mysql_bin_log.write(&qinfo); } + /* + row logged binlog may not have been reset in the case of locked tables + */ + reset_binlog_for_next_statement(); - binlog_table_maps= 0; DBUG_RETURN(error >= 0 ? error : 1); } @@ -7269,6 +7570,38 @@ int THD::binlog_query(THD::enum_binlog_query_type qtype, char const *query_arg, DBUG_RETURN(0); } + +/** + Binlog current query as a statement, ignoring the binlog filter setting. + + The filter is in decide_logging_format() to mark queries to not be stored + in the binary log, for example by a shared distributed engine like S3. + This function resets the filter to ensure the the query is logged if + the binlog is active. + + Note that 'direct' is set to false, which means that the query will + not be directly written to the binary log but instead to the cache. + + @retval false ok + @retval true error +*/ + + +bool THD::binlog_current_query_unfiltered() +{ + if (!mysql_bin_log.is_open()) + return 0; + + reset_binlog_local_stmt_filter(); + clear_binlog_local_stmt_filter(); + return binlog_query(THD::STMT_QUERY_TYPE, query(), query_length(), + /* is_trans */ FALSE, + /* direct */ FALSE, + /* suppress_use */ FALSE, + /* Error */ 0) > 0; +} + + void THD::wait_for_wakeup_ready() { @@ -7294,11 +7627,10 @@ void THD::set_last_commit_gtid(rpl_gtid >id) #endif m_last_commit_gtid= gtid; #ifndef EMBEDDED_LIBRARY - if (changed_gtid && session_tracker.sysvars.is_enabled()) + if (changed_gtid) { DBUG_ASSERT(current_thd == this); - session_tracker.sysvars. - mark_as_changed(this, (LEX_CSTRING*)Sys_last_gtid_ptr); + session_tracker.sysvars.mark_as_changed(this, Sys_last_gtid_ptr); } #endif } @@ -7369,6 +7701,7 @@ wait_for_commit::~wait_for_commit() mysql_cond_destroy(&COND_wait_commit); } + void wait_for_commit::wakeup(int wakeup_error) { @@ -7776,3 +8109,8 @@ bool THD::timestamp_to_TIME(MYSQL_TIME *ltime, my_time_t ts, } return 0; } + +THD_list_iterator *THD_list_iterator::iterator() +{ + return &server_threads; +} |