summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTeemu Ollakka <teemu.ollakka@galeracluster.com>2019-02-11 14:47:08 +0200
committerTeemu Ollakka <teemu.ollakka@galeracluster.com>2019-02-12 10:37:05 +0200
commitf06a0b5338694755842a58798bb3a9a40da78bfd (patch)
tree889ab78496e5f830ecb397f2e79d61bd7815c53c
parentce6505f890956f81354269a991e69f20babae8e4 (diff)
downloadmariadb-git-f06a0b5338694755842a58798bb3a9a40da78bfd.tar.gz
Implement wsrep_load_data_splitting with streaming replication
If wsrep_load_data_splitting is configured, change streaming replication parameters internally to match the original behavior, i.e. replicate on every 10000 rows. After load data is over, restore original streaming replication settings. Removed redundant wsrep_tc_log_commit().
-rw-r--r--sql/sql_load.cc64
-rw-r--r--sql/wsrep_mysqld.cc51
-rw-r--r--sql/wsrep_mysqld.h9
m---------wsrep-lib0
4 files changed, 31 insertions, 93 deletions
diff --git a/sql/sql_load.cc b/sql/sql_load.cc
index c95ef72a308..da5356ffb4b 100644
--- a/sql/sql_load.cc
+++ b/sql/sql_load.cc
@@ -100,41 +100,39 @@ public:
#define PUSH(A) *(stack_pos++)=(A)
#ifdef WITH_WSREP
-/** If requested by wsrep_load_data_splitting, commit and restart
-the transaction after every 10,000 inserted rows. */
-
-static bool wsrep_load_data_split(THD *thd, const TABLE *table,
- const COPY_INFO &info)
+/** If requested by wsrep_load_data_splitting and streaming replication is
+ not enabled, replicate a streaming fragment every 10,000 rows.*/
+class Wsrep_load_data_split
{
- DBUG_ENTER("wsrep_load_data_split");
-
- if (!wsrep_load_data_splitting || !WSREP(thd)
- || !info.records || (info.records % 10000)
- || !thd->transaction.stmt.ha_list
- || thd->transaction.stmt.ha_list->ht() != binlog_hton
- || !thd->transaction.stmt.ha_list->next()
- || thd->transaction.stmt.ha_list->next()->next())
- DBUG_RETURN(false);
-
- if (handlerton* hton= thd->transaction.stmt.ha_list->next()->ht())
+public:
+ Wsrep_load_data_split(THD *thd)
+ : m_thd(thd)
+ , m_load_data_splitting(wsrep_load_data_splitting)
+ , m_fragment_unit(thd->wsrep_trx().streaming_context().fragment_unit())
+ , m_fragment_size(thd->wsrep_trx().streaming_context().fragment_size())
{
- if (!(hton->flags & HTON_WSREP_REPLICATION))
- DBUG_RETURN(false);
- WSREP_DEBUG("intermediate transaction commit in LOAD DATA");
- wsrep_tc_log_commit(thd);
- table->file->extra(HA_EXTRA_FAKE_START_STMT);
+ if (WSREP(m_thd) && m_load_data_splitting)
+ {
+ /* Override streaming settings with backward compatible values for
+ load data splitting */
+ m_thd->wsrep_cs().streaming_params(wsrep::streaming_context::row, 10000);
+ }
}
- DBUG_RETURN(false);
-}
-# define WSREP_LOAD_DATA_SPLIT(thd,table,info) \
- if (wsrep_load_data_split(thd,table,info)) \
- { \
- table->auto_increment_field_not_null= FALSE; \
- DBUG_RETURN(1); \
+ ~Wsrep_load_data_split()
+ {
+ if (WSREP(m_thd) && m_load_data_splitting)
+ {
+ /* Restore original settings */
+ m_thd->wsrep_cs().streaming_params(m_fragment_unit, m_fragment_size);
+ }
}
-#else /* WITH_WSREP */
-#define WSREP_LOAD_DATA_SPLIT(thd,table,info) /* empty */
+private:
+ THD *m_thd;
+ my_bool m_load_data_splitting;
+ enum wsrep::streaming_context::fragment_unit m_fragment_unit;
+ size_t m_fragment_size;
+};
#endif /* WITH_WSREP */
class READ_INFO: public Load_data_param
@@ -354,6 +352,9 @@ int mysql_load(THD *thd, const sql_exchange *ex, TABLE_LIST *table_list,
bool transactional_table __attribute__((unused));
DBUG_ENTER("mysql_load");
+#ifdef WITH_WSREP
+ Wsrep_load_data_split wsrep_load_data_split(thd);
+#endif /* WITH_WSREP */
/*
Bug #34283
mysqlbinlog leaves tmpfile after termination if binlog contains
@@ -1005,7 +1006,6 @@ read_fixed_length(THD *thd, COPY_INFO &info, TABLE_LIST *table_list,
DBUG_RETURN(-1);
}
- WSREP_LOAD_DATA_SPLIT(thd, table, info);
err= write_record(thd, table, &info);
table->auto_increment_field_not_null= FALSE;
if (err)
@@ -1148,7 +1148,6 @@ read_sep_field(THD *thd, COPY_INFO &info, TABLE_LIST *table_list,
DBUG_RETURN(-1);
}
- WSREP_LOAD_DATA_SPLIT(thd, table, info);
err= write_record(thd, table, &info);
table->auto_increment_field_not_null= FALSE;
if (err)
@@ -1271,7 +1270,6 @@ read_xml_field(THD *thd, COPY_INFO &info, TABLE_LIST *table_list,
DBUG_RETURN(-1);
}
- WSREP_LOAD_DATA_SPLIT(thd, table, info);
err= write_record(thd, table, &info);
table->auto_increment_field_not_null= false;
if (err)
diff --git a/sql/wsrep_mysqld.cc b/sql/wsrep_mysqld.cc
index 401b40590df..06e713ceda0 100644
--- a/sql/wsrep_mysqld.cc
+++ b/sql/wsrep_mysqld.cc
@@ -2499,57 +2499,6 @@ int wsrep_ordered_commit_if_no_binlog(THD* thd, bool all)
return 0;
}
-wsrep_status_t wsrep_tc_log_commit(THD* thd)
-{
- int cookie;
- my_xid xid= thd->transaction.xid_state.xid.get_my_xid();
-
- DBUG_ASSERT(thd->lex->sql_command == SQLCOM_LOAD);
- if (wsrep_before_commit(thd, true))
- {
- WSREP_DEBUG("wsrep_tc_log_commit: wsrep_before_commit failed %llu",
- thd->thread_id);
- return WSREP_TRX_FAIL;
- }
- cookie= tc_log->log_and_order(thd, xid, 1, false, true);
- if (wsrep_after_commit(thd, true))
- {
- WSREP_DEBUG("wsrep_tc_log_commit: wsrep_after_commit failed %llu",
- thd->thread_id);
- return WSREP_TRX_FAIL;
- }
- if (!cookie)
- {
- WSREP_DEBUG("log_and_order has failed %llu %d", thd->thread_id, cookie);
- return WSREP_TRX_FAIL;
- }
- if (tc_log->unlog(cookie, xid))
- {
- WSREP_DEBUG("log_and_order has failed %llu %d", thd->thread_id, cookie);
- return WSREP_TRX_FAIL;
- }
-
- if (wsrep_after_statement(thd))
- {
- return WSREP_TRX_FAIL;
- }
- /* Set wsrep transaction id if not set. */
- if (thd->wsrep_trx_id() == WSREP_UNDEFINED_TRX_ID)
- {
- if (thd->wsrep_next_trx_id() == WSREP_UNDEFINED_TRX_ID)
- {
- thd->set_wsrep_next_trx_id(thd->query_id);
- }
- DBUG_ASSERT(thd->wsrep_next_trx_id() != WSREP_UNDEFINED_TRX_ID);
- }
- if (wsrep_start_transaction(thd, thd->wsrep_next_trx_id()))
- {
- return WSREP_TRX_FAIL;
- }
- DBUG_ASSERT(thd->wsrep_trx_id() != WSREP_UNDEFINED_TRX_ID);
- return WSREP_OK;
-}
-
int wsrep_thd_retry_counter(const THD *thd)
{
return thd->wsrep_retry_counter;
diff --git a/sql/wsrep_mysqld.h b/sql/wsrep_mysqld.h
index 957f1ef3ab1..3c430ccf487 100644
--- a/sql/wsrep_mysqld.h
+++ b/sql/wsrep_mysqld.h
@@ -445,15 +445,6 @@ bool wsrep_provider_is_SR_capable();
int wsrep_ordered_commit_if_no_binlog(THD*, bool);
/**
- * Commit the current transaction with the
- * MySQL "Transaction Coordinator Log" (see `class TC_LOG` in sql/log.h).
- * Calling this function will generate and assign a new wsrep transaction id
- * for `thd`.
- * @return WSREP_OK on success or other WSREP_* error code on failure
- */
-wsrep_status_t wsrep_tc_log_commit(THD* thd);
-
-/**
* Initialize WSREP server instance.
*
* @return Zero on success, non-zero on error.
diff --git a/wsrep-lib b/wsrep-lib
-Subproject e7d72ae7f6a6995a21d743389426a963429a1ff
+Subproject 20b52ff1ddc3b2f547b7081471f46dcfa5efabc