diff options
-rw-r--r-- | scripts/wsrep_sst_xtrabackup.sh | 6 | ||||
-rw-r--r-- | sql/handler.cc | 21 | ||||
-rw-r--r-- | sql/handler.h | 2 | ||||
-rw-r--r-- | sql/log.cc | 7 | ||||
-rw-r--r-- | sql/mdl.cc | 8 | ||||
-rw-r--r-- | sql/slave.cc | 17 | ||||
-rw-r--r-- | sql/sql_acl.cc | 38 | ||||
-rw-r--r-- | sql/sql_class.cc | 60 | ||||
-rw-r--r-- | sql/sql_class.h | 32 | ||||
-rw-r--r-- | sql/sql_insert.cc | 23 | ||||
-rw-r--r-- | sql/sql_parse.cc | 170 | ||||
-rw-r--r-- | sql/sql_parse.h | 1 | ||||
-rw-r--r-- | sql/sys_vars.cc | 9 | ||||
-rw-r--r-- | sql/wsrep_hton.cc | 9 | ||||
-rw-r--r-- | sql/wsrep_mysqld.cc | 22 | ||||
-rw-r--r-- | sql/wsrep_mysqld.h | 88 | ||||
-rw-r--r-- | storage/innobase/handler/ha_innodb.cc | 40 | ||||
-rw-r--r-- | storage/innobase/handler/ha_innodb.h | 3 | ||||
-rw-r--r-- | storage/innobase/include/ha_prototypes.h | 2 | ||||
-rw-r--r-- | storage/innobase/lock/lock0lock.c | 59 | ||||
-rw-r--r-- | storage/innobase/rem/rem0rec.c | 4 |
21 files changed, 434 insertions, 187 deletions
diff --git a/scripts/wsrep_sst_xtrabackup.sh b/scripts/wsrep_sst_xtrabackup.sh index 5dad320e0f8..dd8532d6485 100644 --- a/scripts/wsrep_sst_xtrabackup.sh +++ b/scripts/wsrep_sst_xtrabackup.sh @@ -67,9 +67,9 @@ INNOBACKUPEX_BIN=innobackupex INNOBACKUPEX_ARGS="" NC_BIN=nc -for TOOL_BIN in INNOBACKUPEX_BIN NC_BIN ; do - which ${!TOOL_BIN} > /dev/null 2>&1 - if [ $? -ne 0 ]; then +for TOOL_BIN in INNOBACKUPEX_BIN NC_BIN ; do + if ! which ${!TOOL_BIN} > /dev/null 2>&1 + then echo "Can't find ${!TOOL_BIN} in the path" exit 22 # EINVAL fi diff --git a/sql/handler.cc b/sql/handler.cc index e95c903fd47..bce48feab89 100644 --- a/sql/handler.cc +++ b/sql/handler.cc @@ -5268,6 +5268,27 @@ int ha_wsrep_abort_transaction(THD *bf_thd, THD *victim_thd, my_bool signal) DBUG_RETURN(0); } + +void ha_wsrep_fake_trx_id(THD *thd) +{ + DBUG_ENTER("ha_wsrep_fake_trx_id"); + if (!WSREP(thd)) + { + DBUG_VOID_RETURN; + } + + handlerton *hton= installed_htons[DB_TYPE_INNODB]; + if (hton && hton->wsrep_fake_trx_id) + { + hton->wsrep_fake_trx_id(hton, thd); + } + else + { + WSREP_WARN("cannot get get fake InnoDB transaction ID"); + } + + DBUG_VOID_RETURN; +} #endif /* WITH_WSREP */ #ifdef TRANS_LOG_MGM_EXAMPLE_CODE /* diff --git a/sql/handler.h b/sql/handler.h index 47593845ad6..13444a1095d 100644 --- a/sql/handler.h +++ b/sql/handler.h @@ -1047,6 +1047,7 @@ struct handlerton THD *victim_thd, my_bool signal); int (*wsrep_set_checkpoint)(handlerton *hton, const XID* xid); int (*wsrep_get_checkpoint)(handlerton *hton, XID* xid); + void (*wsrep_fake_trx_id)(handlerton *hton, THD *thd); uint32 license; /* Flag for Engine License */ /* @@ -3033,6 +3034,7 @@ int ha_savepoint(THD *thd, SAVEPOINT *sv); int ha_release_savepoint(THD *thd, SAVEPOINT *sv); #ifdef WITH_WSREP int ha_wsrep_abort_transaction(THD *bf_thd, THD *victim_thd, my_bool signal); +void ha_wsrep_fake_trx_id(THD *thd); #endif /* WITH_WSREP */ /* these are called by storage engines */ diff --git a/sql/log.cc b/sql/log.cc index 22d2a3c0647..4dd5bb561e5 100644 --- a/sql/log.cc +++ b/sql/log.cc @@ -1833,6 +1833,13 @@ static inline int binlog_commit_flush_stmt_cache(THD *thd, bool all, binlog_cache_mngr *cache_mngr) { +#ifdef WITH_WSREP + if (thd->wsrep_mysql_replicated > 0) + { + WSREP_DEBUG("avoiding binlog_commit_flush_trx_cache: %d", thd->wsrep_mysql_replicated); + return 0; + } +#endif Query_log_event end_evt(thd, STRING_WITH_LEN("COMMIT"), FALSE, TRUE, TRUE, 0); return (binlog_flush_cache(thd, cache_mngr, &end_evt, all, TRUE, FALSE)); diff --git a/sql/mdl.cc b/sql/mdl.cc index 8ff420e4f50..2e279c19592 100644 --- a/sql/mdl.cc +++ b/sql/mdl.cc @@ -1575,6 +1575,14 @@ MDL_lock::can_grant_lock(enum_mdl_type type_arg, else if (!wsrep_grant_mdl_exception(requestor_ctx, ticket)) { wsrep_can_grant= FALSE; + if (wsrep_log_conflicts) + { + MDL_lock * lock = ticket->get_lock(); + WSREP_INFO( + "MDL conflict db=%s table=%s ticket=%d solved by %s", + lock->key.db_name(), lock->key.name(), ticket->get_type(), "abort" + ); + } } else { diff --git a/sql/slave.cc b/sql/slave.cc index fe3ce480367..b928b658f51 100644 --- a/sql/slave.cc +++ b/sql/slave.cc @@ -2600,6 +2600,23 @@ int apply_event_and_update_pos(Log_event* ev, THD* thd, Relay_log_info* rli) ev->thd = thd; // because up to this point, ev->thd == 0 int reason= ev->shall_skip(rli); +#ifdef WITH_WSREP + if (ev->get_type_code() == XID_EVENT || + (ev->get_type_code() == QUERY_EVENT && thd->wsrep_mysql_replicated > 0 && + (!strncasecmp(((Query_log_event*)ev)->query , "BEGIN", 5) || + !strncasecmp(((Query_log_event*)ev)->query , "COMMIT", 6) ))) + { + if (++thd->wsrep_mysql_replicated < (int)wsrep_mysql_replication_bundle) + { + WSREP_DEBUG("skipping wsrep commit %d", thd->wsrep_mysql_replicated); + reason = Log_event::EVENT_SKIP_IGNORE; + } + else + { + thd->wsrep_mysql_replicated = 0; + } + } +#endif if (reason == Log_event::EVENT_SKIP_COUNT) sql_slave_skip_counter= --rli->slave_skip_counter; mysql_mutex_unlock(&rli->data_lock); diff --git a/sql/sql_acl.cc b/sql/sql_acl.cc index 8f27c45247e..22fb5ee74cd 100644 --- a/sql/sql_acl.cc +++ b/sql/sql_acl.cc @@ -1830,6 +1830,9 @@ int check_change_password(THD *thd, const char *host, const char *user, return(1); } if (!thd->slave_thread && +#ifdef WITH_WSREP + (!WSREP(thd) || !thd->wsrep_applier) && +#endif /* WITH_WSREP */ (strcmp(thd->security_ctx->user, user) || my_strcasecmp(system_charset_info, host, thd->security_ctx->priv_host))) @@ -1837,7 +1840,12 @@ int check_change_password(THD *thd, const char *host, const char *user, if (check_access(thd, UPDATE_ACL, "mysql", NULL, NULL, 1, 0)) return(1); } +#ifdef WITH_WSREP + if ((!WSREP(thd) || !thd->wsrep_applier) && + !thd->slave_thread && !thd->security_ctx->user[0]) +#else if (!thd->slave_thread && !thd->security_ctx->user[0]) +#endif /* WITH_WSREP */ { my_message(ER_PASSWORD_ANONYMOUS_USER, ER(ER_PASSWORD_ANONYMOUS_USER), MYF(0)); @@ -1876,10 +1884,13 @@ bool change_password(THD *thd, const char *host, const char *user, TABLE *table; /* Buffer should be extended when password length is extended. */ char buff[512]; - ulong query_length; + ulong query_length=0; bool save_binlog_row_based; uint new_password_len= (uint) strlen(new_password); bool result= 1; +#ifdef WITH_WSREP + const CSET_STRING query_save = thd->query_string; +#endif /* WITH_WSREP */ DBUG_ENTER("change_password"); DBUG_PRINT("enter",("host: '%s' user: '%s' new_password: '%s'", host,user,new_password)); @@ -1887,6 +1898,18 @@ bool change_password(THD *thd, const char *host, const char *user, if (check_change_password(thd, host, user, new_password, new_password_len)) DBUG_RETURN(1); +#ifdef WITH_WSREP + if (WSREP(thd) && !thd->wsrep_applier) + { + query_length= sprintf(buff, "SET PASSWORD FOR '%-.120s'@'%-.120s'='%-.120s'", + user ? user : "", + host ? host : "", + new_password); + thd->set_query_inner(buff, query_length, system_charset_info); + + WSREP_TO_ISOLATION_BEGIN(WSREP_MYSQL_DB, (char*)"user", NULL); + } +#endif /* WITH_WSREP */ tables.init_one_table("mysql", 5, "user", 4, "user", TL_WRITE); @@ -1965,13 +1988,26 @@ bool change_password(THD *thd, const char *host, const char *user, } end: close_mysql_tables(thd); +#ifdef WITH_WSREP + if (WSREP(thd) && !thd->wsrep_applier) + { + WSREP_TO_ISOLATION_END; + thd->query_string = query_save; + thd->wsrep_exec_mode = LOCAL_STATE; + } +#endif /* WITH_WSREP */ /* Restore the state of binlog format */ DBUG_ASSERT(!thd->is_current_stmt_binlog_format_row()); if (save_binlog_row_based) thd->set_current_stmt_binlog_format_row(); DBUG_RETURN(result); +#ifdef WITH_WSREP + error: + WSREP_ERROR("Replication of SET PASSWORD failed: %s", buff); + DBUG_RETURN(result); +#endif /* WITH_WSREP */ } diff --git a/sql/sql_class.cc b/sql/sql_class.cc index ff92eda7c1f..7604227740f 100644 --- a/sql/sql_class.cc +++ b/sql/sql_class.cc @@ -733,14 +733,49 @@ extern "C" enum wsrep_exec_mode wsrep_thd_exec_mode(THD *thd) { return thd->wsrep_exec_mode; } + +extern "C" const char *wsrep_thd_exec_mode_str(THD *thd) +{ + return + (!thd) ? "void" : + (thd->wsrep_exec_mode == LOCAL_STATE) ? "local" : + (thd->wsrep_exec_mode == REPL_RECV) ? "applier" : + (thd->wsrep_exec_mode == TOTAL_ORDER) ? "total order" : + (thd->wsrep_exec_mode == LOCAL_COMMIT) ? "local commit" : "void"; +} + extern "C" enum wsrep_query_state wsrep_thd_query_state(THD *thd) { return thd->wsrep_query_state; } + +extern "C" const char *wsrep_thd_query_state_str(THD *thd) +{ + return + (!thd) ? "void" : + (thd->wsrep_query_state == QUERY_IDLE) ? "idle" : + (thd->wsrep_query_state == QUERY_EXEC) ? "executing" : + (thd->wsrep_query_state == QUERY_COMMITTING) ? "committing" : + (thd->wsrep_query_state == QUERY_EXITING) ? "exiting" : + (thd->wsrep_query_state == QUERY_ROLLINGBACK) ? "rolling back" : "void"; +} + extern "C" enum wsrep_conflict_state wsrep_thd_conflict_state(THD *thd) { return thd->wsrep_conflict_state; } +extern "C" const char *wsrep_thd_conflict_state_str(THD *thd) +{ + return + (!thd) ? "void" : + (thd->wsrep_conflict_state == NO_CONFLICT) ? "no conflict" : + (thd->wsrep_conflict_state == MUST_ABORT) ? "must abort" : + (thd->wsrep_conflict_state == ABORTING) ? "aborting" : + (thd->wsrep_conflict_state == MUST_REPLAY) ? "must replay" : + (thd->wsrep_conflict_state == REPLAYING) ? "replaying" : + (thd->wsrep_conflict_state == RETRY_AUTOCOMMIT) ? "retrying" : + (thd->wsrep_conflict_state == CERT_FAILURE) ? "cert failure" : "void"; +} extern "C" wsrep_trx_handle_t* wsrep_thd_trx_handle(THD *thd) { @@ -769,7 +804,7 @@ extern "C" my_thread_id wsrep_thd_thread_id(THD *thd) } extern "C" wsrep_seqno_t wsrep_thd_trx_seqno(THD *thd) { - return thd->wsrep_trx_seqno; + return (thd) ? thd->wsrep_trx_seqno : -1; } extern "C" query_id_t wsrep_thd_query_id(THD *thd) { @@ -777,7 +812,7 @@ extern "C" query_id_t wsrep_thd_query_id(THD *thd) } extern "C" char *wsrep_thd_query(THD *thd) { - return thd->query(); + return (thd) ? thd->query() : NULL; } extern "C" query_id_t wsrep_thd_wsrep_last_query_id(THD *thd) { @@ -995,17 +1030,19 @@ THD::THD() #ifdef WITH_WSREP mysql_mutex_init(key_LOCK_wsrep_thd, &LOCK_wsrep_thd, MY_MUTEX_INIT_FAST); mysql_cond_init(key_COND_wsrep_thd, &COND_wsrep_thd, NULL); - wsrep_trx_handle.trx_id= -1; - wsrep_trx_handle.opaque= NULL; + wsrep_trx_handle.trx_id = -1; + wsrep_trx_handle.opaque = NULL; //wsrep_retry_autocommit= ::wsrep_retry_autocommit; - wsrep_retry_counter= 0; - wsrep_PA_safe = true; - wsrep_seqno_changed= false; - wsrep_retry_query = NULL; - wsrep_retry_query_len = 0; - wsrep_retry_command = COM_CONNECT; + wsrep_retry_counter = 0; + wsrep_PA_safe = true; + wsrep_seqno_changed = false; + wsrep_retry_query = NULL; + wsrep_retry_query_len = 0; + wsrep_retry_command = COM_CONNECT; wsrep_consistency_check = NO_CONSISTENCY_CHECK; - wsrep_status_vars = 0; + wsrep_status_vars = 0; + wsrep_mysql_replicated = 0; + #endif /* Call to init() below requires fully initialized Open_tables_state. */ reset_open_tables_state(this); @@ -1359,6 +1396,7 @@ void THD::init(void) wsrep_PA_safe= true; wsrep_seqno_changed= false; wsrep_consistency_check = NO_CONSISTENCY_CHECK; + wsrep_mysql_replicated = 0; #endif if (variables.sql_log_bin) variables.option_bits|= OPTION_BIN_LOG; diff --git a/sql/sql_class.h b/sql/sql_class.h index b6e2069fe17..98766cabeca 100644 --- a/sql/sql_class.h +++ b/sql/sql_class.h @@ -20,37 +20,6 @@ #define SQL_CLASS_INCLUDED /* Classes in mysql */ -#ifdef WITH_WSREP -#include "../wsrep/wsrep_api.h" -//#include "wsrep_mysqld.h" - enum wsrep_exec_mode { - LOCAL_STATE, - REPL_RECV, - TOTAL_ORDER, - LOCAL_COMMIT, - }; - enum wsrep_query_state { - QUERY_IDLE, - QUERY_EXEC, - QUERY_COMMITTING, - QUERY_EXITING, - QUERY_ROLLINGBACK, - }; - enum wsrep_conflict_state { - NO_CONFLICT, - MUST_ABORT, - ABORTING, - ABORTED, - MUST_REPLAY, - REPLAYING, - RETRY_AUTOCOMMIT, - }; - enum wsrep_consistency_check_mode { - NO_CONSISTENCY_CHECK, - CONSISTENCY_CHECK_DECLARED, - CONSISTENCY_CHECK_RUNNING, - }; -#endif #ifdef USE_PRAGMA_INTERFACE #pragma interface /* gcc class implementation */ @@ -2405,6 +2374,7 @@ public: enum wsrep_consistency_check_mode wsrep_consistency_check; wsrep_stats_var* wsrep_status_vars; + int wsrep_mysql_replicated; #endif /* WITH_WSREP */ /** Internal parser state. diff --git a/sql/sql_insert.cc b/sql/sql_insert.cc index 93b6ea11cc1..5c165e4ec7d 100644 --- a/sql/sql_insert.cc +++ b/sql/sql_insert.cc @@ -3573,8 +3573,14 @@ bool select_insert::send_eof() DBUG_PRINT("enter", ("trans_table=%d, table_type='%s'", trans_table, table->file->table_type())); +#ifdef WITH_WSREP + error= (thd->wsrep_conflict_state == MUST_ABORT) ? -1 : + (thd->locked_tables_mode <= LTM_LOCK_TABLES ? + table->file->ha_end_bulk_insert() : 0); +#else error= (thd->locked_tables_mode <= LTM_LOCK_TABLES ? table->file->ha_end_bulk_insert() : 0); +#endif /* WITH_WSREP */ if (!error && thd->is_error()) error= thd->stmt_da->sql_errno(); @@ -4101,22 +4107,9 @@ select_create::binlog_show_create_table(TABLE **tables, uint count) errcode); } #ifdef WITH_WSREP - const CSET_STRING query_save = thd->query_string; - thd->set_query_inner((char*)query.ptr(), query.length(), system_charset_info); - - WSREP_TO_ISOLATION_BEGIN((*tables)->s->db.str, (*tables)->s->table_name.str, NULL); - WSREP_TO_ISOLATION_END; - - thd_binlog_trx_reset(thd); - thd->query_string = query_save; - thd->wsrep_exec_mode = LOCAL_STATE; -#endif /* WITH_WSREP */ - return result; -#ifdef WITH_WSREP - error: - WSREP_WARN("TO isolation failed: %s", thd->query()); - return 0; + ha_wsrep_fake_trx_id(thd); #endif + return result; } void select_create::store_values(List<Item> &values) diff --git a/sql/sql_parse.cc b/sql/sql_parse.cc index c008870b1df..6a9063d0edb 100644 --- a/sql/sql_parse.cc +++ b/sql/sql_parse.cc @@ -108,7 +108,6 @@ static void wsrep_client_rollback(THD *thd); extern Format_description_log_event *wsrep_format_desc; -#define WSREP_MYSQL_DB (char *)"mysql" static void wsrep_mysql_parse(THD *thd, char *rawbuf, uint length, Parser_state *parser_state); @@ -6050,85 +6049,88 @@ static void wsrep_mysql_parse(THD *thd, char *rawbuf, uint length, /* wsrep BF abort in query exec phase */ mysql_mutex_lock(&thd->LOCK_wsrep_thd); if (thd->wsrep_conflict_state == MUST_ABORT) { - wsrep_client_rollback(thd); + wsrep_client_rollback(thd); - WSREP_DEBUG("abort in exec query state, avoiding autocommit"); + WSREP_DEBUG("abort in exec query state, avoiding autocommit"); } /* checking if BF trx must be replayed */ if (thd->wsrep_conflict_state== MUST_REPLAY) { - if (thd->wsrep_exec_mode!= REPL_RECV) { - if (thd->stmt_da->is_sent) { - WSREP_ERROR("replay issue, thd has reported status already"); - } - thd->stmt_da->reset_diagnostics_area(); + if (thd->wsrep_exec_mode!= REPL_RECV) { + if (thd->stmt_da->is_sent) { + WSREP_ERROR("replay issue, thd has reported status already"); + } + thd->stmt_da->reset_diagnostics_area(); - thd->wsrep_conflict_state= REPLAYING; - mysql_mutex_unlock(&thd->LOCK_wsrep_thd); + thd->wsrep_conflict_state= REPLAYING; + mysql_mutex_unlock(&thd->LOCK_wsrep_thd); - mysql_reset_thd_for_next_command(thd, opt_userstat_running); - thd->killed= NOT_KILLED; - close_thread_tables(thd); - if (thd->locked_tables_mode && thd->lock) - { - WSREP_DEBUG("releasing table lock for replaying (%ld)", thd->thread_id); - thd->locked_tables_list.unlock_locked_tables(thd); - thd->variables.option_bits&= ~(OPTION_TABLE_LOCK); - } - thd->mdl_context.release_transactional_locks(); - - thd_proc_info(thd, "wsrep replaying trx"); - WSREP_DEBUG("replay trx: %s %lld", - thd->query() ? thd->query() : "void", - (long long)thd->wsrep_trx_seqno); - struct wsrep_thd_shadow shadow; - wsrep_prepare_bf_thd(thd, &shadow); - int rcode = wsrep->replay_trx(wsrep, - &thd->wsrep_trx_handle, - (void *)thd); - - wsrep_return_from_bf_mode(thd, &shadow); - if (thd->wsrep_conflict_state!= REPLAYING) - WSREP_WARN("lost replaying mode: %d", thd->wsrep_conflict_state ); - - mysql_mutex_lock(&thd->LOCK_wsrep_thd); - - switch (rcode) { - case WSREP_OK: - thd->wsrep_conflict_state= NO_CONFLICT; - wsrep->post_commit(wsrep, &thd->wsrep_trx_handle); - WSREP_DEBUG("trx_replay successful for: %ld %llu", - thd->thread_id, (long long)thd->real_id); - break; - case WSREP_TRX_FAIL: - if (thd->stmt_da->is_sent) { - WSREP_ERROR("replay failed, thd has reported status"); - } - else - { - WSREP_DEBUG("replay failed, rolling back"); - my_error(ER_LOCK_DEADLOCK, MYF(0), "wsrep aborted transaction"); - } - thd->wsrep_conflict_state= ABORTED; - wsrep->post_rollback(wsrep, &thd->wsrep_trx_handle); - break; - default: - WSREP_ERROR("trx_replay failed for: %d, query: %s", - rcode, thd->query() ? thd->query() : "void"); - /* we're now in inconsistent state, must abort */ - unireg_abort(1); - break; - } - mysql_mutex_lock(&LOCK_wsrep_replaying); - wsrep_replaying--; - WSREP_DEBUG("replaying decreased: %d, thd: %lu", - wsrep_replaying, thd->thread_id); - mysql_cond_broadcast(&COND_wsrep_replaying); - mysql_mutex_unlock(&LOCK_wsrep_replaying); + mysql_reset_thd_for_next_command(thd, opt_userstat_running); + thd->killed= NOT_KILLED; + close_thread_tables(thd); + if (thd->locked_tables_mode && thd->lock) + { + WSREP_DEBUG("releasing table lock for replaying (%ld)", + thd->thread_id); + thd->locked_tables_list.unlock_locked_tables(thd); + thd->variables.option_bits&= ~(OPTION_TABLE_LOCK); + } + thd->mdl_context.release_transactional_locks(); + + thd_proc_info(thd, "wsrep replaying trx"); + WSREP_DEBUG("replay trx: %s %lld", + thd->query() ? thd->query() : "void", + (long long)thd->wsrep_trx_seqno); + struct wsrep_thd_shadow shadow; + wsrep_prepare_bf_thd(thd, &shadow); + int rcode = wsrep->replay_trx(wsrep, + &thd->wsrep_trx_handle, + (void *)thd); + + wsrep_return_from_bf_mode(thd, &shadow); + if (thd->wsrep_conflict_state!= REPLAYING) + WSREP_WARN("lost replaying mode: %d", thd->wsrep_conflict_state ); + + mysql_mutex_lock(&thd->LOCK_wsrep_thd); + + switch (rcode) { + case WSREP_OK: + thd->wsrep_conflict_state= NO_CONFLICT; + wsrep->post_commit(wsrep, &thd->wsrep_trx_handle); + WSREP_DEBUG("trx_replay successful for: %ld %llu", + thd->thread_id, (long long)thd->real_id); + break; + case WSREP_TRX_FAIL: + if (thd->stmt_da->is_sent) { + WSREP_ERROR("replay failed, thd has reported status"); + } + else + { + WSREP_DEBUG("replay failed, rolling back"); + my_error(ER_LOCK_DEADLOCK, MYF(0), "wsrep aborted transaction"); + } + thd->wsrep_conflict_state= ABORTED; + wsrep->post_rollback(wsrep, &thd->wsrep_trx_handle); + break; + default: + WSREP_ERROR("trx_replay failed for: %d, query: %s", + rcode, thd->query() ? thd->query() : "void"); + /* we're now in inconsistent state, must abort */ + unireg_abort(1); + break; + } + mysql_mutex_lock(&LOCK_wsrep_replaying); + wsrep_replaying--; + WSREP_DEBUG("replaying decreased: %d, thd: %lu", + wsrep_replaying, thd->thread_id); + mysql_cond_broadcast(&COND_wsrep_replaying); + mysql_mutex_unlock(&LOCK_wsrep_replaying); } } + /* setting error code for BF aborted trxs */ - if (thd->wsrep_conflict_state == ABORTED) + if (thd->wsrep_conflict_state == ABORTED || + thd->wsrep_conflict_state == CERT_FAILURE) { mysql_reset_thd_for_next_command(thd, opt_userstat_running); thd->killed= NOT_KILLED; @@ -6143,25 +6145,27 @@ static void wsrep_mysql_parse(THD *thd, char *rawbuf, uint length, thd->wsrep_conflict_state= RETRY_AUTOCOMMIT; thd->wsrep_retry_counter++; // grow - wsrep_copy_query(thd); + wsrep_copy_query(thd); thd->set_time(); parser_state->reset(rawbuf, length); - } - else - { - WSREP_DEBUG("BF aborted, thd: %lu is_AC: %d, retry: %lu - %lu SQL: %s", - thd->thread_id, is_autocommit, thd->wsrep_retry_counter, - thd->variables.wsrep_retry_autocommit, thd->query()); - my_error(ER_LOCK_DEADLOCK, MYF(0), "wsrep aborted transaction"); - thd->killed= NOT_KILLED; - thd->wsrep_conflict_state= NO_CONFLICT; + } + else + { + WSREP_DEBUG("%s, thd: %lu is_AC: %d, retry: %lu - %lu SQL: %s", + (thd->wsrep_conflict_state == ABORTED) ? + "BF Aborted" : "cert failure", + thd->thread_id, is_autocommit, thd->wsrep_retry_counter, + thd->variables.wsrep_retry_autocommit, thd->query()); + my_error(ER_LOCK_DEADLOCK, MYF(0), "wsrep aborted transaction"); + thd->killed= NOT_KILLED; + thd->wsrep_conflict_state= NO_CONFLICT; if (thd->wsrep_conflict_state != REPLAYING) - thd->wsrep_retry_counter= 0; // reset - } + thd->wsrep_retry_counter= 0; // reset + } } else { - set_if_smaller(thd->wsrep_retry_counter, 0); // reset; eventually ok + set_if_smaller(thd->wsrep_retry_counter, 0); // reset; eventually ok } mysql_mutex_unlock(&thd->LOCK_wsrep_thd); } diff --git a/sql/sql_parse.h b/sql/sql_parse.h index d7063291fde..1303eba84b7 100644 --- a/sql/sql_parse.h +++ b/sql/sql_parse.h @@ -204,6 +204,7 @@ inline bool is_supported_parser_charset(CHARSET_INFO *cs) } #ifdef WITH_WSREP +#define WSREP_MYSQL_DB (char *)"mysql" #define WSREP_TO_ISOLATION_BEGIN(db_, table_, table_list_) \ if (WSREP(thd) && wsrep_to_isolation_begin(thd, db_, table_, table_list_)) goto error; diff --git a/sql/sys_vars.cc b/sql/sys_vars.cc index 89bb2e1a812..08277b9f624 100644 --- a/sql/sys_vars.cc +++ b/sql/sys_vars.cc @@ -3826,6 +3826,15 @@ static Sys_var_mybool Sys_wsrep_replicate_myisam( "wsrep_replicate_myisam", "To enable myisam replication", GLOBAL_VAR(wsrep_replicate_myisam), CMD_LINE(OPT_ARG), DEFAULT(FALSE)); +static Sys_var_mybool Sys_wsrep_log_conflicts( + "wsrep_log_conflicts", "To log multi-master conflicts", + GLOBAL_VAR(wsrep_log_conflicts), CMD_LINE(OPT_ARG), DEFAULT(FALSE)); + +static Sys_var_ulong Sys_wsrep_mysql_replication_bundle( + "wsrep_mysql_replication_bundle", "mysql replication group commit ", + GLOBAL_VAR(wsrep_mysql_replication_bundle), CMD_LINE(REQUIRED_ARG), + VALID_RANGE(0, 1000), DEFAULT(0), BLOCK_SIZE(1)); + #endif /* WITH_WSREP */ static Sys_var_ulong Sys_sp_cache_size( diff --git a/sql/wsrep_hton.cc b/sql/wsrep_hton.cc index 219e0e8a244..1fc2372a57c 100644 --- a/sql/wsrep_hton.cc +++ b/sql/wsrep_hton.cc @@ -378,6 +378,15 @@ wsrep_run_wsrep_commit( if (thd->wsrep_conflict_state == MUST_ABORT) { thd->wsrep_conflict_state= ABORTED; + } + else + { + WSREP_DEBUG("conflict state: %d", thd->wsrep_conflict_state); + if (thd->wsrep_conflict_state == NO_CONFLICT) + { + thd->wsrep_conflict_state = CERT_FAILURE; + WSREP_LOG_CONFLICT(NULL, thd, FALSE); + } } mysql_mutex_unlock(&thd->LOCK_wsrep_thd); diff --git a/sql/wsrep_mysqld.cc b/sql/wsrep_mysqld.cc index eedd763b784..c31c02e6d33 100644 --- a/sql/wsrep_mysqld.cc +++ b/sql/wsrep_mysqld.cc @@ -49,6 +49,8 @@ long wsrep_max_protocol_version = 2; // maximum protocol version to use ulong wsrep_forced_binlog_format = BINLOG_FORMAT_UNSPEC; my_bool wsrep_recovery = 0; // recovery my_bool wsrep_replicate_myisam = 0; // enable myisam replication +my_bool wsrep_log_conflicts = 0; // +ulong wsrep_mysql_replication_bundle = 0; /* * End configuration options @@ -725,7 +727,8 @@ wsrep_causal_wait (THD* thd) break; default: msg= "Causal wait failed."; - err= ER_ERROR_ON_READ; + err= ER_LOCK_WAIT_TIMEOUT; // NOTE: the above msg won't be displayed + // with ER_LOCK_WAIT_TIMEOUT } my_error(err, MYF(0), msg); @@ -1225,6 +1228,7 @@ wsrep_grant_mdl_exception(MDL_context *requestor_ctx, THD *request_thd = requestor_ctx->get_thd(); THD *granted_thd = ticket->get_ctx()->get_thd(); + bool ret = FALSE; mysql_mutex_lock(&request_thd->LOCK_wsrep_thd); if (request_thd->wsrep_exec_mode == TOTAL_ORDER || @@ -1239,39 +1243,39 @@ wsrep_grant_mdl_exception(MDL_context *requestor_ctx, { WSREP_MDL_LOG(INFO, "MDL BF-BF conflict", request_thd, granted_thd); mysql_mutex_unlock(&granted_thd->LOCK_wsrep_thd); - return TRUE; + ret = TRUE; } else if (granted_thd->lex->sql_command == SQLCOM_FLUSH) { WSREP_DEBUG("mdl granted over FLUSH BF"); mysql_mutex_unlock(&granted_thd->LOCK_wsrep_thd); - return TRUE; - } + ret = TRUE; + } else if (request_thd->lex->sql_command == SQLCOM_DROP_TABLE) { WSREP_DEBUG("DROP caused BF abort"); mysql_mutex_unlock(&granted_thd->LOCK_wsrep_thd); wsrep_abort_thd((void*)request_thd, (void*)granted_thd, 1); - return FALSE; - } + ret = FALSE; + } else if (granted_thd->wsrep_query_state == QUERY_COMMITTING) { WSREP_DEBUG("mdl granted, but commiting thd abort scheduled"); mysql_mutex_unlock(&granted_thd->LOCK_wsrep_thd); wsrep_abort_thd((void*)request_thd, (void*)granted_thd, 1); - return FALSE; + ret = FALSE; } else { WSREP_MDL_LOG(DEBUG, "MDL conflict-> BF abort", request_thd, granted_thd); mysql_mutex_unlock(&granted_thd->LOCK_wsrep_thd); wsrep_abort_thd((void*)request_thd, (void*)granted_thd, 1); - return FALSE; + ret = FALSE; } } else { mysql_mutex_unlock(&request_thd->LOCK_wsrep_thd); } - return FALSE; + return ret; } diff --git a/sql/wsrep_mysqld.h b/sql/wsrep_mysqld.h index af6c66609d4..d1c72e6001f 100644 --- a/sql/wsrep_mysqld.h +++ b/sql/wsrep_mysqld.h @@ -24,6 +24,39 @@ typedef struct st_mysql_show_var SHOW_VAR; class set_var; class THD; +#ifdef WITH_WSREP +#include "../wsrep/wsrep_api.h" +//#include "wsrep_mysqld.h" + enum wsrep_exec_mode { + LOCAL_STATE, + REPL_RECV, + TOTAL_ORDER, + LOCAL_COMMIT, + }; + enum wsrep_query_state { + QUERY_IDLE, + QUERY_EXEC, + QUERY_COMMITTING, + QUERY_EXITING, + QUERY_ROLLINGBACK, + }; + enum wsrep_conflict_state { + NO_CONFLICT, + MUST_ABORT, + ABORTING, + ABORTED, + MUST_REPLAY, + REPLAYING, + RETRY_AUTOCOMMIT, + CERT_FAILURE, + }; + enum wsrep_consistency_check_mode { + NO_CONSISTENCY_CHECK, + CONSISTENCY_CHECK_DECLARED, + CONSISTENCY_CHECK_RUNNING, + }; +#endif + // Global wsrep parameters extern wsrep_t* wsrep; @@ -60,6 +93,8 @@ extern ulong wsrep_forced_binlog_format; extern ulong wsrep_OSU_method_options; extern my_bool wsrep_recovery; extern my_bool wsrep_replicate_myisam; +extern my_bool wsrep_log_conflicts; +extern ulong wsrep_mysql_replication_bundle; enum enum_wsrep_OSU_method { WSREP_OSU_TOI, WSREP_OSU_RSU }; @@ -138,6 +173,38 @@ extern int wsrep_init(); extern void wsrep_deinit(); extern void wsrep_recover(); + + +extern "C" enum wsrep_exec_mode wsrep_thd_exec_mode(THD *thd); +extern "C" enum wsrep_conflict_state wsrep_thd_conflict_state(THD *thd); +extern "C" enum wsrep_query_state wsrep_thd_query_state(THD *thd); +extern "C" const char * wsrep_thd_exec_mode_str(THD *thd); +extern "C" const char * wsrep_thd_conflict_state_str(THD *thd); +extern "C" const char * wsrep_thd_query_state_str(THD *thd); +extern "C" wsrep_trx_handle_t* wsrep_thd_trx_handle(THD *thd); + +extern "C" void wsrep_thd_set_exec_mode(THD *thd, enum wsrep_exec_mode mode); +extern "C" void wsrep_thd_set_query_state( + THD *thd, enum wsrep_query_state state); +extern "C" void wsrep_thd_set_conflict_state( + THD *thd, enum wsrep_conflict_state state); + +extern "C" void wsrep_thd_set_trx_to_replay(THD *thd, uint64 trx_id); + +extern "C"void wsrep_thd_LOCK(THD *thd); +extern "C"void wsrep_thd_UNLOCK(THD *thd); +extern "C" uint32 wsrep_thd_wsrep_rand(THD *thd); +extern "C" time_t wsrep_thd_query_start(THD *thd); +extern "C" my_thread_id wsrep_thd_thread_id(THD *thd); +extern "C" int64_t wsrep_thd_trx_seqno(THD *thd); +extern "C" query_id_t wsrep_thd_query_id(THD *thd); +extern "C" char * wsrep_thd_query(THD *thd); +extern "C" query_id_t wsrep_thd_wsrep_last_query_id(THD *thd); +extern "C" void wsrep_thd_set_wsrep_last_query_id(THD *thd, query_id_t id); +extern "C" void wsrep_thd_awake(THD *thd, my_bool signal); + + + /* wsrep initialization sequence at startup * @param first wsrep_init_first() value */ extern void wsrep_init_startup(bool first); @@ -188,6 +255,27 @@ extern wsrep_seqno_t wsrep_locked_seqno; #define WSREP_WARN(...) WSREP_LOG(sql_print_warning, ##__VA_ARGS__) #define WSREP_ERROR(...) WSREP_LOG(sql_print_error, ##__VA_ARGS__) +#define WSREP_LOG_CONFLICT_THD(thd, role) \ + WSREP_LOG(sql_print_information, \ + "%s: \n " \ + " THD: %lu, mode: %s, state: %s, conflict: %s, seqno: %ld\n " \ + " SQL: %s", \ + role, wsrep_thd_thread_id(thd), wsrep_thd_exec_mode_str(thd), \ + wsrep_thd_query_state_str(thd), \ + wsrep_thd_conflict_state_str(thd), wsrep_thd_trx_seqno(thd), \ + wsrep_thd_query(thd) \ + ); + +#define WSREP_LOG_CONFLICT(bf_thd, victim_thd, bf_abort) \ + if (wsrep_debug || wsrep_log_conflicts) \ + { \ + WSREP_LOG(sql_print_information, "cluster conflict due to %s for threads:",\ + (bf_abort) ? "high priority abort" : "certification failure" \ + ); \ + if (bf_thd) WSREP_LOG_CONFLICT_THD(bf_thd, "Winning thread"); \ + if (victim_thd) WSREP_LOG_CONFLICT_THD(victim_thd, "Victim thread"); \ + } + /*! Synchronizes applier thread start with init thread */ extern void wsrep_sst_grab(); /*! Init thread waits for SST completion */ diff --git a/storage/innobase/handler/ha_innodb.cc b/storage/innobase/handler/ha_innodb.cc index e1f18c84800..a1b7eab5deb 100644 --- a/storage/innobase/handler/ha_innodb.cc +++ b/storage/innobase/handler/ha_innodb.cc @@ -927,6 +927,8 @@ innobase_release_temporary_latches( static int wsrep_abort_transaction(handlerton* hton, THD *bf_thd, THD *victim_thd, my_bool signal); +static void +wsrep_fake_trx_id(handlerton* hton, THD *thd); static int innobase_wsrep_set_checkpoint(handlerton* hton, const XID* xid); static int innobase_wsrep_get_checkpoint(handlerton* hton, XID* xid); #endif @@ -2317,6 +2319,7 @@ innobase_init( innobase_hton->wsrep_abort_transaction=wsrep_abort_transaction; innobase_hton->wsrep_set_checkpoint=innobase_wsrep_set_checkpoint; innobase_hton->wsrep_get_checkpoint=innobase_wsrep_get_checkpoint; + innobase_hton->wsrep_fake_trx_id=wsrep_fake_trx_id; #endif /* WITH_WSREP */ ut_a(DATA_MYSQL_TRUE_VARCHAR == (ulint)MYSQL_TYPE_VARCHAR); @@ -7004,7 +7007,12 @@ wsrep_append_foreign_key( &key[1], &len, rec, index, wsrep_protocol_version > 1); if (rcode != DB_SUCCESS) { - WSREP_ERROR("FK key set failed: %lu", rcode); + WSREP_ERROR( + "FK key set failed: %lu (%lu %lu), index: %s %s, %s", + rcode, referenced, shared, + (index->name) ? index->name : "void index", + (index->table_name) ? index->table_name : "void table", + wsrep_thd_query(thd)); return rcode; } strncpy(cache_key, @@ -12168,18 +12176,27 @@ wsrep_abort_slave_trx(wsrep_seqno_t bf_seqno, wsrep_seqno_t victim_seqno) abort(); } int -wsrep_innobase_kill_one_trx(trx_t *bf_trx, trx_t *victim_trx, ibool signal) +wsrep_innobase_kill_one_trx(void *bf_thd_ptr, trx_t *bf_trx, trx_t *victim_trx, ibool signal) { DBUG_ENTER("wsrep_innobase_kill_one_trx"); + THD *bf_thd = (THD *)bf_thd_ptr; THD *thd = (THD *) victim_trx->mysql_thd; - THD *bf_thd = (bf_trx) ? (THD *)bf_trx->mysql_thd : NULL; int64_t bf_seqno = (bf_thd) ? wsrep_thd_trx_seqno(bf_thd) : 0; + if (!bf_thd) bf_thd = (bf_trx) ? (THD *)bf_trx->mysql_thd : NULL; + if (!thd) { DBUG_PRINT("wsrep", ("no thd for conflicting lock")); WSREP_WARN("no THD for trx: %llu", victim_trx->id); DBUG_RETURN(1); } + if (!bf_thd) { + DBUG_PRINT("wsrep", ("no BF thd for conflicting lock")); + WSREP_WARN("no BF THD for trx: %llu", (bf_trx) ? bf_trx->id : 0); + DBUG_RETURN(1); + } + + WSREP_LOG_CONFLICT(bf_thd, thd, TRUE); WSREP_DEBUG("BF kill (%lu, seqno: %lld), victim: (%lu) trx: %llu", signal, (long long)bf_seqno, @@ -12368,8 +12385,8 @@ wsrep_abort_transaction(handlerton* hton, THD *bf_thd, THD *victim_thd, if (victim_trx) { mutex_enter(&kernel_mutex); - int rcode = wsrep_innobase_kill_one_trx(bf_trx, victim_trx, - signal); + int rcode = wsrep_innobase_kill_one_trx( + bf_thd, bf_trx, victim_trx, signal); mutex_exit(&kernel_mutex); DBUG_RETURN(rcode); } else { @@ -12403,6 +12420,19 @@ static int innobase_wsrep_get_checkpoint(handlerton* hton, XID* xid) return 0; } +static void +wsrep_fake_trx_id( +/*==================*/ + handlerton *hton, + THD *thd) /*!< in: user thread handle */ +{ + mutex_enter(&kernel_mutex); + trx_id_t trx_id = trx_sys_get_new_trx_id(); + mutex_exit(&kernel_mutex); + + (void *)wsrep_trx_handle_for_id(wsrep_thd_trx_handle(thd), trx_id); +} + #endif /* WITH_WSREP */ /* plugin options */ static MYSQL_SYSVAR_BOOL(checksums, innobase_use_checksums, diff --git a/storage/innobase/handler/ha_innodb.h b/storage/innobase/handler/ha_innodb.h index 8f211d6e38c..aa653adcabf 100644 --- a/storage/innobase/handler/ha_innodb.h +++ b/storage/innobase/handler/ha_innodb.h @@ -304,6 +304,9 @@ extern "C" bool wsrep_thd_is_wsrep_on(THD *thd); extern "C" enum wsrep_exec_mode wsrep_thd_exec_mode(THD *thd); extern "C" enum wsrep_conflict_state wsrep_thd_conflict_state(THD *thd); extern "C" enum wsrep_query_state wsrep_thd_query_state(THD *thd); +extern "C" const char * wsrep_thd_exec_mode_str(THD *thd); +extern "C" const char * wsrep_thd_conflict_state_str(THD *thd); +extern "C" const char * wsrep_thd_query_state_str(THD *thd); extern "C" wsrep_trx_handle_t* wsrep_thd_trx_handle(THD *thd); extern "C" void wsrep_thd_set_exec_mode(THD *thd, enum wsrep_exec_mode mode); diff --git a/storage/innobase/include/ha_prototypes.h b/storage/innobase/include/ha_prototypes.h index 4ac365d5c40..664e18c52a8 100644 --- a/storage/innobase/include/ha_prototypes.h +++ b/storage/innobase/include/ha_prototypes.h @@ -288,7 +288,7 @@ thd_set_lock_wait_time( #ifdef WITH_WSREP UNIV_INTERN int -wsrep_innobase_kill_one_trx(trx_t *bf_trx, trx_t *victim_trx, ibool signal); +wsrep_innobase_kill_one_trx(void *bf_thd, trx_t *bf_trx, trx_t *victim_trx, ibool signal); int wsrep_thd_is_brute_force(void *thd_ptr); int wsrep_trx_order_before(void *thd1, void *thd2); void wsrep_innobase_mysql_sort(int mysql_type, uint charset_number, diff --git a/storage/innobase/lock/lock0lock.c b/storage/innobase/lock/lock0lock.c index b24bd05e0b3..bb864937751 100644 --- a/storage/innobase/lock/lock0lock.c +++ b/storage/innobase/lock/lock0lock.c @@ -42,6 +42,7 @@ Created 5/7/1996 Heikki Tuuri #ifdef WITH_WSREP extern my_bool wsrep_debug; +extern my_bool wsrep_log_conflicts; #endif /* Restricts the length of search we will do in the waits-for graph of transactions */ @@ -1528,7 +1529,34 @@ wsrep_kill_victim(trx_t *trx, lock_t *lock) { /* cannot release lock, until our lock is in the queue*/ } else if (lock->trx != trx) { - wsrep_innobase_kill_one_trx(trx, lock->trx, TRUE); + if (wsrep_log_conflicts) { + if (bf_this) + fputs("\n*** Priority TRANSACTION:\n", + stderr); + else + fputs("\n*** Victim TRANSACTION:\n", + stderr); + trx_print(stderr, trx, 3000); + + if (bf_other) + fputs("\n*** Priority TRANSACTION:\n", + stderr); + else + fputs("\n*** Victim TRANSACTION:\n", + stderr); + trx_print(stderr, lock->trx, 3000); + + fputs("*** WAITING FOR THIS LOCK TO BE GRANTED:\n", + stderr); + + if (lock_get_type(lock) == LOCK_REC) { + lock_rec_print(stderr, lock); + } else { + lock_table_print(stderr, lock); + } + } + wsrep_innobase_kill_one_trx( + trx->mysql_thd, trx, lock->trx, TRUE); } } } @@ -4091,33 +4119,12 @@ lock_table_other_has_incompatible( if ((lock->trx != trx) && (!lock_mode_compatible(lock_get_mode(lock), mode)) && (wait || !(lock_get_wait(lock)))) { - #ifdef WITH_WSREP - int bf_this = wsrep_thd_is_brute_force(trx->mysql_thd); - int bf_other = wsrep_thd_is_brute_force( - lock->trx->mysql_thd); - if ((bf_this && !bf_other) || - (bf_this && bf_other && - wsrep_trx_order_before( - trx->mysql_thd, lock->trx->mysql_thd) - ) - ) { - if (lock->trx->que_state == TRX_QUE_LOCK_WAIT) { - if (wsrep_debug) fprintf(stderr, - "WSREP: BF victim waiting"); - return(lock); - } else { - if (bf_this && bf_other) - wsrep_innobase_kill_one_trx( - (trx_t *)trx, lock->trx, TRUE); - return(lock); - } - } else { - return(lock); - } -#else - return(lock); + if (wsrep_debug) + fprintf(stderr, "WSREP: table lock abort"); + wsrep_kill_victim((trx_t *)trx, (lock_t *)lock); #endif + return(lock); } lock = UT_LIST_GET_PREV(un_member.tab_lock.locks, lock); diff --git a/storage/innobase/rem/rem0rec.c b/storage/innobase/rem/rem0rec.c index 0cee1ffb3c8..44249c3a87e 100644 --- a/storage/innobase/rem/rem0rec.c +++ b/storage/innobase/rem/rem0rec.c @@ -1811,8 +1811,8 @@ wsrep_rec_get_primary_key( const dict_col_t* col = dict_field_get_col(field); data = rec_get_nth_field(rec, offsets, i, &len); - if (key_len + len > ((col->prtype & DATA_NOT_NULL) ? - *buf_len : *buf_len - 1)) { + if (key_len + ((len != UNIV_SQL_NULL) ? len + 1 : 1) > + *buf_len) { fprintf (stderr, "WSREP: FK key len exceeded %lu %lu %lu\n", key_len, len, *buf_len); |