diff options
-rw-r--r-- | sql/sql_load.cc | 64 | ||||
-rw-r--r-- | sql/wsrep_mysqld.cc | 51 | ||||
-rw-r--r-- | sql/wsrep_mysqld.h | 9 | ||||
m--------- | wsrep-lib | 0 |
4 files changed, 31 insertions, 93 deletions
diff --git a/sql/sql_load.cc b/sql/sql_load.cc index c95ef72a308..da5356ffb4b 100644 --- a/sql/sql_load.cc +++ b/sql/sql_load.cc @@ -100,41 +100,39 @@ public: #define PUSH(A) *(stack_pos++)=(A) #ifdef WITH_WSREP -/** If requested by wsrep_load_data_splitting, commit and restart -the transaction after every 10,000 inserted rows. */ - -static bool wsrep_load_data_split(THD *thd, const TABLE *table, - const COPY_INFO &info) +/** If requested by wsrep_load_data_splitting and streaming replication is + not enabled, replicate a streaming fragment every 10,000 rows.*/ +class Wsrep_load_data_split { - DBUG_ENTER("wsrep_load_data_split"); - - if (!wsrep_load_data_splitting || !WSREP(thd) - || !info.records || (info.records % 10000) - || !thd->transaction.stmt.ha_list - || thd->transaction.stmt.ha_list->ht() != binlog_hton - || !thd->transaction.stmt.ha_list->next() - || thd->transaction.stmt.ha_list->next()->next()) - DBUG_RETURN(false); - - if (handlerton* hton= thd->transaction.stmt.ha_list->next()->ht()) +public: + Wsrep_load_data_split(THD *thd) + : m_thd(thd) + , m_load_data_splitting(wsrep_load_data_splitting) + , m_fragment_unit(thd->wsrep_trx().streaming_context().fragment_unit()) + , m_fragment_size(thd->wsrep_trx().streaming_context().fragment_size()) { - if (!(hton->flags & HTON_WSREP_REPLICATION)) - DBUG_RETURN(false); - WSREP_DEBUG("intermediate transaction commit in LOAD DATA"); - wsrep_tc_log_commit(thd); - table->file->extra(HA_EXTRA_FAKE_START_STMT); + if (WSREP(m_thd) && m_load_data_splitting) + { + /* Override streaming settings with backward compatible values for + load data splitting */ + m_thd->wsrep_cs().streaming_params(wsrep::streaming_context::row, 10000); + } } - DBUG_RETURN(false); -} -# define WSREP_LOAD_DATA_SPLIT(thd,table,info) \ - if (wsrep_load_data_split(thd,table,info)) \ - { \ - table->auto_increment_field_not_null= FALSE; \ - DBUG_RETURN(1); \ + ~Wsrep_load_data_split() + { + if (WSREP(m_thd) && m_load_data_splitting) + { + /* Restore original settings */ + m_thd->wsrep_cs().streaming_params(m_fragment_unit, m_fragment_size); + } } -#else /* WITH_WSREP */ -#define WSREP_LOAD_DATA_SPLIT(thd,table,info) /* empty */ +private: + THD *m_thd; + my_bool m_load_data_splitting; + enum wsrep::streaming_context::fragment_unit m_fragment_unit; + size_t m_fragment_size; +}; #endif /* WITH_WSREP */ class READ_INFO: public Load_data_param @@ -354,6 +352,9 @@ int mysql_load(THD *thd, const sql_exchange *ex, TABLE_LIST *table_list, bool transactional_table __attribute__((unused)); DBUG_ENTER("mysql_load"); +#ifdef WITH_WSREP + Wsrep_load_data_split wsrep_load_data_split(thd); +#endif /* WITH_WSREP */ /* Bug #34283 mysqlbinlog leaves tmpfile after termination if binlog contains @@ -1005,7 +1006,6 @@ read_fixed_length(THD *thd, COPY_INFO &info, TABLE_LIST *table_list, DBUG_RETURN(-1); } - WSREP_LOAD_DATA_SPLIT(thd, table, info); err= write_record(thd, table, &info); table->auto_increment_field_not_null= FALSE; if (err) @@ -1148,7 +1148,6 @@ read_sep_field(THD *thd, COPY_INFO &info, TABLE_LIST *table_list, DBUG_RETURN(-1); } - WSREP_LOAD_DATA_SPLIT(thd, table, info); err= write_record(thd, table, &info); table->auto_increment_field_not_null= FALSE; if (err) @@ -1271,7 +1270,6 @@ read_xml_field(THD *thd, COPY_INFO &info, TABLE_LIST *table_list, DBUG_RETURN(-1); } - WSREP_LOAD_DATA_SPLIT(thd, table, info); err= write_record(thd, table, &info); table->auto_increment_field_not_null= false; if (err) diff --git a/sql/wsrep_mysqld.cc b/sql/wsrep_mysqld.cc index 401b40590df..06e713ceda0 100644 --- a/sql/wsrep_mysqld.cc +++ b/sql/wsrep_mysqld.cc @@ -2499,57 +2499,6 @@ int wsrep_ordered_commit_if_no_binlog(THD* thd, bool all) return 0; } -wsrep_status_t wsrep_tc_log_commit(THD* thd) -{ - int cookie; - my_xid xid= thd->transaction.xid_state.xid.get_my_xid(); - - DBUG_ASSERT(thd->lex->sql_command == SQLCOM_LOAD); - if (wsrep_before_commit(thd, true)) - { - WSREP_DEBUG("wsrep_tc_log_commit: wsrep_before_commit failed %llu", - thd->thread_id); - return WSREP_TRX_FAIL; - } - cookie= tc_log->log_and_order(thd, xid, 1, false, true); - if (wsrep_after_commit(thd, true)) - { - WSREP_DEBUG("wsrep_tc_log_commit: wsrep_after_commit failed %llu", - thd->thread_id); - return WSREP_TRX_FAIL; - } - if (!cookie) - { - WSREP_DEBUG("log_and_order has failed %llu %d", thd->thread_id, cookie); - return WSREP_TRX_FAIL; - } - if (tc_log->unlog(cookie, xid)) - { - WSREP_DEBUG("log_and_order has failed %llu %d", thd->thread_id, cookie); - return WSREP_TRX_FAIL; - } - - if (wsrep_after_statement(thd)) - { - return WSREP_TRX_FAIL; - } - /* Set wsrep transaction id if not set. */ - if (thd->wsrep_trx_id() == WSREP_UNDEFINED_TRX_ID) - { - if (thd->wsrep_next_trx_id() == WSREP_UNDEFINED_TRX_ID) - { - thd->set_wsrep_next_trx_id(thd->query_id); - } - DBUG_ASSERT(thd->wsrep_next_trx_id() != WSREP_UNDEFINED_TRX_ID); - } - if (wsrep_start_transaction(thd, thd->wsrep_next_trx_id())) - { - return WSREP_TRX_FAIL; - } - DBUG_ASSERT(thd->wsrep_trx_id() != WSREP_UNDEFINED_TRX_ID); - return WSREP_OK; -} - int wsrep_thd_retry_counter(const THD *thd) { return thd->wsrep_retry_counter; diff --git a/sql/wsrep_mysqld.h b/sql/wsrep_mysqld.h index 957f1ef3ab1..3c430ccf487 100644 --- a/sql/wsrep_mysqld.h +++ b/sql/wsrep_mysqld.h @@ -445,15 +445,6 @@ bool wsrep_provider_is_SR_capable(); int wsrep_ordered_commit_if_no_binlog(THD*, bool); /** - * Commit the current transaction with the - * MySQL "Transaction Coordinator Log" (see `class TC_LOG` in sql/log.h). - * Calling this function will generate and assign a new wsrep transaction id - * for `thd`. - * @return WSREP_OK on success or other WSREP_* error code on failure - */ -wsrep_status_t wsrep_tc_log_commit(THD* thd); - -/** * Initialize WSREP server instance. * * @return Zero on success, non-zero on error. diff --git a/wsrep-lib b/wsrep-lib -Subproject e7d72ae7f6a6995a21d743389426a963429a1ff +Subproject 20b52ff1ddc3b2f547b7081471f46dcfa5efabc |