diff options
Diffstat (limited to 'sql/sql_insert.cc')
-rw-r--r-- | sql/sql_insert.cc | 66 |
1 files changed, 42 insertions, 24 deletions
diff --git a/sql/sql_insert.cc b/sql/sql_insert.cc index f5e4185db92..785d7c12bd2 100644 --- a/sql/sql_insert.cc +++ b/sql/sql_insert.cc @@ -82,6 +82,10 @@ #include "debug_sync.h" +#ifdef WITH_WSREP +#include "wsrep_trans_observer.h" /* wsrep_start_transction() */ +#endif /* WITH_WSREP */ + #ifndef EMBEDDED_LIBRARY static bool delayed_get_table(THD *thd, MDL_request *grl_protection_request, TABLE_LIST *table_list); @@ -3934,10 +3938,13 @@ bool select_insert::prepare_eof() DBUG_PRINT("enter", ("trans_table=%d, table_type='%s'", trans_table, table->file->table_type())); - error= (IF_WSREP((thd->wsrep_conflict_state == MUST_ABORT || - thd->wsrep_conflict_state == CERT_FAILURE) ? -1 :, ) - (thd->locked_tables_mode <= LTM_LOCK_TABLES ? - table->file->ha_end_bulk_insert() : 0)); +#ifdef WITH_WSREP + error= (thd->wsrep_cs().current_error()) ? -1 : + (thd->locked_tables_mode <= LTM_LOCK_TABLES) ? +#else + error= (thd->locked_tables_mode <= LTM_LOCK_TABLES) ? +#endif /* WITH_WSREP */ + table->file->ha_end_bulk_insert() : 0; if (likely(!error) && unlikely(thd->is_error())) error= thd->get_stmt_da()->sql_errno(); @@ -4534,9 +4541,16 @@ select_create::binlog_show_create_table(TABLE **tables, uint count) /* suppress_use */ FALSE, errcode); } - - ha_fake_trx_id(thd); - +#ifdef WITH_WSREP + if (thd->wsrep_trx().active()) + { + WSREP_DEBUG("transaction already started for CTAS"); + } + else + { + wsrep_start_transaction(thd, thd->wsrep_next_trx_id()); + } +#endif return result; } @@ -4594,10 +4608,18 @@ bool select_create::send_eof() if (!table->s->tmp_table) { #ifdef WITH_WSREP - if (WSREP_ON) + if (WSREP(thd)) { + if (thd->wsrep_trx_id() == WSREP_UNDEFINED_TRX_ID) + { + wsrep_start_transaction(thd, thd->wsrep_next_trx_id()); + } + DBUG_ASSERT(thd->wsrep_trx_id() != WSREP_UNDEFINED_TRX_ID); + WSREP_DEBUG("CTAS key append for trx: %lu thd %llu query %lld ", + thd->wsrep_trx_id(), thd->thread_id, thd->query_id); + /* - append table level exclusive key for CTAS + append table level exclusive key for CTAS */ wsrep_key_arr_t key_arr= {0, 0}; wsrep_prepare_keys_for_isolation(thd, @@ -4605,38 +4627,34 @@ bool select_create::send_eof() create_table->table_name.str, table_list, &key_arr); - int rcode = wsrep->append_key( - wsrep, - &thd->wsrep_ws_handle, - key_arr.keys, //&wkey, - key_arr.keys_len, - WSREP_KEY_EXCLUSIVE, - false); + int rcode= wsrep_thd_append_key(thd, key_arr.keys, key_arr.keys_len, + WSREP_SERVICE_KEY_EXCLUSIVE); wsrep_keys_free(&key_arr); - if (rcode) { + if (rcode) + { DBUG_PRINT("wsrep", ("row key failed: %d", rcode)); WSREP_ERROR("Appending table key for CTAS failed: %s, %d", (wsrep_thd_query(thd)) ? wsrep_thd_query(thd) : "void", rcode); - return true; + DBUG_RETURN(true); } /* If commit fails, we should be able to reset the OK status. */ - thd->get_stmt_da()->set_overwrite_status(TRUE); + thd->get_stmt_da()->set_overwrite_status(true); } #endif /* WITH_WSREP */ trans_commit_stmt(thd); if (!(thd->variables.option_bits & OPTION_GTID_BEGIN)) trans_commit_implicit(thd); #ifdef WITH_WSREP - if (WSREP_ON) + if (WSREP(thd)) { thd->get_stmt_da()->set_overwrite_status(FALSE); mysql_mutex_lock(&thd->LOCK_thd_data); - if (thd->wsrep_conflict_state != NO_CONFLICT) + if (wsrep_current_error(thd)) { - WSREP_DEBUG("select_create commit failed, thd: %lld err: %d %s", - (longlong) thd->thread_id, thd->wsrep_conflict_state, - thd->query()); + WSREP_DEBUG("select_create commit failed, thd: %llu err: %s %s", + thd->thread_id, + wsrep_thd_transaction_state_str(thd), WSREP_QUERY(thd)); mysql_mutex_unlock(&thd->LOCK_thd_data); abort_result_set(); DBUG_RETURN(true); |