diff options
-rw-r--r-- | mysys/my_default.c | 31 | ||||
-rw-r--r-- | sql/handler.cc | 25 | ||||
-rw-r--r-- | sql/log.cc | 15 | ||||
-rw-r--r-- | sql/mysqld.cc | 4 | ||||
-rw-r--r-- | sql/sql_class.cc | 6 | ||||
-rw-r--r-- | sql/sql_class.h | 13 | ||||
-rw-r--r-- | sql/sql_parse.cc | 23 | ||||
-rw-r--r-- | sql/transaction.cc | 18 | ||||
-rw-r--r-- | sql/wsrep_hton.cc | 199 | ||||
-rw-r--r-- | sql/wsrep_mysqld.cc | 70 | ||||
-rw-r--r-- | sql/wsrep_mysqld.h | 30 | ||||
-rw-r--r-- | sql/wsrep_thd.cc | 41 | ||||
-rw-r--r-- | sql/wsrep_utils.cc | 2 | ||||
-rw-r--r-- | sql/wsrep_var.cc | 2 | ||||
-rw-r--r-- | sql/wsrep_var.h | 2 | ||||
-rw-r--r-- | storage/innobase/handler/ha_innodb.cc | 43 |
16 files changed, 316 insertions, 208 deletions
diff --git a/mysys/my_default.c b/mysys/my_default.c index ff87eb0a0f3..4721382acc2 100644 --- a/mysys/my_default.c +++ b/mysys/my_default.c @@ -92,34 +92,6 @@ static my_bool defaults_already_read= FALSE; /* The only purpose of this global array is to hold full name of my.cnf * which seems to be otherwise unavailable */ char wsrep_defaults_file[FN_REFLEN + 10]={0,}; -/* Command-line only option to start a new wsrep service instance */ -#define WSREP_NEW_CLUSTER1 "--wsrep-new-cluster" -#define WSREP_NEW_CLUSTER2 "--wsrep_new_cluster" -/* This one is set to true when --wsrep-new-cluster is found in the command - * line arguments */ -my_bool wsrep_new_cluster= FALSE; -/* Finds and removes --wsrep-new-cluster from the arguments list. - * Returns true if found. */ -static my_bool find_wsrep_new_cluster (int* argc, char* argv[]) -{ - my_bool ret= FALSE; - int i; - - for (i= *argc - 1; i > 0; i--) - { - if (!strcmp(argv[i], WSREP_NEW_CLUSTER1) || - !strcmp(argv[i], WSREP_NEW_CLUSTER2)) - { - ret= TRUE; - *argc -= 1; - /* preserve the order of remaining arguments */ - memmove(&argv[i], &argv[i + 1], (*argc - i)*sizeof(argv[i])); - argv[*argc]= NULL; - } - } - - return ret; -} #endif /* WITH_WREP */ /* Which directories are searched for options (and in which order) */ @@ -558,9 +530,6 @@ int my_load_defaults(const char *conf_file, const char **groups, init_alloc_root(&alloc, 512, 0, MYF(0)); if ((dirs= init_default_directories(&alloc)) == NULL) goto err; -#ifdef WITH_WSREP - wsrep_new_cluster= find_wsrep_new_cluster(argc, argv[0]); -#endif /* WITH_WSREP */ /* Check if the user doesn't want any default option processing --no-defaults is always the first option diff --git a/sql/handler.cc b/sql/handler.cc index 11ad582a2ff..b060c06150d 100644 --- a/sql/handler.cc +++ b/sql/handler.cc @@ -1303,11 +1303,7 @@ int ha_commit_trans(THD *thd, bool all) Free resources and perform other cleanup even for 'empty' transactions. */ if (is_real_trans) -#ifdef WITH_WSREP - thd->transaction.cleanup(thd); -#else thd->transaction.cleanup(); -#endif /* WITH_WSREP */ DBUG_RETURN(0); } @@ -1388,6 +1384,7 @@ int ha_commit_trans(THD *thd, bool all) status_var_increment(thd->status_var.ha_prepare_count); if (err) #ifdef WITH_WSREP + { if (WSREP(thd) && ht->db_type== DB_TYPE_WSREP) { error= 1; @@ -1397,10 +1394,13 @@ int ha_commit_trans(THD *thd, bool all) my_error(ER_LOCK_DEADLOCK, MYF(0), err); } } - else + else /* not wsrep hton, bail to native mysql behavior */ -#endif +#endif /* WITH_WSREP */ my_error(ER_ERROR_DURING_COMMIT, MYF(0), err); +#ifdef WITH_WSREP + } +#endif /* WITH_WSREP */ if (err) goto err; @@ -1535,12 +1535,7 @@ commit_one_phase_2(THD *thd, bool all, THD_TRANS *trans, bool is_real_trans) } /* Free resources and perform other cleanup even for 'empty' transactions. */ if (is_real_trans) -#ifdef WITH_WSREP - thd->transaction.cleanup(thd); -#else - thd->transaction.cleanup(); -#endif /* WITH_WSREP */ - + thd->transaction.cleanup(); DBUG_RETURN(error); } @@ -1614,11 +1609,7 @@ int ha_rollback_trans(THD *thd, bool all) } /* Always cleanup. Even if nht==0. There may be savepoints. */ if (is_real_trans) -#ifdef WITH_WSREP - thd->transaction.cleanup(thd); -#else - thd->transaction.cleanup(); -#endif /* WITH_WSREP */ + thd->transaction.cleanup(); if (all) thd->transaction_rollback_request= FALSE; diff --git a/sql/log.cc b/sql/log.cc index 6dd4aa7038b..d5cafb7de0a 100644 --- a/sql/log.cc +++ b/sql/log.cc @@ -548,20 +548,9 @@ IO_CACHE * get_trans_log(THD * thd) bool wsrep_trans_cache_is_empty(THD *thd) { - bool res= TRUE; - - if (thd_sql_command((const THD*) thd) != SQLCOM_SELECT) - res= FALSE; - else - { - binlog_cache_mngr *const cache_mngr= + binlog_cache_mngr *const cache_mngr= (binlog_cache_mngr*) thd_get_ha_data(thd, binlog_hton); - if (cache_mngr) - { - res= cache_mngr->trx_cache.empty(); - } - } - return res; + return (!cache_mngr || cache_mngr->trx_cache.empty()); } void thd_binlog_flush_pending_rows_event(THD *thd, bool stmt_end) diff --git a/sql/mysqld.cc b/sql/mysqld.cc index a10dfa1015c..7b825574a7f 100644 --- a/sql/mysqld.cc +++ b/sql/mysqld.cc @@ -73,6 +73,7 @@ #include "debug_sync.h" #ifdef WITH_WSREP #include "wsrep_mysqld.h" +#include "wsrep_var.h" #include "wsrep_thd.h" #include "wsrep_sst.h" ulong wsrep_running_threads = 0; // # of currently running wsrep threads @@ -5705,6 +5706,9 @@ int mysqld_main(int argc, char **argv) return 1; } #endif +#ifdef WITH_WSREP + wsrep_filter_new_cluster (&argc, argv); +#endif /* WITH_WSREP */ orig_argc= argc; orig_argv= argv; diff --git a/sql/sql_class.cc b/sql/sql_class.cc index 49fb88f5866..f367ca293c8 100644 --- a/sql/sql_class.cc +++ b/sql/sql_class.cc @@ -832,7 +832,7 @@ extern "C" const char *wsrep_thd_exec_mode_str(THD *thd) (!thd) ? "void" : (thd->wsrep_exec_mode == LOCAL_STATE) ? "local" : (thd->wsrep_exec_mode == REPL_RECV) ? "applier" : - (thd->wsrep_exec_mode == TOTAL_ORDER) ? "total order" : + (thd->wsrep_exec_mode == TOTAL_ORDER) ? "total order" : (thd->wsrep_exec_mode == LOCAL_COMMIT) ? "local commit" : "void"; } @@ -896,7 +896,7 @@ extern "C" my_thread_id wsrep_thd_thread_id(THD *thd) } extern "C" wsrep_seqno_t wsrep_thd_trx_seqno(THD *thd) { - return (thd) ? thd->wsrep_trx_meta.gtid.seqno : -1; + return (thd) ? thd->wsrep_trx_meta.gtid.seqno : WSREP_SEQNO_UNDEFINED; } extern "C" query_id_t wsrep_thd_query_id(THD *thd) { @@ -1145,7 +1145,6 @@ THD::THD() wsrep_ws_handle.opaque = NULL; wsrep_retry_counter = 0; wsrep_PA_safe = true; - wsrep_seqno_changed = false; wsrep_retry_query = NULL; wsrep_retry_query_len = 0; wsrep_retry_command = COM_CONNECT; @@ -1552,7 +1551,6 @@ void THD::init(void) wsrep_retry_counter= 0; wsrep_rli= NULL; wsrep_PA_safe= true; - wsrep_seqno_changed= false; wsrep_consistency_check = NO_CONSISTENCY_CHECK; wsrep_mysql_replicated = 0; diff --git a/sql/sql_class.h b/sql/sql_class.h index 81d1a3d4a83..24132fb0913 100644 --- a/sql/sql_class.h +++ b/sql/sql_class.h @@ -1038,9 +1038,6 @@ struct st_savepoint { /** State of metadata locks before this savepoint was set. */ MDL_savepoint mdl_savepoint; }; -#ifdef WITH_WSREP -void wsrep_cleanup_transaction(THD *thd); // THD.transactions.cleanup calls it -#endif enum xa_states {XA_NOTR=0, XA_ACTIVE, XA_IDLE, XA_PREPARED, XA_ROLLBACK_ONLY}; extern const char *xa_state_names[]; @@ -1984,11 +1981,7 @@ public: */ CHANGED_TABLE_LIST* changed_tables; MEM_ROOT mem_root; // Transaction-life memory allocation pool -#ifdef WITH_WSREP - void cleanup(THD *thd) -#else void cleanup() -#endif { DBUG_ENTER("thd::cleanup"); changed_tables= 0; @@ -2002,11 +1995,6 @@ public: if (!xid_state.rm_error) xid_state.xid.null(); free_root(&mem_root,MYF(MY_KEEP_PREALLOC)); -#ifdef WITH_WSREP - // Todo: convert into a plugin method - // wsrep's post-commit. LOCAL_COMMIT designates wsrep's commit was ok - if (WSREP(thd)) wsrep_cleanup_transaction(thd); -#endif /* WITH_WSREP */ DBUG_VOID_RETURN; } my_bool is_active() @@ -2560,7 +2548,6 @@ public: Relay_log_info* wsrep_rli; bool wsrep_converted_lock_session; wsrep_ws_handle_t wsrep_ws_handle; - bool wsrep_seqno_changed; #ifdef WSREP_PROC_INFO char wsrep_info[128]; /* string for dynamic proc info */ #endif /* WSREP_PROC_INFO */ diff --git a/sql/sql_parse.cc b/sql/sql_parse.cc index a9687432849..3abedd40047 100644 --- a/sql/sql_parse.cc +++ b/sql/sql_parse.cc @@ -4629,7 +4629,20 @@ end_with_restore_list: thd->killed= KILL_CONNECTION; thd->print_aborted_warning(3, "RELEASE"); } +#ifdef WITH_WSREP + if (WSREP(thd)) { + + if (thd->wsrep_conflict_state == NO_CONFLICT || + thd->wsrep_conflict_state == REPLAYING) + { + my_ok(thd); + } + } else { +#endif /* WITH_WSREP */ my_ok(thd); +#ifdef WITH_WSREP + } +#endif /* WITH_WSREP */ break; } case SQLCOM_ROLLBACK: @@ -4664,17 +4677,15 @@ end_with_restore_list: /* Disconnect the current client connection. */ if (tx_release) thd->killed= KILL_CONNECTION; - #ifdef WITH_WSREP +#ifdef WITH_WSREP if (WSREP(thd)) { - if (thd->wsrep_conflict_state == NO_CONFLICT || - thd->wsrep_conflict_state == REPLAYING) - { + if (thd->wsrep_conflict_state == NO_CONFLICT) { my_ok(thd); } } else { #endif /* WITH_WSREP */ my_ok(thd); - #ifdef WITH_WSREP +#ifdef WITH_WSREP } #endif /* WITH_WSREP */ break; @@ -5156,7 +5167,7 @@ create_sp_error: if (check_table_access(thd, DROP_ACL, all_tables, FALSE, UINT_MAX, FALSE)) goto error; /* Conditionally writes to binlog. */ - WSREP_TO_ISOLATION_BEGIN(NULL, NULL, NULL) + WSREP_TO_ISOLATION_BEGIN(WSREP_MYSQL_DB, NULL, NULL) res= mysql_drop_view(thd, first_table, thd->lex->drop_mode); break; } diff --git a/sql/transaction.cc b/sql/transaction.cc index a9236d97633..5195e456432 100644 --- a/sql/transaction.cc +++ b/sql/transaction.cc @@ -145,6 +145,9 @@ bool trans_begin(THD *thd, uint flags) ~(SERVER_STATUS_IN_TRANS | SERVER_STATUS_IN_TRANS_READONLY); DBUG_PRINT("info", ("clearing SERVER_STATUS_IN_TRANS")); res= test(ha_commit_trans(thd, TRUE)); +#ifdef WITH_WSREP + wsrep_post_commit(thd, TRUE); +#endif /* WITH_WSREP */ } thd->variables.option_bits&= ~(OPTION_BEGIN | OPTION_KEEP_LOG); @@ -226,6 +229,9 @@ bool trans_commit(THD *thd) ~(SERVER_STATUS_IN_TRANS | SERVER_STATUS_IN_TRANS_READONLY); DBUG_PRINT("info", ("clearing SERVER_STATUS_IN_TRANS")); res= ha_commit_trans(thd, TRUE); +#ifdef WITH_WSREP + wsrep_post_commit(thd, TRUE); +#endif /* WITH_WSREP */ /* if res is non-zero, then ha_commit_trans has rolled back the transaction, so the hooks for rollback will be called. @@ -274,6 +280,9 @@ bool trans_commit_implicit(THD *thd) ~(SERVER_STATUS_IN_TRANS | SERVER_STATUS_IN_TRANS_READONLY); DBUG_PRINT("info", ("clearing SERVER_STATUS_IN_TRANS")); res= test(ha_commit_trans(thd, TRUE)); +#ifdef WITH_WSREP + wsrep_post_commit(thd, TRUE); +#endif /* WITH_WSREP */ } thd->variables.option_bits&= ~(OPTION_BEGIN | OPTION_KEEP_LOG); @@ -362,10 +371,14 @@ bool trans_commit_stmt(THD *thd) #endif /* WITH_WSREP */ res= ha_commit_trans(thd, FALSE); if (! thd->in_active_multi_stmt_transaction()) +#ifdef WITH_WSREP { +#endif /* WITH_WSREP */ thd->tx_isolation= (enum_tx_isolation) thd->variables.tx_isolation; - thd->tx_read_only= thd->variables.tx_read_only; +#ifdef WITH_WSREP + wsrep_post_commit(thd, FALSE); } +#endif /* WITH_WSREP */ } /* @@ -781,6 +794,9 @@ bool trans_xa_commit(THD *thd) int r= ha_commit_trans(thd, TRUE); if ((res= test(r))) my_error(r == 1 ? ER_XA_RBROLLBACK : ER_XAER_RMERR, MYF(0)); +#ifdef WITH_WSREP + wsrep_post_commit(thd, TRUE); +#endif /* WITH_WSREP */ } else if (xa_state == XA_PREPARED && thd->lex->xa_opt == XA_NONE) { diff --git a/sql/wsrep_hton.cc b/sql/wsrep_hton.cc index d27d8b95b13..24f193e8c7b 100644 --- a/sql/wsrep_hton.cc +++ b/sql/wsrep_hton.cc @@ -33,34 +33,16 @@ enum wsrep_trx_status wsrep_run_wsrep_commit(THD *thd, handlerton *hton, bool all); /* - a post-commit cleanup on behalf of wsrep. Can't be a part of hton struct. - Is called by THD::transactions.cleanup() + Cleanup after local transaction commit/rollback, replay or TOI. */ void wsrep_cleanup_transaction(THD *thd) { - if (thd->thread_id == 0) return; - if (thd->wsrep_exec_mode == LOCAL_COMMIT) - { - if (thd->variables.wsrep_on && - thd->wsrep_conflict_state != MUST_REPLAY) - { - if (thd->wsrep_seqno_changed) - { - if (wsrep->post_commit(wsrep, &thd->wsrep_ws_handle)) - { - DBUG_PRINT("wsrep", ("set committed fail")); - WSREP_WARN("set committed fail: %llu %d", - (long long)thd->real_id, thd->get_stmt_da()->status()); - } - } - //else - //WSREP_DEBUG("no trx handle for %s", thd->query()); - thd_binlog_trx_reset(thd); - thd->wsrep_seqno_changed = false; - } - thd->wsrep_exec_mode= LOCAL_STATE; - } - thd->wsrep_ws_handle.trx_id = WSREP_UNDEFINED_TRX_ID; + if (wsrep_emulate_bin_log) thd_binlog_trx_reset(thd); + thd->wsrep_ws_handle.trx_id= WSREP_UNDEFINED_TRX_ID; + thd->wsrep_trx_meta.gtid= WSREP_GTID_UNDEFINED; + thd->wsrep_trx_meta.depends_on= WSREP_SEQNO_UNDEFINED; + thd->wsrep_exec_mode= LOCAL_STATE; + return; } /* @@ -68,9 +50,25 @@ void wsrep_cleanup_transaction(THD *thd) */ handlerton *wsrep_hton; + +/* + Registers wsrep hton at commit time if transaction has registered htons + for supported engine types. + + Hton should not be registered for TOTAL_ORDER operations. + + Registration is needed for both LOCAL_MODE and REPL_RECV transactions to run + commit in 2pc so that wsrep position gets properly recorded in storage + engines. + + Note that all hton calls should immediately return for threads that are + in REPL_RECV mode as their states are controlled by wsrep appliers or + replaying code. Only threads in LOCAL_MODE should run wsrep callbacks + from hton methods. +*/ void wsrep_register_hton(THD* thd, bool all) { - if (thd->wsrep_exec_mode == LOCAL_STATE) + if (thd->wsrep_exec_mode != TOTAL_ORDER) { THD_TRANS *trans=all ? &thd->transaction.all : &thd->transaction.stmt; for (Ha_trx_info *i= trans->ha_list; WSREP(thd) && i; i = i->next()) @@ -91,6 +89,25 @@ void wsrep_register_hton(THD* thd, bool all) } /* + Calls wsrep->post_commit() for locally executed transactions that have + got seqno from provider (must commit) and don't require replaying. + */ +void wsrep_post_commit(THD* thd, bool all) +{ + if (thd->wsrep_exec_mode == LOCAL_COMMIT) + { + DBUG_ASSERT(thd->wsrep_trx_meta.gtid.seqno != WSREP_SEQNO_UNDEFINED); + if (wsrep->post_commit(wsrep, &thd->wsrep_ws_handle)) + { + DBUG_PRINT("wsrep", ("set committed fail")); + WSREP_WARN("set committed fail: %llu %d", + (long long)thd->real_id, thd->get_stmt_da()->status()); + } + wsrep_cleanup_transaction(thd); + } +} + +/* wsrep exploits binlog's caches even if binlogging itself is not activated. In such case connection close needs calling actual binlog's method. @@ -101,7 +118,13 @@ static int wsrep_close_connection(handlerton* hton, THD* thd) { DBUG_ENTER("wsrep_close_connection"); - if (thd_get_ha_data(thd, binlog_hton) != NULL) + + if (thd->wsrep_exec_mode == REPL_RECV) + { + DBUG_RETURN(0); + } + + if (wsrep_emulate_bin_log && thd_get_ha_data(thd, binlog_hton) != NULL) binlog_hton->close_connection (binlog_hton, thd); DBUG_RETURN(0); } @@ -117,10 +140,17 @@ wsrep_close_connection(handlerton* hton, THD* thd) */ static int wsrep_prepare(handlerton *hton, THD *thd, bool all) { -#ifndef DBUG_OFF - //wsrep_seqno_t old = thd->wsrep_trx_seqno; -#endif DBUG_ENTER("wsrep_prepare"); + + if (thd->wsrep_exec_mode == REPL_RECV) + { + DBUG_RETURN(0); + } + + DBUG_ASSERT(thd->ha_data[wsrep_hton->slot].ha_info[all].is_trx_read_write()); + DBUG_ASSERT(thd->wsrep_exec_mode == LOCAL_STATE); + DBUG_ASSERT(thd->wsrep_trx_meta.gtid.seqno == WSREP_SEQNO_UNDEFINED); + if ((all || !thd_test_options(thd, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)) && (thd->variables.wsrep_on && !wsrep_trans_cache_is_empty(thd))) @@ -128,9 +158,6 @@ static int wsrep_prepare(handlerton *hton, THD *thd, bool all) switch (wsrep_run_wsrep_commit(thd, hton, all)) { case WSREP_TRX_OK: - // DBUG_ASSERT(thd->wsrep_trx_seqno > old || - // thd->wsrep_exec_mode == REPL_RECV || - // thd->wsrep_exec_mode == TOTAL_ORDER); break; case WSREP_TRX_ROLLBACK: case WSREP_TRX_ERROR: @@ -142,12 +169,26 @@ static int wsrep_prepare(handlerton *hton, THD *thd, bool all) static int wsrep_savepoint_set(handlerton *hton, THD *thd, void *sv) { + DBUG_ENTER("wsrep_savepoint_set"); + + if (thd->wsrep_exec_mode == REPL_RECV) + { + DBUG_RETURN(0); + } + if (!wsrep_emulate_bin_log) return 0; int rcode = binlog_hton->savepoint_set(binlog_hton, thd, sv); return rcode; } static int wsrep_savepoint_rollback(handlerton *hton, THD *thd, void *sv) { + DBUG_ENTER("wsrep_savepoint_rollback"); + + if (thd->wsrep_exec_mode == REPL_RECV) + { + DBUG_RETURN(0); + } + if (!wsrep_emulate_bin_log) return 0; int rcode = binlog_hton->savepoint_rollback(binlog_hton, thd, sv); return rcode; @@ -156,6 +197,12 @@ static int wsrep_savepoint_rollback(handlerton *hton, THD *thd, void *sv) static int wsrep_rollback(handlerton *hton, THD *thd, bool all) { DBUG_ENTER("wsrep_rollback"); + + if (thd->wsrep_exec_mode == REPL_RECV) + { + DBUG_RETURN(0); + } + mysql_mutex_lock(&thd->LOCK_wsrep_thd); if ((all || !thd_test_options(thd, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)) && (thd->variables.wsrep_on && thd->wsrep_conflict_state != MUST_REPLAY)) @@ -166,25 +213,54 @@ static int wsrep_rollback(handlerton *hton, THD *thd, bool all) WSREP_ERROR("settting rollback fail: thd: %llu SQL: %s", (long long)thd->real_id, thd->query()); } + wsrep_cleanup_transaction(thd); } - - int rcode = 0; - if (!wsrep_emulate_bin_log) - { - if (all) thd_binlog_trx_reset(thd); - } - mysql_mutex_unlock(&thd->LOCK_wsrep_thd); - DBUG_RETURN(rcode); + DBUG_RETURN(0); } int wsrep_commit(handlerton *hton, THD *thd, bool all) { DBUG_ENTER("wsrep_commit"); + if (thd->wsrep_exec_mode == REPL_RECV) + { + DBUG_RETURN(0); + } + + mysql_mutex_lock(&thd->LOCK_wsrep_thd); + if ((all || !thd_test_options(thd, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)) && + (thd->variables.wsrep_on && thd->wsrep_conflict_state != MUST_REPLAY)) + { + if (thd->wsrep_exec_mode == LOCAL_COMMIT) + { + DBUG_ASSERT(thd->ha_data[wsrep_hton->slot].ha_info[all].is_trx_read_write()); + /* + Call to wsrep->post_commit() (moved to wsrep_post_commit()) must + be done only after commit has done for all involved htons. + */ + DBUG_PRINT("wsrep", ("commit")); + } + else + { + /* + Transaction didn't go through wsrep->pre_commit() so just roll back + possible changes to clean state. + */ + if (wsrep->post_rollback(wsrep, &thd->wsrep_ws_handle)) + { + DBUG_PRINT("wsrep", ("setting rollback fail")); + WSREP_ERROR("settting rollback fail: thd: %llu SQL: %s", + (long long)thd->real_id, thd->query()); + } + wsrep_cleanup_transaction(thd); + } + } + mysql_mutex_unlock(&thd->LOCK_wsrep_thd); DBUG_RETURN(0); } + extern Rpl_filter* binlog_filter; extern my_bool opt_log_slave_updates; @@ -310,9 +386,6 @@ wsrep_run_wsrep_commit(THD *thd, handlerton *hton, bool all) if (data_len == 0) { - mysql_mutex_lock(&thd->LOCK_wsrep_thd); - thd->wsrep_exec_mode = LOCAL_COMMIT; - mysql_mutex_unlock(&thd->LOCK_wsrep_thd); if (thd->get_stmt_da()->is_ok() && thd->get_stmt_da()->affected_rows() > 0 && !binlog_filter->is_on()) @@ -354,14 +427,13 @@ wsrep_run_wsrep_commit(THD *thd, handlerton *hton, bool all) 0ULL : WSREP_FLAG_PA_UNSAFE), &thd->wsrep_trx_meta); - switch (rcode) - { - case WSREP_TRX_MISSING: + if (rcode == WSREP_TRX_MISSING) { WSREP_WARN("Transaction missing in provider, thd: %ld, SQL: %s", thd->thread_id, thd->query()); - rcode = WSREP_OK; - break; - case WSREP_BF_ABORT: + rcode = WSREP_TRX_FAIL; + } else if (rcode == WSREP_BF_ABORT) { + WSREP_DEBUG("thd %lu seqno %lld BF aborted by provider, will replay", + thd->thread_id, (long long)thd->wsrep_trx_meta.gtid.seqno); mysql_mutex_lock(&thd->LOCK_wsrep_thd); thd->wsrep_conflict_state = MUST_REPLAY; mysql_mutex_unlock(&thd->LOCK_wsrep_thd); @@ -370,10 +442,6 @@ wsrep_run_wsrep_commit(THD *thd, handlerton *hton, bool all) WSREP_DEBUG("replaying increased: %d, thd: %lu", wsrep_replaying, thd->thread_id); mysql_mutex_unlock(&LOCK_wsrep_replaying); - break; - default: - thd->wsrep_seqno_changed = true; - break; } } else { WSREP_ERROR("I/O error reading from thd's binlog iocache: " @@ -385,7 +453,24 @@ wsrep_run_wsrep_commit(THD *thd, handlerton *hton, bool all) mysql_mutex_lock(&thd->LOCK_wsrep_thd); switch(rcode) { case 0: - thd->wsrep_exec_mode = LOCAL_COMMIT; + /* + About MUST_ABORT: We assume that even if thd conflict state was set + to MUST_ABORT, underlying transaction was not rolled back or marked + as deadlock victim in QUERY_COMMITTING state. Conflict state is + set to NO_CONFLICT and commit proceeds as usual. + */ + if (thd->wsrep_conflict_state == MUST_ABORT) + thd->wsrep_conflict_state= NO_CONFLICT; + + if (thd->wsrep_conflict_state != NO_CONFLICT) + { + WSREP_WARN("thd %lu seqno %lld: conflict state %d after post commit", + thd->thread_id, + (long long)thd->wsrep_trx_meta.gtid.seqno, + thd->wsrep_conflict_state); + } + thd->wsrep_exec_mode= LOCAL_COMMIT; + DBUG_ASSERT(thd->wsrep_trx_meta.gtid.seqno != WSREP_SEQNO_UNDEFINED); /* Override XID iff it was generated by mysql */ if (thd->transaction.xid_state.xid.get_my_xid()) { @@ -394,10 +479,10 @@ wsrep_run_wsrep_commit(THD *thd, handlerton *hton, bool all) thd->wsrep_trx_meta.gtid.seqno); } DBUG_PRINT("wsrep", ("replicating commit success")); - break; - case WSREP_TRX_FAIL: case WSREP_BF_ABORT: + DBUG_ASSERT(thd->wsrep_trx_meta.gtid.seqno != WSREP_SEQNO_UNDEFINED); + case WSREP_TRX_FAIL: WSREP_DEBUG("commit failed for reason: %d", rcode); DBUG_PRINT("wsrep", ("replicating commit fail")); diff --git a/sql/wsrep_mysqld.cc b/sql/wsrep_mysqld.cc index 6cbbfc47ad3..a842aa84fc9 100644 --- a/sql/wsrep_mysqld.cc +++ b/sql/wsrep_mysqld.cc @@ -697,8 +697,36 @@ void wsrep_stop_replication(THD *thd) return; } - -extern my_bool wsrep_new_cluster; +/* This one is set to true when --wsrep-new-cluster is found in the command + * line arguments */ +static my_bool wsrep_new_cluster= FALSE; +#define WSREP_NEW_CLUSTER "--wsrep-new-cluster" +/* Finds and hides --wsrep-new-cluster from the arguments list + * by moving it to the end of the list and decrementing argument count */ +void wsrep_filter_new_cluster (int* argc, char* argv[]) +{ + int i; + for (i= *argc - 1; i > 0; i--) + { + /* make a copy of the argument to convert possible underscores to hyphens. + * the copy need not to be longer than WSREP_NEW_CLUSTER option */ + char arg[sizeof(WSREP_NEW_CLUSTER) + 2]= { 0, }; + strncpy(arg, argv[i], sizeof(arg) - 1); + char* underscore; + while (NULL != (underscore= strchr(arg, '_'))) *underscore= '-'; + + if (!strcmp(arg, WSREP_NEW_CLUSTER)) + { + wsrep_new_cluster= TRUE; + *argc -= 1; + /* preserve the order of remaining arguments AND + * preserve the original argument pointers - just in case */ + char* wnc= argv[i]; + memmove(&argv[i], &argv[i + 1], (*argc - i)*sizeof(argv[i])); + argv[*argc]= wnc; /* this will be invisible to the rest of the program */ + } + } +} bool wsrep_start_replication() { @@ -1187,15 +1215,16 @@ static int wsrep_TOI_begin(THD *thd, char *db_, char *table_, static void wsrep_TOI_end(THD *thd) { wsrep_status_t ret; wsrep_to_isolation--; + WSREP_DEBUG("TO END: %lld, %d : %s", (long long)wsrep_thd_trx_seqno(thd), - thd->wsrep_exec_mode, (thd->query()) ? thd->query() : "void") - if (WSREP_OK == (ret = wsrep->to_execute_end(wsrep, thd->thread_id))) { - WSREP_DEBUG("TO END: %lld", (long long)wsrep_thd_trx_seqno(thd)); - } - else { - WSREP_WARN("TO isolation end failed for: %d, sql: %s", - ret, (thd->query()) ? thd->query() : "void"); - } + thd->wsrep_exec_mode, (thd->query()) ? thd->query() : "void"); + if (WSREP_OK == (ret = wsrep->to_execute_end(wsrep, thd->thread_id))) { + WSREP_DEBUG("TO END: %lld", (long long)wsrep_thd_trx_seqno(thd)); + } + else { + WSREP_WARN("TO isolation end failed for: %d, sql: %s", + ret, (thd->query()) ? thd->query() : "void"); + } } static int wsrep_RSU_begin(THD *thd, char *db_, char *table_) @@ -1266,14 +1295,20 @@ static void wsrep_RSU_end(THD *thd) return; } thd->variables.wsrep_on = 1; - return; } int wsrep_to_isolation_begin(THD *thd, char *db_, char *table_, const TABLE_LIST* table_list) { + + /* + No isolation for applier or replaying threads. + */ + if (thd->wsrep_exec_mode == REPL_RECV) return 0; + int ret= 0; mysql_mutex_lock(&thd->LOCK_wsrep_thd); + if (thd->wsrep_conflict_state == MUST_ABORT) { WSREP_INFO("thread: %lu, %s has been aborted due to multi-master conflict", @@ -1283,6 +1318,9 @@ int wsrep_to_isolation_begin(THD *thd, char *db_, char *table_, } mysql_mutex_unlock(&thd->LOCK_wsrep_thd); + DBUG_ASSERT(thd->wsrep_exec_mode == LOCAL_STATE); + DBUG_ASSERT(thd->wsrep_trx_meta.gtid.seqno == WSREP_SEQNO_UNDEFINED); + if (wsrep_debug && thd->mdl_context.has_locks()) { WSREP_DEBUG("thread holds MDL locks at TI begin: %s %lu", @@ -1309,14 +1347,16 @@ int wsrep_to_isolation_begin(THD *thd, char *db_, char *table_, return ret; } -void wsrep_to_isolation_end(THD *thd) { - if (thd->wsrep_exec_mode==TOTAL_ORDER) +void wsrep_to_isolation_end(THD *thd) +{ + if (thd->wsrep_exec_mode == TOTAL_ORDER) { switch(wsrep_OSU_method_options) { - case WSREP_OSU_TOI: return wsrep_TOI_end(thd); - case WSREP_OSU_RSU: return wsrep_RSU_end(thd); + case WSREP_OSU_TOI: wsrep_TOI_end(thd); break; + case WSREP_OSU_RSU: wsrep_RSU_end(thd); break; } + wsrep_cleanup_transaction(thd); } } diff --git a/sql/wsrep_mysqld.h b/sql/wsrep_mysqld.h index 276f2a7b580..c5647f8d984 100644 --- a/sql/wsrep_mysqld.h +++ b/sql/wsrep_mysqld.h @@ -32,8 +32,9 @@ class THD; LOCAL_STATE, REPL_RECV, TOTAL_ORDER, - LOCAL_COMMIT, + LOCAL_COMMIT }; + enum wsrep_query_state { QUERY_IDLE, QUERY_EXEC, @@ -109,24 +110,22 @@ extern long wsrep_local_index; extern const char* wsrep_provider_name; extern const char* wsrep_provider_version; extern const char* wsrep_provider_vendor; -extern int wsrep_show_status(THD *thd, SHOW_VAR *var, char *buff); -extern void wsrep_free_status(THD *thd); +int wsrep_show_status(THD *thd, SHOW_VAR *var, char *buff); +void wsrep_free_status(THD *thd); -extern int wsrep_init_vars(); -extern void wsrep_provider_init (const char* provider); -extern void wsrep_start_position_init (const char* position); -extern void wsrep_sst_auth_init (const char* auth); +/* Filters out --wsrep-new-cluster oprtion from argv[] + * should be called in the very beginning of main() */ +void wsrep_filter_new_cluster (int* argc, char* argv[]); -extern int wsrep_init(); -extern void wsrep_deinit(); -extern void wsrep_recover(); -extern bool wsrep_before_SE(); // initialize wsrep before storage - // engines (true) or after (false) +int wsrep_init(); +void wsrep_deinit(); +void wsrep_recover(); +bool wsrep_before_SE(); // initialize wsrep before storage + // engines (true) or after (false) /* wsrep initialization sequence at startup * @param before wsrep_before_SE() value */ -extern void wsrep_init_startup(bool before); - +void wsrep_init_startup(bool before); @@ -238,6 +237,7 @@ wsrep_run_wsrep_commit(THD *thd, handlerton *hton, bool all); class Ha_trx_info; struct THD_TRANS; void wsrep_register_hton(THD* thd, bool all); +void wsrep_post_commit(THD* thd, bool all); void wsrep_brute_force_killer(THD *thd); int wsrep_hire_brute_force_killer(THD *thd, uint64_t trx_id); @@ -285,7 +285,7 @@ struct TABLE_LIST; int wsrep_to_isolation_begin(THD *thd, char *db_, char *table_, const TABLE_LIST* table_list); void wsrep_to_isolation_end(THD *thd); - +void wsrep_cleanup_transaction(THD *thd); int wsrep_to_buf_helper( THD* thd, const char *query, uint query_len, uchar** buf, int* buf_len); int wsrep_create_sp(THD *thd, uchar** buf, int* buf_len); diff --git a/sql/wsrep_thd.cc b/sql/wsrep_thd.cc index 35f5a9bca6a..c25281b7a77 100644 --- a/sql/wsrep_thd.cc +++ b/sql/wsrep_thd.cc @@ -93,8 +93,6 @@ static void wsrep_prepare_bf_thd(THD *thd, struct wsrep_thd_shadow* shadow) thd->net.vio= 0; thd->clear_error(); - thd->variables.option_bits|= OPTION_NOT_AUTOCOMMIT; - shadow->tx_isolation = thd->variables.tx_isolation; thd->variables.tx_isolation = ISO_READ_COMMITTED; thd->tx_isolation = ISO_READ_COMMITTED; @@ -140,6 +138,11 @@ void wsrep_replay_transaction(THD *thd) (long long)wsrep_thd_trx_seqno(thd)); struct wsrep_thd_shadow shadow; wsrep_prepare_bf_thd(thd, &shadow); + + /* From trans_begin() */ + thd->variables.option_bits|= OPTION_BEGIN; + thd->server_status|= SERVER_STATUS_IN_TRANS; + int rcode = wsrep->replay_trx(wsrep, &thd->wsrep_ws_handle, (void *)thd); @@ -161,6 +164,14 @@ void wsrep_replay_transaction(THD *thd) { WSREP_WARN("replay ok, thd has reported status"); } + else if (thd->get_stmt_da()->is_set()) + { + if (thd->get_stmt_da()->status() != Diagnostics_area::DA_OK) + { + WSREP_WARN("replay ok, thd has error status %d", + thd->get_stmt_da()->status()); + } + } else { my_ok(thd); @@ -186,6 +197,9 @@ void wsrep_replay_transaction(THD *thd) unireg_abort(1); break; } + + wsrep_cleanup_transaction(thd); + mysql_mutex_lock(&LOCK_wsrep_replaying); wsrep_replaying--; WSREP_DEBUG("replaying decreased: %d, thd: %lu", @@ -204,6 +218,10 @@ static void wsrep_replication_process(THD *thd) struct wsrep_thd_shadow shadow; wsrep_prepare_bf_thd(thd, &shadow); + /* From trans_begin() */ + thd->variables.option_bits|= OPTION_BEGIN; + thd->server_status|= SERVER_STATUS_IN_TRANS; + rcode = wsrep->recv(wsrep, (void *)thd); DBUG_PRINT("wsrep",("wsrep_repl returned: %d", rcode)); @@ -371,21 +389,24 @@ void wsrep_create_rollbacker() extern "C" int wsrep_thd_is_brute_force(void *thd_ptr) { + /* + Brute force: + Appliers and replaying are running in REPL_RECV mode. TOI statements + in TOTAL_ORDER mode. Locally committing transaction that has got + past wsrep->pre_commit() without error is running in LOCAL_COMMIT mode. + + Everything else is running in LOCAL_STATE and should not be considered + brute force. + */ if (thd_ptr) { switch (((THD *)thd_ptr)->wsrep_exec_mode) { - case LOCAL_STATE: - { - if (((THD *)thd_ptr)->wsrep_conflict_state== REPLAYING) - { - return 1; - } - return 0; - } + case LOCAL_STATE: return 0; case REPL_RECV: return 1; case TOTAL_ORDER: return 2; case LOCAL_COMMIT: return 3; } } + DBUG_ASSERT(0); return 0; } diff --git a/sql/wsrep_utils.cc b/sql/wsrep_utils.cc index b01bdaaa15e..90af2fb8156 100644 --- a/sql/wsrep_utils.cc +++ b/sql/wsrep_utils.cc @@ -14,7 +14,7 @@ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ -//! @file declares symbols private to wsrep integration layer +//! @file some utility functions and classes not directly related to replication #ifndef _GNU_SOURCE #define _GNU_SOURCE // POSIX_SPAWN_USEVFORK flag diff --git a/sql/wsrep_var.cc b/sql/wsrep_var.cc index c3c795a291d..3f6b621a9f5 100644 --- a/sql/wsrep_var.cc +++ b/sql/wsrep_var.cc @@ -37,7 +37,7 @@ const char* wsrep_node_name = 0; const char* wsrep_node_address = 0; const char* wsrep_node_incoming_address = 0; const char* wsrep_start_position = 0; -ulong wsrep_OSU_method_options; +ulong wsrep_OSU_method_options; int wsrep_init_vars() { diff --git a/sql/wsrep_var.h b/sql/wsrep_var.h index 1dd0beac3e3..b69f670a14b 100644 --- a/sql/wsrep_var.h +++ b/sql/wsrep_var.h @@ -25,6 +25,8 @@ class sys_var; class set_var; class THD; +int wsrep_init_vars(); + #define CHECK_ARGS (sys_var *self, THD* thd, set_var *var) #define UPDATE_ARGS (sys_var *self, THD* thd, enum_var_type type) #define DEFAULT_ARGS (THD* thd, enum_var_type var_type) diff --git a/storage/innobase/handler/ha_innodb.cc b/storage/innobase/handler/ha_innodb.cc index 554acb1dcd1..32c9ea3aa6a 100644 --- a/storage/innobase/handler/ha_innodb.cc +++ b/storage/innobase/handler/ha_innodb.cc @@ -9403,22 +9403,6 @@ wsrep_append_key( DBUG_RETURN(0); } -ibool -wsrep_is_cascding_foreign_key_parent( - dict_table_t* table, /*!< in: InnoDB table */ - dict_index_t* index /*!< in: InnoDB index */ -) { - // return referenced_by_foreign_key(); - dict_foreign_t* fk = dict_table_get_referenced_constraint(table, index); - if (fk && - (fk->type & DICT_FOREIGN_ON_UPDATE_CASCADE || - fk->type & DICT_FOREIGN_ON_UPDATE_SET_NULL) - ) { - return TRUE; - } - return FALSE; -} - int ha_innobase::wsrep_append_keys( /*==================*/ @@ -9467,19 +9451,30 @@ ha_innobase::wsrep_append_keys( uint i; for (i=0; i<table->s->keys; ++i) { - uint len; - char keyval0[WSREP_MAX_SUPPORTED_KEY_LENGTH+1] = {'\0'}; - char keyval1[WSREP_MAX_SUPPORTED_KEY_LENGTH+1] = {'\0'}; - char *key0 = &keyval0[1]; - char *key1 = &keyval1[1]; - KEY *key_info = table->key_info + i; - ibool is_null; + uint len; + char keyval0[WSREP_MAX_SUPPORTED_KEY_LENGTH+1] = {'\0'}; + char keyval1[WSREP_MAX_SUPPORTED_KEY_LENGTH+1] = {'\0'}; + char* key0 = &keyval0[1]; + char* key1 = &keyval1[1]; + KEY* key_info = table->key_info + i; + ibool is_null; + + dict_index_t* idx = innobase_get_index(i); + dict_table_t* tab = (idx) ? idx->table : NULL; keyval0[0] = (char)i; keyval1[0] = (char)i; + if (!tab) { + WSREP_WARN("MySQL-InnoDB key mismatch %s %s", + table->s->table_name.str, + key_info->name); + } if (key_info->flags & HA_NOSAME || - referenced_by_foreign_key()) { + ((tab && + dict_table_get_referenced_constraint(tab, idx)) || + (!tab && referenced_by_foreign_key()))) { + if (key_info->flags & HA_NOSAME || shared) key_appended = true; |