diff options
Diffstat (limited to 'sql/sql_class.cc')
-rw-r--r-- | sql/sql_class.cc | 685 |
1 files changed, 280 insertions, 405 deletions
diff --git a/sql/sql_class.cc b/sql/sql_class.cc index c4905fcabf6..0d61a38e918 100644 --- a/sql/sql_class.cc +++ b/sql/sql_class.cc @@ -49,9 +49,6 @@ #include <m_ctype.h> #include <sys/stat.h> #include <thr_alarm.h> -#ifdef __WIN__0 -#include <io.h> -#endif #include <mysys_err.h> #include <limits.h> @@ -66,14 +63,18 @@ #include "sql_callback.h" #include "lock.h" #include "wsrep_mysqld.h" -#include "wsrep_thd.h" #include "sql_connect.h" -#include "my_atomic.h" +#ifdef WITH_WSREP +#include "wsrep_thd.h" +#include "wsrep_trans_observer.h" +#else +static inline bool wsrep_is_bf_aborted(THD* thd) { return false; } +#endif /* WITH_WSREP */ +#include "opt_trace.h" #ifdef HAVE_SYS_SYSCALL_H #include <sys/syscall.h> #endif -#include "repl_failsafe.h" /* The following is used to initialise Table_ident with a internal @@ -647,17 +648,50 @@ THD::THD(my_thread_id id, bool is_wsrep_applier) tdc_hash_pins(0), xid_hash_pins(0), m_tmp_tables_locked(false) +#ifdef HAVE_REPLICATION + , + current_linfo(0), + slave_info(0) +#endif #ifdef WITH_WSREP - , + , wsrep_applier(is_wsrep_applier), wsrep_applier_closing(false), wsrep_client_thread(false), - wsrep_apply_toi(false), + wsrep_retry_counter(0), + wsrep_PA_safe(true), + wsrep_retry_query(NULL), + wsrep_retry_query_len(0), + wsrep_retry_command(COM_CONNECT), + wsrep_consistency_check(NO_CONSISTENCY_CHECK), + wsrep_mysql_replicated(0), + wsrep_TOI_pre_query(NULL), + wsrep_TOI_pre_query_len(0), wsrep_po_handle(WSREP_PO_INITIALIZER), wsrep_po_cnt(0), wsrep_apply_format(0), - wsrep_ignore_table(false) -#endif + wsrep_rbr_buf(NULL), + wsrep_sync_wait_gtid(WSREP_GTID_UNDEFINED), + wsrep_affected_rows(0), + wsrep_has_ignored_error(false), + wsrep_replicate_GTID(false), + wsrep_ignore_table(false), + wsrep_aborter(0), + +/* wsrep-lib */ + m_wsrep_next_trx_id(WSREP_UNDEFINED_TRX_ID), + m_wsrep_mutex(LOCK_thd_data), + m_wsrep_cond(COND_wsrep_thd), + m_wsrep_client_service(this, m_wsrep_client_state), + m_wsrep_client_state(this, + m_wsrep_mutex, + m_wsrep_cond, + Wsrep_server_state::instance(), + m_wsrep_client_service, + wsrep::client_id(thread_id)), + wsrep_applier_service(NULL), + wsrep_wfc() +#endif /*WITH_WSREP */ { ulong tmp; bzero(&variables, sizeof(variables)); @@ -676,6 +710,7 @@ THD::THD(my_thread_id id, bool is_wsrep_applier) main_da.init(); mdl_context.init(this); + mdl_backup_lock= 0; /* Pass nominal parameters to init_alloc_root only to ensure that @@ -725,7 +760,6 @@ THD::THD(my_thread_id id, bool is_wsrep_applier) progress.arena= 0; progress.report_to_client= 0; progress.max_counter= 0; - current_linfo = 0; slave_thread = 0; connection_name.str= 0; connection_name.length= 0; @@ -750,7 +784,7 @@ THD::THD(my_thread_id id, bool is_wsrep_applier) net.reading_or_writing= 0; client_capabilities= 0; // minimalistic client system_thread= NON_SYSTEM_THREAD; - cleanup_done= free_connection_done= abort_on_warning= 0; + cleanup_done= free_connection_done= abort_on_warning= got_warning= 0; peer_port= 0; // For SHOW PROCESSLIST transaction.m_pending_rows_event= 0; transaction.on= 1; @@ -765,11 +799,7 @@ THD::THD(my_thread_id id, bool is_wsrep_applier) mysql_mutex_init(key_LOCK_wakeup_ready, &LOCK_wakeup_ready, MY_MUTEX_INIT_FAST); mysql_mutex_init(key_LOCK_thd_kill, &LOCK_thd_kill, MY_MUTEX_INIT_FAST); mysql_cond_init(key_COND_wakeup_ready, &COND_wakeup_ready, 0); - /* - LOCK_thread_count goes before LOCK_thd_data - the former is called around - 'delete thd', the latter - in THD::~THD - */ - mysql_mutex_record_order(&LOCK_thread_count, &LOCK_thd_data); + mysql_mutex_record_order(&LOCK_thd_kill, &LOCK_thd_data); /* Variables with default values */ proc_info="login"; @@ -779,23 +809,8 @@ THD::THD(my_thread_id id, bool is_wsrep_applier) *scramble= '\0'; #ifdef WITH_WSREP - wsrep_ws_handle.trx_id = WSREP_UNDEFINED_TRX_ID; - wsrep_ws_handle.opaque = NULL; - wsrep_retry_counter = 0; - wsrep_PA_safe = true; - wsrep_retry_query = NULL; - wsrep_retry_query_len = 0; - wsrep_retry_command = COM_CONNECT; - wsrep_consistency_check = NO_CONSISTENCY_CHECK; - wsrep_mysql_replicated = 0; - wsrep_TOI_pre_query = NULL; - wsrep_TOI_pre_query_len = 0; + mysql_cond_init(key_COND_wsrep_thd, &COND_wsrep_thd, NULL); wsrep_info[sizeof(wsrep_info) - 1] = '\0'; /* make sure it is 0-terminated */ - wsrep_sync_wait_gtid = WSREP_GTID_UNDEFINED; - wsrep_affected_rows = 0; - wsrep_replicate_GTID = false; - wsrep_skip_wsrep_GTID = false; - wsrep_split_flag = false; #endif /* Call to init() below requires fully initialized Open_tables_state. */ reset_open_tables_state(this); @@ -859,7 +874,6 @@ THD::THD(my_thread_id id, bool is_wsrep_applier) org_charset= 0; /* Restore THR_THD */ set_current_thd(old_THR_THD); - inc_thread_count(); } @@ -1013,6 +1027,15 @@ Sql_condition* THD::raise_condition(uint sql_errno, if (!(variables.option_bits & OPTION_SQL_NOTES) && (level == Sql_condition::WARN_LEVEL_NOTE)) DBUG_RETURN(NULL); +#ifdef WITH_WSREP + /* + Suppress warnings/errors if the wsrep THD is going to replay. The + deadlock/interrupted errors may be transitient and should not be + reported to the client. + */ + if (wsrep_must_replay(this)) + DBUG_RETURN(NULL); +#endif /* WITH_WSREP */ da->opt_clear_warning_info(query_id); @@ -1038,7 +1061,8 @@ Sql_condition* THD::raise_condition(uint sql_errno, level= Sql_condition::WARN_LEVEL_ERROR; } - if (handle_condition(sql_errno, sqlstate, &level, msg, &cond)) + if (!is_fatal_error && + handle_condition(sql_errno, sqlstate, &level, msg, &cond)) DBUG_RETURN(cond); switch (level) { @@ -1059,10 +1083,25 @@ Sql_condition* THD::raise_condition(uint sql_errno, is_slave_error= 1; // needed to catch query errors during replication - if (!da->is_error()) +#ifdef WITH_WSREP + /* + With wsrep we allow converting BF abort error to warning if + errors are ignored. + */ + if (!is_fatal_error && + no_errors && + (wsrep_trx().bf_aborted() || wsrep_retry_counter)) + { + WSREP_DEBUG("BF abort error converted to warning"); + } + else +#endif /* WITH_WSREP */ { - set_row_count_func(-1); - da->set_error_status(sql_errno, msg, sqlstate, ucid, cond); + if (!da->is_error()) + { + set_row_count_func(-1); + da->set_error_status(sql_errno, msg, sqlstate, ucid, cond); + } } } @@ -1123,7 +1162,16 @@ void *thd_memdup(MYSQL_THD thd, const void* str, size_t size) extern "C" void thd_get_xid(const MYSQL_THD thd, MYSQL_XID *xid) { - *xid = *(MYSQL_XID *) &thd->transaction.xid_state.xid; +#ifdef WITH_WSREP + if (!thd->wsrep_xid.is_null()) + { + *xid = *(MYSQL_XID *) &thd->wsrep_xid; + return; + } +#endif /* WITH_WSREP */ + *xid= thd->transaction.xid_state.is_explicit_XA() ? + *(MYSQL_XID *) thd->transaction.xid_state.get_xid() : + *(MYSQL_XID *) &thd->transaction.implicit_xid; } @@ -1225,13 +1273,12 @@ void THD::init() first_successful_insert_id_in_prev_stmt= 0; first_successful_insert_id_in_prev_stmt_for_binlog= 0; first_successful_insert_id_in_cur_stmt= 0; + current_backup_stage= BACKUP_FINISHED; + backup_commit_lock= 0; #ifdef WITH_WSREP - wsrep_exec_mode= wsrep_applier ? REPL_RECV : LOCAL_STATE; - wsrep_conflict_state= NO_CONFLICT; - wsrep_thd_set_query_state(this, QUERY_IDLE); wsrep_last_query_id= 0; - wsrep_trx_meta.gtid= WSREP_GTID_UNDEFINED; - wsrep_trx_meta.depends_on= WSREP_SEQNO_UNDEFINED; + wsrep_xid.null(); + wsrep_skip_locking= FALSE; wsrep_converted_lock_session= false; wsrep_retry_counter= 0; wsrep_rgi= NULL; @@ -1240,11 +1287,11 @@ void THD::init() wsrep_mysql_replicated = 0; wsrep_TOI_pre_query = NULL; wsrep_TOI_pre_query_len = 0; - wsrep_sync_wait_gtid = WSREP_GTID_UNDEFINED; + wsrep_rbr_buf = NULL; wsrep_affected_rows = 0; + m_wsrep_next_trx_id = WSREP_UNDEFINED_TRX_ID; wsrep_replicate_GTID = false; - wsrep_skip_wsrep_GTID = false; - wsrep_split_flag = false; + wsrep_aborter = 0; #endif /* WITH_WSREP */ if (variables.sql_log_bin) @@ -1338,14 +1385,20 @@ void THD::update_all_stats() void THD::init_for_queries() { set_time(); - ha_enable_transaction(this,TRUE); + /* + We don't need to call ha_enable_transaction() as we can't have + any active transactions that has to be committed + */ + DBUG_ASSERT(transaction.is_empty()); + transaction.on= TRUE; reset_root_defaults(mem_root, variables.query_alloc_block_size, variables.query_prealloc_size); reset_root_defaults(&transaction.mem_root, variables.trans_alloc_block_size, variables.trans_prealloc_size); - transaction.xid_state.xid.null(); + DBUG_ASSERT(!transaction.xid_state.is_explicit_XA()); + DBUG_ASSERT(transaction.implicit_xid.is_null()); } @@ -1387,6 +1440,7 @@ void THD::change_user(void) sp_cache_clear(&sp_func_cache); sp_cache_clear(&sp_package_spec_cache); sp_cache_clear(&sp_package_body_cache); + opt_trace.delete_traces(); } /** @@ -1418,7 +1472,7 @@ bool THD::set_db(const LEX_CSTRING *new_db) const char *tmp= NULL; if (new_db->str) { - if (!(tmp= my_strndup(new_db->str, new_db->length, MYF(MY_WME | ME_FATALERROR)))) + if (!(tmp= my_strndup(new_db->str, new_db->length, MYF(MY_WME | ME_FATAL)))) result= 1; } @@ -1467,12 +1521,11 @@ void THD::cleanup(void) DBUG_ASSERT(cleanup_done == 0); set_killed(KILL_CONNECTION); -#ifdef ENABLE_WHEN_BINLOG_WILL_BE_ABLE_TO_PREPARE - if (transaction.xid_state.xa_state == XA_PREPARED) - { -#error xid_state in the cache should be replaced by the allocated value - } -#endif +#ifdef WITH_WSREP + if (wsrep_cs().state() != wsrep::client_state::s_none) + wsrep_cs().cleanup(); + wsrep_client_thread= false; +#endif /* WITH_WSREP */ mysql_ha_cleanup(this); locked_tables_list.unlock_locked_tables(this); @@ -1480,10 +1533,10 @@ void THD::cleanup(void) delete_dynamic(&user_var_events); close_temporary_tables(); - transaction.xid_state.xa_state= XA_NOTR; - transaction.xid_state.rm_error= 0; - trans_rollback(this); - xid_cache_delete(this, &transaction.xid_state); + if (transaction.xid_state.is_explicit_XA()) + trans_xa_detach(this); + else + trans_rollback(this); DBUG_ASSERT(open_tables == NULL); /* @@ -1492,7 +1545,10 @@ void THD::cleanup(void) and left the mode a few lines above), there will be outstanding metadata locks. Release them. */ - mdl_context.release_transactional_locks(); + mdl_context.release_transactional_locks(this); + + backup_end(this); + backup_unlock(this); /* Release the global read lock, if acquired. */ if (global_read_lock.is_acquired()) @@ -1526,9 +1582,8 @@ void THD::cleanup(void) apc_target.destroy(); #ifdef HAVE_REPLICATION - unregister_slave(this, true, true); + unregister_slave(); #endif - cleanup_done=1; DBUG_VOID_RETURN; } @@ -1549,7 +1604,7 @@ void THD::free_connection() #ifndef EMBEDDED_LIBRARY if (net.vio) vio_delete(net.vio); - net.vio= 0; + net.vio= nullptr; net_end(&net); #endif if (!cleanup_done) @@ -1588,12 +1643,16 @@ void THD::reset_for_reuse() abort_on_warning= 0; free_connection_done= 0; m_command= COM_CONNECT; + transaction.on= 1; #if defined(ENABLED_PROFILING) profiling.reset(); #endif #ifdef SIGNAL_WITH_VIO_CLOSE active_vio = 0; #endif +#ifdef WITH_WSREP + wsrep_free_status(this); +#endif /* WITH_WSREP */ } @@ -1602,10 +1661,8 @@ THD::~THD() THD *orig_thd= current_thd; THD_CHECK_SENTRY(this); DBUG_ENTER("~THD()"); - /* Check that we have already called thd->unlink() */ - DBUG_ASSERT(prev == 0 && next == 0); - /* This takes a long time so we should not do this under LOCK_thread_count */ - mysql_mutex_assert_not_owner(&LOCK_thread_count); + /* Make sure threads are not available via server_threads. */ + assert_not_linked(); /* In error cases, thd may not be current thd. We have to fix this so @@ -1629,6 +1686,9 @@ THD::~THD() if (!free_connection_done) free_connection(); +#ifdef WITH_WSREP + mysql_cond_destroy(&COND_wsrep_thd); +#endif mdl_context.destroy(); free_root(&transaction.mem_root,MYF(0)); @@ -1680,7 +1740,6 @@ THD::~THD() } update_global_memory_status(status_var.global_memory_used); set_current_thd(orig_thd == this ? 0 : orig_thd); - dec_thread_count(); DBUG_VOID_RETURN; } @@ -1810,6 +1869,7 @@ void THD::awake_no_mutex(killed_state state_to_set) DBUG_PRINT("enter", ("this: %p current_thd: %p state: %d", this, current_thd, (int) state_to_set)); THD_CHECK_SENTRY(this); + mysql_mutex_assert_owner(&LOCK_thd_data); mysql_mutex_assert_owner(&LOCK_thd_kill); print_aborted_warning(3, "KILLED"); @@ -1842,14 +1902,21 @@ void THD::awake_no_mutex(killed_state state_to_set) } /* Interrupt target waiting inside a storage engine. */ - if (state_to_set != NOT_KILLED) + if (state_to_set != NOT_KILLED && !wsrep_is_bf_aborted(this)) ha_kill_query(this, thd_kill_level(this)); - /* Broadcast a condition to kick the target if it is waiting on it. */ + abort_current_cond_wait(false); + DBUG_VOID_RETURN; +} + +/* Broadcast a condition to kick the target if it is waiting on it. */ +void THD::abort_current_cond_wait(bool force) +{ + mysql_mutex_assert_owner(&LOCK_thd_kill); if (mysys_var) { mysql_mutex_lock(&mysys_var->mutex); - if (!system_thread) // Don't abort locks + if (!system_thread || force) // Don't abort locks mysys_var->abort=1; /* @@ -1907,7 +1974,6 @@ void THD::awake_no_mutex(killed_state state_to_set) } mysql_mutex_unlock(&mysys_var->mutex); } - DBUG_VOID_RETURN; } @@ -1961,16 +2027,7 @@ bool THD::notify_shared_lock(MDL_context_owner *ctx_in_use, mysql_mutex_lock(&in_use->LOCK_thd_kill); if (in_use->killed < KILL_CONNECTION) in_use->set_killed_no_mutex(KILL_CONNECTION); - if (in_use->mysys_var) - { - mysql_mutex_lock(&in_use->mysys_var->mutex); - if (in_use->mysys_var->current_cond) - mysql_cond_broadcast(in_use->mysys_var->current_cond); - - /* Abort if about to wait in thr_upgrade_write_delay_lock */ - in_use->mysys_var->abort= 1; - mysql_mutex_unlock(&in_use->mysys_var->mutex); - } + in_use->abort_current_cond_wait(true); mysql_mutex_unlock(&in_use->LOCK_thd_kill); signalled= TRUE; } @@ -1995,12 +2052,6 @@ bool THD::notify_shared_lock(MDL_context_owner *ctx_in_use, if (!thd_table->needs_reopen()) { signalled|= mysql_lock_abort_for_thread(this, thd_table); - if (WSREP(this) && wsrep_thd_is_BF(this, FALSE)) - { - WSREP_DEBUG("remove_table_from_cache: %llu", - (unsigned long long) this->real_id); - wsrep_abort_thd((void *)this, (void *)in_use, FALSE); - } } } } @@ -2071,11 +2122,19 @@ void THD::reset_killed() DBUG_ENTER("reset_killed"); if (killed != NOT_KILLED) { + mysql_mutex_assert_not_owner(&LOCK_thd_kill); mysql_mutex_lock(&LOCK_thd_kill); killed= NOT_KILLED; killed_err= 0; mysql_mutex_unlock(&LOCK_thd_kill); } +#ifdef WITH_WSREP + mysql_mutex_assert_not_owner(&LOCK_thd_data); + mysql_mutex_lock(&LOCK_thd_data); + wsrep_aborter= 0; + mysql_mutex_unlock(&LOCK_thd_data); +#endif /* WITH_WSREP */ + DBUG_VOID_RETURN; } @@ -2232,12 +2291,6 @@ void THD::cleanup_after_query() /* reset table map for multi-table update */ table_map_for_update= 0; m_binlog_invoker= INVOKER_NONE; -#ifdef WITH_WSREP - if (TOTAL_ORDER == wsrep_exec_mode) - { - wsrep_exec_mode = LOCAL_STATE; - } -#endif /* WITH_WSREP */ #ifndef EMBEDDED_LIBRARY if (rgi_slave) @@ -2245,7 +2298,6 @@ void THD::cleanup_after_query() #endif #ifdef WITH_WSREP - wsrep_sync_wait_gtid= WSREP_GTID_UNDEFINED; if (!in_active_multi_stmt_transaction()) wsrep_affected_rows= 0; #endif /* WITH_WSREP */ @@ -2505,6 +2557,16 @@ void THD::update_charset() ¬_used); } +void THD::give_protection_error() +{ + if (current_backup_stage != BACKUP_FINISHED) + my_error(ER_BACKUP_LOCK_IS_ACTIVE, MYF(0)); + else + { + DBUG_ASSERT(global_read_lock.is_acquired() || mdl_backup_lock); + my_error(ER_CANT_UPDATE_WITH_READLOCK, MYF(0)); + } +} /* routings to adding tables to list of changed in transaction tables */ @@ -2581,7 +2643,7 @@ CHANGED_TABLE_LIST* THD::changed_table_dup(const char *key, size_t key_length) key_length + 1); if (!new_table) { - my_error(EE_OUTOFMEMORY, MYF(ME_BELL+ME_FATALERROR), + my_error(EE_OUTOFMEMORY, MYF(ME_FATAL), ALIGN_SIZE(sizeof(TABLE_LIST)) + key_length + 1); set_killed(KILL_CONNECTION); return 0; @@ -2685,13 +2747,13 @@ void THD::make_explain_field_list(List<Item> &field_list, uint8 explain_flags, NAME_CHAR_LEN*MAX_REF_PARTS, cs), mem_root); item->maybe_null=1; - field_list.push_back(item= new (mem_root) - Item_return_int(this, "rows", 10, MYSQL_TYPE_LONGLONG), + field_list.push_back(item=new (mem_root) + Item_empty_string(this, "rows", NAME_CHAR_LEN, cs), mem_root); if (is_analyze) { field_list.push_back(item= new (mem_root) - Item_float(this, "r_rows", 0.1234, 2, 4), + Item_empty_string(this, "r_rows", NAME_CHAR_LEN, cs), mem_root); item->maybe_null=1; } @@ -2950,7 +3012,8 @@ int select_send::send_data(List<Item> &items) thd->inc_sent_row_count(1); - if (thd->vio_ok()) + /* Don't return error if disconnected, only if write fails */ + if (likely(thd->vio_ok())) DBUG_RETURN(protocol->write()); DBUG_RETURN(0); @@ -3233,9 +3296,9 @@ int select_export::send_data(List<Item> &items) ((uint64) res->length() / res->charset()->mbminlen + 1) * write_cs->mbmaxlen + 1; set_if_smaller(estimated_bytes, UINT_MAX32); - if (cvt_str.realloc((uint32) estimated_bytes)) + if (cvt_str.alloc((uint32) estimated_bytes)) { - my_error(ER_OUTOFMEMORY, MYF(ME_FATALERROR), (uint32) estimated_bytes); + my_error(ER_OUTOFMEMORY, MYF(ME_FATAL), (uint32) estimated_bytes); goto err; } @@ -3502,7 +3565,7 @@ int select_singlerow_subselect::send_data(List<Item> &items) if (it->assigned()) { my_message(ER_SUBQUERY_NO_1_ROW, ER_THD(thd, ER_SUBQUERY_NO_1_ROW), - MYF(current_thd->lex->ignore ? ME_JUST_WARNING : 0)); + MYF(current_thd->lex->ignore ? ME_WARNING : 0)); DBUG_RETURN(1); } if (unit->offset_limit_cnt) @@ -3609,18 +3672,15 @@ bool select_max_min_finder_subselect::cmp_int() bool select_max_min_finder_subselect::cmp_decimal() { Item *maxmin= ((Item_singlerow_subselect *)item)->element_index(0); - my_decimal cval, *cvalue= cache->val_decimal(&cval); - my_decimal mval, *mvalue= maxmin->val_decimal(&mval); + VDec cvalue(cache), mvalue(maxmin); /* Ignore NULLs for ANY and keep them for ALL subqueries */ - if (cache->null_value) - return (is_all && !maxmin->null_value) || (!is_all && maxmin->null_value); - if (maxmin->null_value) + if (cvalue.is_null()) + return (is_all && !mvalue.is_null()) || (!is_all && mvalue.is_null()); + if (mvalue.is_null()) return !is_all; - if (fmax) - return (my_decimal_cmp(cvalue, mvalue) > 0) ; - return (my_decimal_cmp(cvalue,mvalue) < 0); + return fmax ? cvalue.cmp(mvalue) > 0 : cvalue.cmp(mvalue) < 0; } bool select_max_min_finder_subselect::cmp_str() @@ -4253,6 +4313,7 @@ void Security_context::init() host_or_ip= "connecting host"; priv_user[0]= priv_host[0]= proxy_user[0]= priv_role[0]= '\0'; master_access= 0; + password_expired= false; #ifndef NO_EMBEDDED_ACCESS_CHECKS db_access= NO_ACCESS; #endif @@ -4291,6 +4352,7 @@ void Security_context::skip_grants() host_or_ip= (char *)""; master_access= ~NO_ACCESS; *priv_user= *priv_host= '\0'; + password_expired= false; } @@ -4301,6 +4363,13 @@ bool Security_context::set_user(char *user_arg) return user == 0; } +bool Security_context::check_access(ulong want_access, bool match_any) +{ + DBUG_ENTER("Security_context::check_access"); + DBUG_RETURN((match_any ? (master_access & want_access) + : ((master_access & want_access) == want_access))); +} + #ifndef NO_EMBEDDED_ACCESS_CHECKS /** Initialize this security context from the passed in credentials @@ -4402,6 +4471,13 @@ bool Security_context::user_matches(Security_context *them) !strcmp(user, them->user)); } +bool Security_context::is_priv_user(const char *user, const char *host) +{ + return ((user != NULL) && (host != NULL) && + !strcmp(user, priv_user) && + !my_strcasecmp(system_charset_info, host,priv_host)); +} + /**************************************************************************** Handling of open and locked tables states. @@ -4728,14 +4804,14 @@ MYSQL_THD create_thd() thd->set_command(COM_DAEMON); thd->system_thread= SYSTEM_THREAD_GENERIC; thd->security_ctx->host_or_ip=""; - add_to_active_threads(thd); + server_threads.insert(thd); return thd; } void destroy_thd(MYSQL_THD thd) { thd->add_status_to_global(); - unlink_not_visible_thd(thd); + server_threads.erase(thd); delete thd; } @@ -4983,13 +5059,13 @@ thd_need_ordering_with(const MYSQL_THD thd, const MYSQL_THD other_thd) #ifdef WITH_WSREP /* wsrep applier, replayer and TOI processing threads are ordered by replication provider, relaxed GAP locking protocol can be used - between high priority wsrep threads. Note that this function - is called while holding lock_sys mutex, therefore we can't - use THD::LOCK_thd_data mutex below to follow mutex ordering rules. + between high priority wsrep threads. + Note that wsrep_thd_is_BF() doesn't take LOCK_thd_data for either thd, + the caller should guarantee that the BF state won't change. + (e.g. InnoDB does it by keeping lock_sys.mutex locked) */ - if (WSREP_ON && - wsrep_thd_is_BF(const_cast<THD *>(thd), false) && - wsrep_thd_is_BF(const_cast<THD *>(other_thd), false)) + if (WSREP_ON && wsrep_thd_is_BF(thd, false) && + wsrep_thd_is_BF(other_thd, false)) return 0; #endif /* WITH_WSREP */ rgi= thd->rgi_slave; @@ -5024,8 +5100,9 @@ extern "C" int thd_binlog_format(const MYSQL_THD thd) if (WSREP(thd)) { /* for wsrep binlog format is meaningful also when binlogging is off */ - return (int) thd->wsrep_binlog_format(); + return (int) WSREP_BINLOG_FORMAT(thd->variables.binlog_format); } + if (mysql_bin_log.is_open() && (thd->variables.option_bits & OPTION_BIN_LOG)) return (int) thd->variables.binlog_format; return BINLOG_FORMAT_UNSPEC; @@ -5508,6 +5585,10 @@ void THD::set_query_and_id(char *query_arg, uint32 query_length_arg, set_query_inner(query_arg, query_length_arg, cs); mysql_mutex_unlock(&LOCK_thd_data); query_id= new_query_id; +#ifdef WITH_WSREP + set_wsrep_next_trx_id(query_id); + WSREP_DEBUG("assigned new next query and trx id: %" PRIu64, wsrep_next_trx_id()); +#endif /* WITH_WSREP */ } /** Assign a new value to thd->mysys_var. */ @@ -5527,6 +5608,7 @@ void THD::leave_locked_tables_mode() { if (locked_tables_mode == LTM_LOCK_TABLES) { + DBUG_ASSERT(current_backup_stage == BACKUP_FINISHED); /* When leaving LOCK TABLES mode we have to change the duration of most of the metadata locks being held, except for HANDLER and GRL locks, @@ -5559,7 +5641,7 @@ void THD::get_definer(LEX_USER *definer, bool role) { definer->user= invoker.user; definer->host= invoker.host; - definer->reset_auth(); + definer->auth= NULL; } else #endif @@ -5583,263 +5665,6 @@ void THD::mark_transaction_to_rollback(bool all) is_fatal_sub_stmt_error= true; transaction_rollback_request= all; } -/*************************************************************************** - Handling of XA id cacheing -***************************************************************************/ -class XID_cache_element -{ - /* - m_state is used to prevent elements from being deleted while XA RECOVER - iterates xid cache and to prevent recovered elments from being acquired by - multiple threads. - - bits 1..29 are reference counter - bit 30 is RECOVERED flag - bit 31 is ACQUIRED flag (thread owns this xid) - bit 32 is unused - - Newly allocated and deleted elements have m_state set to 0. - - On lock() m_state is atomically incremented. It also creates load-ACQUIRE - memory barrier to make sure m_state is actually updated before furhter - memory accesses. Attempting to lock an element that has neither ACQUIRED - nor RECOVERED flag set returns failure and further accesses to element - memory are forbidden. - - On unlock() m_state is decremented. It also creates store-RELEASE memory - barrier to make sure m_state is actually updated after preceding memory - accesses. - - ACQUIRED flag is set when thread registers it's xid or when thread acquires - recovered xid. - - RECOVERED flag is set for elements found during crash recovery. - - ACQUIRED and RECOVERED flags are cleared before element is deleted from - hash in a spin loop, after last reference is released. - */ - int32 m_state; -public: - static const int32 ACQUIRED= 1 << 30; - static const int32 RECOVERED= 1 << 29; - XID_STATE *m_xid_state; - bool is_set(int32 flag) - { return my_atomic_load32_explicit(&m_state, MY_MEMORY_ORDER_RELAXED) & flag; } - void set(int32 flag) - { - DBUG_ASSERT(!is_set(ACQUIRED | RECOVERED)); - my_atomic_add32_explicit(&m_state, flag, MY_MEMORY_ORDER_RELAXED); - } - bool lock() - { - int32 old= my_atomic_add32_explicit(&m_state, 1, MY_MEMORY_ORDER_ACQUIRE); - if (old & (ACQUIRED | RECOVERED)) - return true; - unlock(); - return false; - } - void unlock() - { my_atomic_add32_explicit(&m_state, -1, MY_MEMORY_ORDER_RELEASE); } - void mark_uninitialized() - { - int32 old= ACQUIRED; - while (!my_atomic_cas32_weak_explicit(&m_state, &old, 0, - MY_MEMORY_ORDER_RELAXED, - MY_MEMORY_ORDER_RELAXED)) - { - old&= ACQUIRED | RECOVERED; - (void) LF_BACKOFF(); - } - } - bool acquire_recovered() - { - int32 old= RECOVERED; - while (!my_atomic_cas32_weak_explicit(&m_state, &old, ACQUIRED | RECOVERED, - MY_MEMORY_ORDER_RELAXED, - MY_MEMORY_ORDER_RELAXED)) - { - if (!(old & RECOVERED) || (old & ACQUIRED)) - return false; - old= RECOVERED; - (void) LF_BACKOFF(); - } - return true; - } - static void lf_hash_initializer(LF_HASH *hash __attribute__((unused)), - XID_cache_element *element, - XID_STATE *xid_state) - { - DBUG_ASSERT(!element->is_set(ACQUIRED | RECOVERED)); - element->m_xid_state= xid_state; - xid_state->xid_cache_element= element; - } - static void lf_alloc_constructor(uchar *ptr) - { - XID_cache_element *element= (XID_cache_element*) (ptr + LF_HASH_OVERHEAD); - element->m_state= 0; - } - static void lf_alloc_destructor(uchar *ptr) - { - XID_cache_element *element= (XID_cache_element*) (ptr + LF_HASH_OVERHEAD); - DBUG_ASSERT(!element->is_set(ACQUIRED)); - if (element->is_set(RECOVERED)) - my_free(element->m_xid_state); - } - static uchar *key(const XID_cache_element *element, size_t *length, - my_bool not_used __attribute__((unused))) - { - *length= element->m_xid_state->xid.key_length(); - return element->m_xid_state->xid.key(); - } -}; - - -static LF_HASH xid_cache; -static bool xid_cache_inited; - - -bool THD::fix_xid_hash_pins() -{ - if (!xid_hash_pins) - xid_hash_pins= lf_hash_get_pins(&xid_cache); - return !xid_hash_pins; -} - - -void xid_cache_init() -{ - xid_cache_inited= true; - lf_hash_init(&xid_cache, sizeof(XID_cache_element), LF_HASH_UNIQUE, 0, 0, - (my_hash_get_key) XID_cache_element::key, &my_charset_bin); - xid_cache.alloc.constructor= XID_cache_element::lf_alloc_constructor; - xid_cache.alloc.destructor= XID_cache_element::lf_alloc_destructor; - xid_cache.initializer= - (lf_hash_initializer) XID_cache_element::lf_hash_initializer; -} - - -void xid_cache_free() -{ - if (xid_cache_inited) - { - lf_hash_destroy(&xid_cache); - xid_cache_inited= false; - } -} - - -/** - Find recovered XA transaction by XID. -*/ - -XID_STATE *xid_cache_search(THD *thd, XID *xid) -{ - XID_STATE *xs= 0; - DBUG_ASSERT(thd->xid_hash_pins); - XID_cache_element *element= - (XID_cache_element*) lf_hash_search(&xid_cache, thd->xid_hash_pins, - xid->key(), xid->key_length()); - if (element) - { - if (element->acquire_recovered()) - xs= element->m_xid_state; - lf_hash_search_unpin(thd->xid_hash_pins); - DEBUG_SYNC(thd, "xa_after_search"); - } - return xs; -} - - -bool xid_cache_insert(XID *xid, enum xa_states xa_state) -{ - XID_STATE *xs; - LF_PINS *pins; - int res= 1; - - if (!(pins= lf_hash_get_pins(&xid_cache))) - return true; - - if ((xs= (XID_STATE*) my_malloc(sizeof(*xs), MYF(MY_WME)))) - { - xs->xa_state=xa_state; - xs->xid.set(xid); - xs->rm_error=0; - - if ((res= lf_hash_insert(&xid_cache, pins, xs))) - my_free(xs); - else - xs->xid_cache_element->set(XID_cache_element::RECOVERED); - if (res == 1) - res= 0; - } - lf_hash_put_pins(pins); - return res; -} - - -bool xid_cache_insert(THD *thd, XID_STATE *xid_state) -{ - if (thd->fix_xid_hash_pins()) - return true; - - int res= lf_hash_insert(&xid_cache, thd->xid_hash_pins, xid_state); - switch (res) - { - case 0: - xid_state->xid_cache_element->set(XID_cache_element::ACQUIRED); - break; - case 1: - my_error(ER_XAER_DUPID, MYF(0)); - /* fall through */ - default: - xid_state->xid_cache_element= 0; - } - return res; -} - - -void xid_cache_delete(THD *thd, XID_STATE *xid_state) -{ - if (xid_state->xid_cache_element) - { - bool recovered= xid_state->xid_cache_element->is_set(XID_cache_element::RECOVERED); - DBUG_ASSERT(thd->xid_hash_pins); - xid_state->xid_cache_element->mark_uninitialized(); - lf_hash_delete(&xid_cache, thd->xid_hash_pins, - xid_state->xid.key(), xid_state->xid.key_length()); - xid_state->xid_cache_element= 0; - if (recovered) - my_free(xid_state); - } -} - - -struct xid_cache_iterate_arg -{ - my_hash_walk_action action; - void *argument; -}; - -static my_bool xid_cache_iterate_callback(XID_cache_element *element, - xid_cache_iterate_arg *arg) -{ - my_bool res= FALSE; - if (element->lock()) - { - res= arg->action(element->m_xid_state, arg->argument); - element->unlock(); - } - return res; -} - -int xid_cache_iterate(THD *thd, my_hash_walk_action action, void *arg) -{ - xid_cache_iterate_arg argument= { action, arg }; - return thd->fix_xid_hash_pins() ? -1 : - lf_hash_iterate(&xid_cache, thd->xid_hash_pins, - (my_hash_walk_action) xid_cache_iterate_callback, - &argument); -} /** @@ -5953,9 +5778,30 @@ int THD::decide_logging_format(TABLE_LIST *tables) binlogging is off, or if the statement is filtered out from the binlog by filtering rules. */ +#ifdef WITH_WSREP + if (WSREP_CLIENT_NNULL(this) && + wsrep_thd_is_local(this) && + wsrep_is_active(this) && + variables.wsrep_trx_fragment_size > 0) + { + if (!is_current_stmt_binlog_format_row()) + { + my_message(ER_NOT_SUPPORTED_YET, + "Streaming replication not supported with " + "binlog_format=STATEMENT", MYF(0)); + DBUG_RETURN(-1); + } + } + + if ((WSREP_EMULATE_BINLOG_NNULL(this) || + (mysql_bin_log.is_open() && (variables.option_bits & OPTION_BIN_LOG))) && + !(wsrep_binlog_format() == BINLOG_FORMAT_STMT && + !binlog_filter->db_ok(db.str))) +#else if (mysql_bin_log.is_open() && (variables.option_bits & OPTION_BIN_LOG) && !(wsrep_binlog_format() == BINLOG_FORMAT_STMT && !binlog_filter->db_ok(db.str))) +#endif /* WITH_WSREP */ { if (is_bulk_op()) @@ -6300,8 +6146,9 @@ int THD::decide_logging_format(TABLE_LIST *tables) /* As all updated tables are temporary, nothing will be logged */ set_current_stmt_binlog_format_row(); } - else if (IF_WSREP((!WSREP(this) || - wsrep_exec_mode == LOCAL_STATE),1)) + else if (IF_WSREP((!WSREP_NNULL(this) || + wsrep_cs().mode() == + wsrep::client_state::m_local),1)) { my_error((error= ER_BINLOG_STMT_MODE_AND_ROW_ENGINE), MYF(0), ""); } @@ -6692,8 +6539,9 @@ int THD::binlog_write_row(TABLE* table, bool is_trans, uchar const *record) { - DBUG_ASSERT(is_current_stmt_binlog_format_row() && - ((WSREP(this) && wsrep_emulate_bin_log) || mysql_bin_log.is_open())); + DBUG_ASSERT(is_current_stmt_binlog_format_row()); + DBUG_ASSERT((WSREP_NNULL(this) && wsrep_emulate_bin_log) || + mysql_bin_log.is_open()); /* Pack records into format for transfer. We are allocating more memory than needed, but that doesn't matter. @@ -6733,8 +6581,9 @@ int THD::binlog_update_row(TABLE* table, bool is_trans, const uchar *before_record, const uchar *after_record) { - DBUG_ASSERT(is_current_stmt_binlog_format_row() && - ((WSREP(this) && wsrep_emulate_bin_log) || mysql_bin_log.is_open())); + DBUG_ASSERT(is_current_stmt_binlog_format_row()); + DBUG_ASSERT((WSREP_NNULL(this) && wsrep_emulate_bin_log) || + mysql_bin_log.is_open()); /** Save a reference to the original read bitmaps @@ -6812,8 +6661,9 @@ int THD::binlog_update_row(TABLE* table, bool is_trans, int THD::binlog_delete_row(TABLE* table, bool is_trans, uchar const *record) { - DBUG_ASSERT(is_current_stmt_binlog_format_row() && - ((WSREP(this) && wsrep_emulate_bin_log) || mysql_bin_log.is_open())); + DBUG_ASSERT(is_current_stmt_binlog_format_row()); + DBUG_ASSERT((WSREP_NNULL(this) && wsrep_emulate_bin_log) || + mysql_bin_log.is_open()); /** Save a reference to the original read bitmaps We will need this to restore the bitmaps at the end as @@ -6944,7 +6794,7 @@ int THD::binlog_remove_pending_rows_event(bool clear_maps, { DBUG_ENTER("THD::binlog_remove_pending_rows_event"); - if(!WSREP_EMULATE_BINLOG(this) && !mysql_bin_log.is_open()) + if(!WSREP_EMULATE_BINLOG_NNULL(this) && !mysql_bin_log.is_open()) DBUG_RETURN(0); /* Ensure that all events in a GTID group are in the same cache */ @@ -6967,7 +6817,7 @@ int THD::binlog_flush_pending_rows_event(bool stmt_end, bool is_transactional) mode: it might be the case that we left row-based mode before flushing anything (e.g., if we have explicitly locked tables). */ - if(!WSREP_EMULATE_BINLOG(this) && !mysql_bin_log.is_open()) + if (!WSREP_EMULATE_BINLOG_NNULL(this) && !mysql_bin_log.is_open()) DBUG_RETURN(0); /* Ensure that all events in a GTID group are in the same cache */ @@ -7241,7 +7091,7 @@ int THD::binlog_query(THD::enum_binlog_query_type qtype, char const *query_arg, show_query_type(qtype), (int) query_len, query_arg)); DBUG_ASSERT(query_arg); - DBUG_ASSERT(WSREP_EMULATE_BINLOG(this) || mysql_bin_log.is_open()); + DBUG_ASSERT(WSREP_EMULATE_BINLOG_NNULL(this) || mysql_bin_log.is_open()); /* If this is withing a BEGIN ... COMMIT group, don't log it */ if (variables.option_bits & OPTION_GTID_BEGIN) @@ -7409,7 +7259,7 @@ wait_for_commit::reinit() { subsequent_commits_list= NULL; next_subsequent_commit= NULL; - waitee= NULL; + waitee.store(NULL, std::memory_order_relaxed); opaque_pointer= NULL; wakeup_error= 0; wakeup_subsequent_commits_running= false; @@ -7470,7 +7320,6 @@ wait_for_commit::~wait_for_commit() mysql_cond_destroy(&COND_wait_commit); } - void wait_for_commit::wakeup(int wakeup_error) { @@ -7487,8 +7336,9 @@ wait_for_commit::wakeup(int wakeup_error) */ mysql_mutex_lock(&LOCK_wait_commit); - waitee= NULL; this->wakeup_error= wakeup_error; + /* Memory barrier to make wakeup_error visible to the waiter thread. */ + waitee.store(NULL, std::memory_order_release); /* Note that it is critical that the mysql_cond_signal() here is done while still holding the mutex. As soon as we release the mutex, the waiter might @@ -7519,9 +7369,10 @@ wait_for_commit::wakeup(int wakeup_error) void wait_for_commit::register_wait_for_prior_commit(wait_for_commit *waitee) { - DBUG_ASSERT(!this->waitee /* No prior registration allowed */); + DBUG_ASSERT(!this->waitee.load(std::memory_order_relaxed) + /* No prior registration allowed */); wakeup_error= 0; - this->waitee= waitee; + this->waitee.store(waitee, std::memory_order_relaxed); mysql_mutex_lock(&waitee->LOCK_wait_commit); /* @@ -7530,7 +7381,7 @@ wait_for_commit::register_wait_for_prior_commit(wait_for_commit *waitee) see comments on wakeup_subsequent_commits2() for details. */ if (waitee->wakeup_subsequent_commits_running) - this->waitee= NULL; + this->waitee.store(NULL, std::memory_order_relaxed); else { /* @@ -7544,23 +7395,41 @@ wait_for_commit::register_wait_for_prior_commit(wait_for_commit *waitee) } -/* - Wait for commit of another transaction to complete, as already registered +/** + Waits for commit of another transaction to complete, as already registered with register_wait_for_prior_commit(). If the commit already completed, returns immediately. + + If thd->backup_commit_lock is set, release it while waiting for other threads */ + int wait_for_commit::wait_for_prior_commit2(THD *thd) { PSI_stage_info old_stage; wait_for_commit *loc_waitee; + bool backup_lock_released= 0; + + /* + Release MDL_BACKUP_COMMIT LOCK while waiting for other threads to commit + This is needed to avoid deadlock between the other threads (which not + yet have the MDL_BACKUP_COMMIT_LOCK) and any threads using + BACKUP LOCK BLOCK_COMMIT. + */ + if (thd->backup_commit_lock && thd->backup_commit_lock->ticket) + { + backup_lock_released= 1; + thd->mdl_context.release_lock(thd->backup_commit_lock->ticket); + thd->backup_commit_lock->ticket= 0; + } mysql_mutex_lock(&LOCK_wait_commit); DEBUG_SYNC(thd, "wait_for_prior_commit_waiting"); thd->ENTER_COND(&COND_wait_commit, &LOCK_wait_commit, &stage_waiting_for_prior_transaction_to_commit, &old_stage); - while ((loc_waitee= this->waitee) && likely(!thd->check_killed(1))) + while ((loc_waitee= this->waitee.load(std::memory_order_relaxed)) && + likely(!thd->check_killed(1))) mysql_cond_wait(&COND_wait_commit, &LOCK_wait_commit); if (!loc_waitee) { @@ -7583,14 +7452,14 @@ wait_for_commit::wait_for_prior_commit2(THD *thd) do { mysql_cond_wait(&COND_wait_commit, &LOCK_wait_commit); - } while (this->waitee); + } while (this->waitee.load(std::memory_order_relaxed)); if (wakeup_error) my_error(ER_PRIOR_COMMIT_FAILED, MYF(0)); goto end; } remove_from_list(&loc_waitee->subsequent_commits_list); mysql_mutex_unlock(&loc_waitee->LOCK_wait_commit); - this->waitee= NULL; + this->waitee.store(NULL, std::memory_order_relaxed); wakeup_error= thd->killed_errno(); if (!wakeup_error) @@ -7602,10 +7471,16 @@ wait_for_commit::wait_for_prior_commit2(THD *thd) use within enter_cond/exit_cond. */ DEBUG_SYNC(thd, "wait_for_prior_commit_killed"); + if (backup_lock_released) + thd->mdl_context.acquire_lock(thd->backup_commit_lock, + thd->variables.lock_wait_timeout); return wakeup_error; end: thd->EXIT_COND(&old_stage); + if (backup_lock_released) + thd->mdl_context.acquire_lock(thd->backup_commit_lock, + thd->variables.lock_wait_timeout); return wakeup_error; } @@ -7692,7 +7567,7 @@ wait_for_commit::unregister_wait_for_prior_commit2() wait_for_commit *loc_waitee; mysql_mutex_lock(&LOCK_wait_commit); - if ((loc_waitee= this->waitee)) + if ((loc_waitee= this->waitee.load(std::memory_order_relaxed))) { mysql_mutex_lock(&loc_waitee->LOCK_wait_commit); if (loc_waitee->wakeup_subsequent_commits_running) @@ -7705,7 +7580,7 @@ wait_for_commit::unregister_wait_for_prior_commit2() See comments on wakeup_subsequent_commits2() for more details. */ mysql_mutex_unlock(&loc_waitee->LOCK_wait_commit); - while (this->waitee) + while (this->waitee.load(std::memory_order_relaxed)) mysql_cond_wait(&COND_wait_commit, &LOCK_wait_commit); } else @@ -7713,7 +7588,7 @@ wait_for_commit::unregister_wait_for_prior_commit2() /* Remove ourselves from the list in the waitee. */ remove_from_list(&loc_waitee->subsequent_commits_list); mysql_mutex_unlock(&loc_waitee->LOCK_wait_commit); - this->waitee= NULL; + this->waitee.store(NULL, std::memory_order_relaxed); } } wakeup_error= 0; @@ -7836,7 +7711,7 @@ Query_arena_stmt::~Query_arena_stmt() bool THD::timestamp_to_TIME(MYSQL_TIME *ltime, my_time_t ts, - ulong sec_part, ulonglong fuzzydate) + ulong sec_part, date_mode_t fuzzydate) { time_zone_used= 1; if (ts == 0 && sec_part == 0) |