summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--sql/sql_load.cc64
-rw-r--r--sql/wsrep_mysqld.cc51
-rw-r--r--sql/wsrep_mysqld.h9
m---------wsrep-lib0
4 files changed, 31 insertions, 93 deletions
diff --git a/sql/sql_load.cc b/sql/sql_load.cc
index c95ef72a308..da5356ffb4b 100644
--- a/sql/sql_load.cc
+++ b/sql/sql_load.cc
@@ -100,41 +100,39 @@ public:
#define PUSH(A) *(stack_pos++)=(A)
#ifdef WITH_WSREP
-/** If requested by wsrep_load_data_splitting, commit and restart
-the transaction after every 10,000 inserted rows. */
-
-static bool wsrep_load_data_split(THD *thd, const TABLE *table,
- const COPY_INFO &info)
+/** If requested by wsrep_load_data_splitting and streaming replication is
+ not enabled, replicate a streaming fragment every 10,000 rows.*/
+class Wsrep_load_data_split
{
- DBUG_ENTER("wsrep_load_data_split");
-
- if (!wsrep_load_data_splitting || !WSREP(thd)
- || !info.records || (info.records % 10000)
- || !thd->transaction.stmt.ha_list
- || thd->transaction.stmt.ha_list->ht() != binlog_hton
- || !thd->transaction.stmt.ha_list->next()
- || thd->transaction.stmt.ha_list->next()->next())
- DBUG_RETURN(false);
-
- if (handlerton* hton= thd->transaction.stmt.ha_list->next()->ht())
+public:
+ Wsrep_load_data_split(THD *thd)
+ : m_thd(thd)
+ , m_load_data_splitting(wsrep_load_data_splitting)
+ , m_fragment_unit(thd->wsrep_trx().streaming_context().fragment_unit())
+ , m_fragment_size(thd->wsrep_trx().streaming_context().fragment_size())
{
- if (!(hton->flags & HTON_WSREP_REPLICATION))
- DBUG_RETURN(false);
- WSREP_DEBUG("intermediate transaction commit in LOAD DATA");
- wsrep_tc_log_commit(thd);
- table->file->extra(HA_EXTRA_FAKE_START_STMT);
+ if (WSREP(m_thd) && m_load_data_splitting)
+ {
+ /* Override streaming settings with backward compatible values for
+ load data splitting */
+ m_thd->wsrep_cs().streaming_params(wsrep::streaming_context::row, 10000);
+ }
}
- DBUG_RETURN(false);
-}
-# define WSREP_LOAD_DATA_SPLIT(thd,table,info) \
- if (wsrep_load_data_split(thd,table,info)) \
- { \
- table->auto_increment_field_not_null= FALSE; \
- DBUG_RETURN(1); \
+ ~Wsrep_load_data_split()
+ {
+ if (WSREP(m_thd) && m_load_data_splitting)
+ {
+ /* Restore original settings */
+ m_thd->wsrep_cs().streaming_params(m_fragment_unit, m_fragment_size);
+ }
}
-#else /* WITH_WSREP */
-#define WSREP_LOAD_DATA_SPLIT(thd,table,info) /* empty */
+private:
+ THD *m_thd;
+ my_bool m_load_data_splitting;
+ enum wsrep::streaming_context::fragment_unit m_fragment_unit;
+ size_t m_fragment_size;
+};
#endif /* WITH_WSREP */
class READ_INFO: public Load_data_param
@@ -354,6 +352,9 @@ int mysql_load(THD *thd, const sql_exchange *ex, TABLE_LIST *table_list,
bool transactional_table __attribute__((unused));
DBUG_ENTER("mysql_load");
+#ifdef WITH_WSREP
+ Wsrep_load_data_split wsrep_load_data_split(thd);
+#endif /* WITH_WSREP */
/*
Bug #34283
mysqlbinlog leaves tmpfile after termination if binlog contains
@@ -1005,7 +1006,6 @@ read_fixed_length(THD *thd, COPY_INFO &info, TABLE_LIST *table_list,
DBUG_RETURN(-1);
}
- WSREP_LOAD_DATA_SPLIT(thd, table, info);
err= write_record(thd, table, &info);
table->auto_increment_field_not_null= FALSE;
if (err)
@@ -1148,7 +1148,6 @@ read_sep_field(THD *thd, COPY_INFO &info, TABLE_LIST *table_list,
DBUG_RETURN(-1);
}
- WSREP_LOAD_DATA_SPLIT(thd, table, info);
err= write_record(thd, table, &info);
table->auto_increment_field_not_null= FALSE;
if (err)
@@ -1271,7 +1270,6 @@ read_xml_field(THD *thd, COPY_INFO &info, TABLE_LIST *table_list,
DBUG_RETURN(-1);
}
- WSREP_LOAD_DATA_SPLIT(thd, table, info);
err= write_record(thd, table, &info);
table->auto_increment_field_not_null= false;
if (err)
diff --git a/sql/wsrep_mysqld.cc b/sql/wsrep_mysqld.cc
index 401b40590df..06e713ceda0 100644
--- a/sql/wsrep_mysqld.cc
+++ b/sql/wsrep_mysqld.cc
@@ -2499,57 +2499,6 @@ int wsrep_ordered_commit_if_no_binlog(THD* thd, bool all)
return 0;
}
-wsrep_status_t wsrep_tc_log_commit(THD* thd)
-{
- int cookie;
- my_xid xid= thd->transaction.xid_state.xid.get_my_xid();
-
- DBUG_ASSERT(thd->lex->sql_command == SQLCOM_LOAD);
- if (wsrep_before_commit(thd, true))
- {
- WSREP_DEBUG("wsrep_tc_log_commit: wsrep_before_commit failed %llu",
- thd->thread_id);
- return WSREP_TRX_FAIL;
- }
- cookie= tc_log->log_and_order(thd, xid, 1, false, true);
- if (wsrep_after_commit(thd, true))
- {
- WSREP_DEBUG("wsrep_tc_log_commit: wsrep_after_commit failed %llu",
- thd->thread_id);
- return WSREP_TRX_FAIL;
- }
- if (!cookie)
- {
- WSREP_DEBUG("log_and_order has failed %llu %d", thd->thread_id, cookie);
- return WSREP_TRX_FAIL;
- }
- if (tc_log->unlog(cookie, xid))
- {
- WSREP_DEBUG("log_and_order has failed %llu %d", thd->thread_id, cookie);
- return WSREP_TRX_FAIL;
- }
-
- if (wsrep_after_statement(thd))
- {
- return WSREP_TRX_FAIL;
- }
- /* Set wsrep transaction id if not set. */
- if (thd->wsrep_trx_id() == WSREP_UNDEFINED_TRX_ID)
- {
- if (thd->wsrep_next_trx_id() == WSREP_UNDEFINED_TRX_ID)
- {
- thd->set_wsrep_next_trx_id(thd->query_id);
- }
- DBUG_ASSERT(thd->wsrep_next_trx_id() != WSREP_UNDEFINED_TRX_ID);
- }
- if (wsrep_start_transaction(thd, thd->wsrep_next_trx_id()))
- {
- return WSREP_TRX_FAIL;
- }
- DBUG_ASSERT(thd->wsrep_trx_id() != WSREP_UNDEFINED_TRX_ID);
- return WSREP_OK;
-}
-
int wsrep_thd_retry_counter(const THD *thd)
{
return thd->wsrep_retry_counter;
diff --git a/sql/wsrep_mysqld.h b/sql/wsrep_mysqld.h
index 957f1ef3ab1..3c430ccf487 100644
--- a/sql/wsrep_mysqld.h
+++ b/sql/wsrep_mysqld.h
@@ -445,15 +445,6 @@ bool wsrep_provider_is_SR_capable();
int wsrep_ordered_commit_if_no_binlog(THD*, bool);
/**
- * Commit the current transaction with the
- * MySQL "Transaction Coordinator Log" (see `class TC_LOG` in sql/log.h).
- * Calling this function will generate and assign a new wsrep transaction id
- * for `thd`.
- * @return WSREP_OK on success or other WSREP_* error code on failure
- */
-wsrep_status_t wsrep_tc_log_commit(THD* thd);
-
-/**
* Initialize WSREP server instance.
*
* @return Zero on success, non-zero on error.
diff --git a/wsrep-lib b/wsrep-lib
-Subproject e7d72ae7f6a6995a21d743389426a963429a1ff
+Subproject 20b52ff1ddc3b2f547b7081471f46dcfa5efabc