diff options
Diffstat (limited to 'sql/handler.cc')
-rw-r--r-- | sql/handler.cc | 165 |
1 files changed, 164 insertions, 1 deletions
diff --git a/sql/handler.cc b/sql/handler.cc index 07dc0cf04a9..657cb01cbc8 100644 --- a/sql/handler.cc +++ b/sql/handler.cc @@ -51,6 +51,10 @@ #include "../storage/maria/ha_maria.h" #endif +#ifdef WITH_WSREP +#include "wsrep_mysqld.h" +#include "wsrep_xid.h" +#endif /* While we have legacy_db_type, we have this array to check for dups and to find handlerton from legacy_db_type. @@ -1169,10 +1173,27 @@ int ha_prepare(THD *thd) { if ((err= ht->prepare(ht, thd, all))) { +#ifdef WITH_WSREP + if (WSREP(thd) && ht->db_type== DB_TYPE_WSREP) + { + error= 1; + /* avoid sending error, if we need to replay */ + if (thd->wsrep_conflict_state!= MUST_REPLAY) + { + my_error(ER_LOCK_DEADLOCK, MYF(0), err); + } + } + else + { + /* not wsrep hton, bail to native mysql behavior */ +#endif my_error(ER_ERROR_DURING_COMMIT, MYF(0), err); ha_rollback_trans(thd, all); error=1; break; +#ifdef WITH_WSREP + } +#endif } } else @@ -1368,7 +1389,12 @@ int ha_commit_trans(THD *thd, bool all) mdl_request.init(MDL_key::COMMIT, "", "", MDL_INTENTION_EXCLUSIVE, MDL_EXPLICIT); +#ifdef WITH_WSREP + if (!WSREP(thd) && + thd->mdl_context.acquire_lock(&mdl_request, +#else if (thd->mdl_context.acquire_lock(&mdl_request, +#endif /* WITH_WSREP */ thd->variables.lock_wait_timeout)) { ha_rollback_trans(thd, all); @@ -1415,7 +1441,33 @@ int ha_commit_trans(THD *thd, bool all) err= ht->prepare(ht, thd, all); status_var_increment(thd->status_var.ha_prepare_count); if (err) - my_error(ER_ERROR_DURING_COMMIT, MYF(0), err); +#ifdef WITH_WSREP + { + if (WSREP(thd) && ht->db_type== DB_TYPE_WSREP) + { + error= 1; + switch (err) + { + case WSREP_TRX_SIZE_EXCEEDED: + /* give user size exeeded erro from wsrep_api.h */ + my_error(ER_ERROR_DURING_COMMIT, MYF(0), WSREP_SIZE_EXCEEDED); + break; + case WSREP_TRX_CERT_FAIL: + case WSREP_TRX_ERROR: + /* avoid sending error, if we need to replay */ + if (thd->wsrep_conflict_state!= MUST_REPLAY) + { + my_error(ER_LOCK_DEADLOCK, MYF(0), err); + } + } + } + else + /* not wsrep hton, bail to native mysql behavior */ +#endif /* WITH_WSREP */ + my_error(ER_ERROR_DURING_COMMIT, MYF(0), err); +#ifdef WITH_WSREP + } +#endif /* WITH_WSREP */ if (err) goto err; @@ -1426,6 +1478,13 @@ int ha_commit_trans(THD *thd, bool all) DEBUG_SYNC(thd, "ha_commit_trans_after_prepare"); DBUG_EXECUTE_IF("crash_commit_after_prepare", DBUG_SUICIDE();); +#ifdef WITH_WSREP + if (!error && wsrep_is_wsrep_xid(&thd->transaction.xid_state.xid)) + { + // xid was rewritten by wsrep + xid= wsrep_xid_seqno(thd->transaction.xid_state.xid); + } +#endif // WITH_WSREP if (!is_real_trans) { error= commit_one_phase_2(thd, all, trans, is_real_trans); @@ -1643,6 +1702,11 @@ int ha_rollback_trans(THD *thd, bool all) { // cannot happen my_error(ER_ERROR_DURING_ROLLBACK, MYF(0), err); error=1; +#ifdef WITH_WSREP + WSREP_WARN("handlerton rollback failed, thd %lu %lld conf %d SQL %s", + thd->thread_id, thd->query_id, thd->wsrep_conflict_state, + thd->query()); +#endif /* WITH_WSREP */ } status_var_increment(thd->status_var.ha_rollback_count); ha_info_next= ha_info->next(); @@ -1830,7 +1894,13 @@ static my_bool xarecover_handlerton(THD *unused, plugin_ref plugin, got, hton_name(hton)->str); for (int i=0; i < got; i ++) { +#ifdef WITH_WSREP + my_xid x=(wsrep_is_wsrep_xid(&info->list[i]) ? + wsrep_xid_seqno(info->list[i]) : + info->list[i].get_my_xid()); +#else my_xid x=info->list[i].get_my_xid(); +#endif /* WITH_WSREP */ if (!x) // not "mine" - that is generated by external TM { #ifndef DBUG_OFF @@ -3158,7 +3228,12 @@ int handler::update_auto_increment() variables->auto_increment_increment); auto_inc_intervals_count++; /* Row-based replication does not need to store intervals in binlog */ +#ifdef WITH_WSREP + if (((WSREP(thd) && wsrep_emulate_bin_log) || mysql_bin_log.is_open()) && + !thd->is_current_stmt_binlog_format_row()) +#else if (mysql_bin_log.is_open() && !thd->is_current_stmt_binlog_format_row()) +#endif /* WITH_WSREP */ thd->auto_inc_intervals_in_cur_stmt_for_binlog.append(auto_inc_interval_for_cur_row.minimum(), auto_inc_interval_for_cur_row.values(), variables->auto_increment_increment); @@ -5783,7 +5858,13 @@ static bool check_table_binlog_row_based(THD *thd, TABLE *table) return (thd->is_current_stmt_binlog_format_row() && table->s->cached_row_logging_check && (thd->variables.option_bits & OPTION_BIN_LOG) && +#ifdef WITH_WSREP + /* applier and replayer should not binlog */ + ((WSREP_EMULATE_BINLOG(thd) && (thd->wsrep_exec_mode != REPL_RECV)) || + mysql_bin_log.is_open())); +#else mysql_bin_log.is_open()); +#endif } @@ -5883,6 +5964,30 @@ static int binlog_log_row(TABLE* table, bool error= 0; THD *const thd= table->in_use; +#ifdef WITH_WSREP + /* only InnoDB tables will be replicated through binlog emulation */ + if (WSREP_EMULATE_BINLOG(thd) && + table->file->ht->db_type != DB_TYPE_INNODB && + !(table->file->ht->db_type == DB_TYPE_PARTITION_DB && + (((ha_partition*)(table->file))->wsrep_db_type() == DB_TYPE_INNODB))) + { + return 0; + } + + /* enforce wsrep_max_ws_rows */ + if (table->s->tmp_table == NO_TMP_TABLE && WSREP(thd)) + { + thd->wsrep_affected_rows++; + if (wsrep_max_ws_rows && + thd->wsrep_exec_mode != REPL_RECV && + thd->wsrep_affected_rows > wsrep_max_ws_rows) + { + trans_rollback_stmt(thd) || trans_rollback(thd); + my_message(ER_ERROR_DURING_COMMIT, "wsrep_max_ws_rows exceeded", MYF(0)); + return ER_ERROR_DURING_COMMIT; + } + } +#endif /* WITH_WSREP */ if (check_table_binlog_row_based(thd, table)) { MY_BITMAP cols; @@ -6214,6 +6319,64 @@ void handler::set_lock_type(enum thr_lock_type lock) table->reginfo.lock_type= lock; } +#ifdef WITH_WSREP +/** + @details + This function makes the storage engine to force the victim transaction + to abort. Currently, only innodb has this functionality, but any SE + implementing the wsrep API should provide this service to support + multi-master operation. + + @param bf_thd brute force THD asking for the abort + @param victim_thd victim THD to be aborted + + @return + always 0 +*/ + +int ha_wsrep_abort_transaction(THD *bf_thd, THD *victim_thd, my_bool signal) +{ + DBUG_ENTER("ha_wsrep_abort_transaction"); + if (!WSREP(bf_thd) && + !(bf_thd->variables.wsrep_OSU_method == WSREP_OSU_RSU && + bf_thd->wsrep_exec_mode == TOTAL_ORDER)) { + DBUG_RETURN(0); + } + + handlerton *hton= installed_htons[DB_TYPE_INNODB]; + if (hton && hton->wsrep_abort_transaction) + { + hton->wsrep_abort_transaction(hton, bf_thd, victim_thd, signal); + } + else + { + WSREP_WARN("cannot abort InnoDB transaction"); + } + + DBUG_RETURN(0); +} + +void ha_wsrep_fake_trx_id(THD *thd) +{ + DBUG_ENTER("ha_wsrep_fake_trx_id"); + if (!WSREP(thd)) + { + DBUG_VOID_RETURN; + } + + handlerton *hton= installed_htons[DB_TYPE_INNODB]; + if (hton && hton->wsrep_fake_trx_id) + { + hton->wsrep_fake_trx_id(hton, thd); + } + else + { + WSREP_WARN("cannot get fake InnoDB transaction ID"); + } + + DBUG_VOID_RETURN; +} +#endif /* WITH_WSREP */ #ifdef TRANS_LOG_MGM_EXAMPLE_CODE /* Example of transaction log management functions based on assumption that logs |