diff options
author | Teemu Ollakka <teemu.ollakka@galeracluster.com> | 2019-02-11 14:47:08 +0200 |
---|---|---|
committer | Teemu Ollakka <teemu.ollakka@galeracluster.com> | 2019-02-12 10:37:05 +0200 |
commit | f06a0b5338694755842a58798bb3a9a40da78bfd (patch) | |
tree | 889ab78496e5f830ecb397f2e79d61bd7815c53c | |
parent | ce6505f890956f81354269a991e69f20babae8e4 (diff) | |
download | mariadb-git-f06a0b5338694755842a58798bb3a9a40da78bfd.tar.gz |
Implement wsrep_load_data_splitting with streaming replication
If wsrep_load_data_splitting is configured, change streaming replication
parameters internally to match the original behavior, i.e. replicate
on every 10000 rows. After load data is over, restore original
streaming replication settings.
Removed redundant wsrep_tc_log_commit().
-rw-r--r-- | sql/sql_load.cc | 64 | ||||
-rw-r--r-- | sql/wsrep_mysqld.cc | 51 | ||||
-rw-r--r-- | sql/wsrep_mysqld.h | 9 | ||||
m--------- | wsrep-lib | 0 |
4 files changed, 31 insertions, 93 deletions
diff --git a/sql/sql_load.cc b/sql/sql_load.cc index c95ef72a308..da5356ffb4b 100644 --- a/sql/sql_load.cc +++ b/sql/sql_load.cc @@ -100,41 +100,39 @@ public: #define PUSH(A) *(stack_pos++)=(A) #ifdef WITH_WSREP -/** If requested by wsrep_load_data_splitting, commit and restart -the transaction after every 10,000 inserted rows. */ - -static bool wsrep_load_data_split(THD *thd, const TABLE *table, - const COPY_INFO &info) +/** If requested by wsrep_load_data_splitting and streaming replication is + not enabled, replicate a streaming fragment every 10,000 rows.*/ +class Wsrep_load_data_split { - DBUG_ENTER("wsrep_load_data_split"); - - if (!wsrep_load_data_splitting || !WSREP(thd) - || !info.records || (info.records % 10000) - || !thd->transaction.stmt.ha_list - || thd->transaction.stmt.ha_list->ht() != binlog_hton - || !thd->transaction.stmt.ha_list->next() - || thd->transaction.stmt.ha_list->next()->next()) - DBUG_RETURN(false); - - if (handlerton* hton= thd->transaction.stmt.ha_list->next()->ht()) +public: + Wsrep_load_data_split(THD *thd) + : m_thd(thd) + , m_load_data_splitting(wsrep_load_data_splitting) + , m_fragment_unit(thd->wsrep_trx().streaming_context().fragment_unit()) + , m_fragment_size(thd->wsrep_trx().streaming_context().fragment_size()) { - if (!(hton->flags & HTON_WSREP_REPLICATION)) - DBUG_RETURN(false); - WSREP_DEBUG("intermediate transaction commit in LOAD DATA"); - wsrep_tc_log_commit(thd); - table->file->extra(HA_EXTRA_FAKE_START_STMT); + if (WSREP(m_thd) && m_load_data_splitting) + { + /* Override streaming settings with backward compatible values for + load data splitting */ + m_thd->wsrep_cs().streaming_params(wsrep::streaming_context::row, 10000); + } } - DBUG_RETURN(false); -} -# define WSREP_LOAD_DATA_SPLIT(thd,table,info) \ - if (wsrep_load_data_split(thd,table,info)) \ - { \ - table->auto_increment_field_not_null= FALSE; \ - DBUG_RETURN(1); \ + ~Wsrep_load_data_split() + { + if (WSREP(m_thd) && m_load_data_splitting) + { + /* Restore original settings */ + m_thd->wsrep_cs().streaming_params(m_fragment_unit, m_fragment_size); + } } -#else /* WITH_WSREP */ -#define WSREP_LOAD_DATA_SPLIT(thd,table,info) /* empty */ +private: + THD *m_thd; + my_bool m_load_data_splitting; + enum wsrep::streaming_context::fragment_unit m_fragment_unit; + size_t m_fragment_size; +}; #endif /* WITH_WSREP */ class READ_INFO: public Load_data_param @@ -354,6 +352,9 @@ int mysql_load(THD *thd, const sql_exchange *ex, TABLE_LIST *table_list, bool transactional_table __attribute__((unused)); DBUG_ENTER("mysql_load"); +#ifdef WITH_WSREP + Wsrep_load_data_split wsrep_load_data_split(thd); +#endif /* WITH_WSREP */ /* Bug #34283 mysqlbinlog leaves tmpfile after termination if binlog contains @@ -1005,7 +1006,6 @@ read_fixed_length(THD *thd, COPY_INFO &info, TABLE_LIST *table_list, DBUG_RETURN(-1); } - WSREP_LOAD_DATA_SPLIT(thd, table, info); err= write_record(thd, table, &info); table->auto_increment_field_not_null= FALSE; if (err) @@ -1148,7 +1148,6 @@ read_sep_field(THD *thd, COPY_INFO &info, TABLE_LIST *table_list, DBUG_RETURN(-1); } - WSREP_LOAD_DATA_SPLIT(thd, table, info); err= write_record(thd, table, &info); table->auto_increment_field_not_null= FALSE; if (err) @@ -1271,7 +1270,6 @@ read_xml_field(THD *thd, COPY_INFO &info, TABLE_LIST *table_list, DBUG_RETURN(-1); } - WSREP_LOAD_DATA_SPLIT(thd, table, info); err= write_record(thd, table, &info); table->auto_increment_field_not_null= false; if (err) diff --git a/sql/wsrep_mysqld.cc b/sql/wsrep_mysqld.cc index 401b40590df..06e713ceda0 100644 --- a/sql/wsrep_mysqld.cc +++ b/sql/wsrep_mysqld.cc @@ -2499,57 +2499,6 @@ int wsrep_ordered_commit_if_no_binlog(THD* thd, bool all) return 0; } -wsrep_status_t wsrep_tc_log_commit(THD* thd) -{ - int cookie; - my_xid xid= thd->transaction.xid_state.xid.get_my_xid(); - - DBUG_ASSERT(thd->lex->sql_command == SQLCOM_LOAD); - if (wsrep_before_commit(thd, true)) - { - WSREP_DEBUG("wsrep_tc_log_commit: wsrep_before_commit failed %llu", - thd->thread_id); - return WSREP_TRX_FAIL; - } - cookie= tc_log->log_and_order(thd, xid, 1, false, true); - if (wsrep_after_commit(thd, true)) - { - WSREP_DEBUG("wsrep_tc_log_commit: wsrep_after_commit failed %llu", - thd->thread_id); - return WSREP_TRX_FAIL; - } - if (!cookie) - { - WSREP_DEBUG("log_and_order has failed %llu %d", thd->thread_id, cookie); - return WSREP_TRX_FAIL; - } - if (tc_log->unlog(cookie, xid)) - { - WSREP_DEBUG("log_and_order has failed %llu %d", thd->thread_id, cookie); - return WSREP_TRX_FAIL; - } - - if (wsrep_after_statement(thd)) - { - return WSREP_TRX_FAIL; - } - /* Set wsrep transaction id if not set. */ - if (thd->wsrep_trx_id() == WSREP_UNDEFINED_TRX_ID) - { - if (thd->wsrep_next_trx_id() == WSREP_UNDEFINED_TRX_ID) - { - thd->set_wsrep_next_trx_id(thd->query_id); - } - DBUG_ASSERT(thd->wsrep_next_trx_id() != WSREP_UNDEFINED_TRX_ID); - } - if (wsrep_start_transaction(thd, thd->wsrep_next_trx_id())) - { - return WSREP_TRX_FAIL; - } - DBUG_ASSERT(thd->wsrep_trx_id() != WSREP_UNDEFINED_TRX_ID); - return WSREP_OK; -} - int wsrep_thd_retry_counter(const THD *thd) { return thd->wsrep_retry_counter; diff --git a/sql/wsrep_mysqld.h b/sql/wsrep_mysqld.h index 957f1ef3ab1..3c430ccf487 100644 --- a/sql/wsrep_mysqld.h +++ b/sql/wsrep_mysqld.h @@ -445,15 +445,6 @@ bool wsrep_provider_is_SR_capable(); int wsrep_ordered_commit_if_no_binlog(THD*, bool); /** - * Commit the current transaction with the - * MySQL "Transaction Coordinator Log" (see `class TC_LOG` in sql/log.h). - * Calling this function will generate and assign a new wsrep transaction id - * for `thd`. - * @return WSREP_OK on success or other WSREP_* error code on failure - */ -wsrep_status_t wsrep_tc_log_commit(THD* thd); - -/** * Initialize WSREP server instance. * * @return Zero on success, non-zero on error. diff --git a/wsrep-lib b/wsrep-lib -Subproject e7d72ae7f6a6995a21d743389426a963429a1ff +Subproject 20b52ff1ddc3b2f547b7081471f46dcfa5efabc |