summaryrefslogtreecommitdiff
path: root/sql/sql_load.cc
diff options
context:
space:
mode:
Diffstat (limited to 'sql/sql_load.cc')
-rw-r--r--sql/sql_load.cc64
1 files changed, 31 insertions, 33 deletions
diff --git a/sql/sql_load.cc b/sql/sql_load.cc
index c95ef72a308..da5356ffb4b 100644
--- a/sql/sql_load.cc
+++ b/sql/sql_load.cc
@@ -100,41 +100,39 @@ public:
#define PUSH(A) *(stack_pos++)=(A)
#ifdef WITH_WSREP
-/** If requested by wsrep_load_data_splitting, commit and restart
-the transaction after every 10,000 inserted rows. */
-
-static bool wsrep_load_data_split(THD *thd, const TABLE *table,
- const COPY_INFO &info)
+/** If requested by wsrep_load_data_splitting and streaming replication is
+ not enabled, replicate a streaming fragment every 10,000 rows.*/
+class Wsrep_load_data_split
{
- DBUG_ENTER("wsrep_load_data_split");
-
- if (!wsrep_load_data_splitting || !WSREP(thd)
- || !info.records || (info.records % 10000)
- || !thd->transaction.stmt.ha_list
- || thd->transaction.stmt.ha_list->ht() != binlog_hton
- || !thd->transaction.stmt.ha_list->next()
- || thd->transaction.stmt.ha_list->next()->next())
- DBUG_RETURN(false);
-
- if (handlerton* hton= thd->transaction.stmt.ha_list->next()->ht())
+public:
+ Wsrep_load_data_split(THD *thd)
+ : m_thd(thd)
+ , m_load_data_splitting(wsrep_load_data_splitting)
+ , m_fragment_unit(thd->wsrep_trx().streaming_context().fragment_unit())
+ , m_fragment_size(thd->wsrep_trx().streaming_context().fragment_size())
{
- if (!(hton->flags & HTON_WSREP_REPLICATION))
- DBUG_RETURN(false);
- WSREP_DEBUG("intermediate transaction commit in LOAD DATA");
- wsrep_tc_log_commit(thd);
- table->file->extra(HA_EXTRA_FAKE_START_STMT);
+ if (WSREP(m_thd) && m_load_data_splitting)
+ {
+ /* Override streaming settings with backward compatible values for
+ load data splitting */
+ m_thd->wsrep_cs().streaming_params(wsrep::streaming_context::row, 10000);
+ }
}
- DBUG_RETURN(false);
-}
-# define WSREP_LOAD_DATA_SPLIT(thd,table,info) \
- if (wsrep_load_data_split(thd,table,info)) \
- { \
- table->auto_increment_field_not_null= FALSE; \
- DBUG_RETURN(1); \
+ ~Wsrep_load_data_split()
+ {
+ if (WSREP(m_thd) && m_load_data_splitting)
+ {
+ /* Restore original settings */
+ m_thd->wsrep_cs().streaming_params(m_fragment_unit, m_fragment_size);
+ }
}
-#else /* WITH_WSREP */
-#define WSREP_LOAD_DATA_SPLIT(thd,table,info) /* empty */
+private:
+ THD *m_thd;
+ my_bool m_load_data_splitting;
+ enum wsrep::streaming_context::fragment_unit m_fragment_unit;
+ size_t m_fragment_size;
+};
#endif /* WITH_WSREP */
class READ_INFO: public Load_data_param
@@ -354,6 +352,9 @@ int mysql_load(THD *thd, const sql_exchange *ex, TABLE_LIST *table_list,
bool transactional_table __attribute__((unused));
DBUG_ENTER("mysql_load");
+#ifdef WITH_WSREP
+ Wsrep_load_data_split wsrep_load_data_split(thd);
+#endif /* WITH_WSREP */
/*
Bug #34283
mysqlbinlog leaves tmpfile after termination if binlog contains
@@ -1005,7 +1006,6 @@ read_fixed_length(THD *thd, COPY_INFO &info, TABLE_LIST *table_list,
DBUG_RETURN(-1);
}
- WSREP_LOAD_DATA_SPLIT(thd, table, info);
err= write_record(thd, table, &info);
table->auto_increment_field_not_null= FALSE;
if (err)
@@ -1148,7 +1148,6 @@ read_sep_field(THD *thd, COPY_INFO &info, TABLE_LIST *table_list,
DBUG_RETURN(-1);
}
- WSREP_LOAD_DATA_SPLIT(thd, table, info);
err= write_record(thd, table, &info);
table->auto_increment_field_not_null= FALSE;
if (err)
@@ -1271,7 +1270,6 @@ read_xml_field(THD *thd, COPY_INFO &info, TABLE_LIST *table_list,
DBUG_RETURN(-1);
}
- WSREP_LOAD_DATA_SPLIT(thd, table, info);
err= write_record(thd, table, &info);
table->auto_increment_field_not_null= false;
if (err)