diff options
-rw-r--r-- | include/mysql/service_wsrep.h | 6 | ||||
-rw-r--r-- | mysql-test/suite/galera_3nodes/r/galera_load_data_ist.result | 36 | ||||
-rw-r--r-- | mysql-test/suite/galera_3nodes/t/galera_load_data_ist.cnf | 4 | ||||
-rw-r--r-- | mysql-test/suite/galera_3nodes/t/galera_load_data_ist.test | 124 | ||||
-rw-r--r-- | sql/log.cc | 51 | ||||
-rw-r--r-- | sql/sql_class.cc | 2 | ||||
-rw-r--r-- | sql/sql_class.h | 8 | ||||
-rw-r--r-- | sql/sql_load.cc | 55 | ||||
-rw-r--r-- | sql/sql_plugin_services.ic | 2 | ||||
-rw-r--r-- | sql/wsrep_dummy.cc | 6 | ||||
-rw-r--r-- | sql/wsrep_hton.cc | 1 | ||||
-rw-r--r-- | sql/wsrep_thd.cc | 10 | ||||
-rw-r--r-- | storage/innobase/handler/ha_innodb.cc | 10 |
13 files changed, 295 insertions, 20 deletions
diff --git a/include/mysql/service_wsrep.h b/include/mysql/service_wsrep.h index ee28856ac73..7237cccfaeb 100644 --- a/include/mysql/service_wsrep.h +++ b/include/mysql/service_wsrep.h @@ -108,6 +108,8 @@ extern struct wsrep_service_st { long long (*wsrep_thd_trx_seqno_func)(THD *thd); struct wsrep_ws_handle * (*wsrep_thd_ws_handle_func)(THD *thd); void (*wsrep_thd_auto_increment_variables_func)(THD *thd, unsigned long long *offset, unsigned long long *increment); + void (*wsrep_set_load_multi_commit_func)(THD *thd, bool split); + bool (*wsrep_is_load_multi_commit_func)(THD *thd); int (*wsrep_trx_is_aborting_func)(MYSQL_THD thd); int (*wsrep_trx_order_before_func)(MYSQL_THD, MYSQL_THD); void (*wsrep_unlock_rollback_func)(); @@ -152,6 +154,8 @@ extern struct wsrep_service_st { #define wsrep_thd_trx_seqno(T) wsrep_service->wsrep_thd_trx_seqno_func(T) #define wsrep_thd_ws_handle(T) wsrep_service->wsrep_thd_ws_handle_func(T) #define wsrep_thd_auto_increment_variables(T,O,I) wsrep_service->wsrep_thd_auto_increment_variables_func(T,O,I) +#define wsrep_set_load_multi_commit(T,S) wsrep_service->wsrep_set_load_multi_commit_func(T,S) +#define wsrep_is_load_multi_commit(T) wsrep_service->wsrep_is_load_multi_commit_func(T) #define wsrep_trx_is_aborting(T) wsrep_service->wsrep_trx_is_aborting_func(T) #define wsrep_trx_order_before(T1,T2) wsrep_service->wsrep_trx_order_before_func(T1,T2) #define wsrep_unlock_rollback() wsrep_service->wsrep_unlock_rollback_func() @@ -206,6 +210,8 @@ my_bool wsrep_thd_is_wsrep(MYSQL_THD thd); struct wsrep *get_wsrep(); struct wsrep_ws_handle *wsrep_thd_ws_handle(THD *thd); void wsrep_thd_auto_increment_variables(THD *thd, unsigned long long *offset, unsigned long long *increment); +void wsrep_set_load_multi_commit(THD *thd, bool split); +bool wsrep_is_load_multi_commit(THD *thd); void wsrep_aborting_thd_enqueue(THD *thd); void wsrep_lock_rollback(); void wsrep_post_commit(THD* thd, bool all); diff --git a/mysql-test/suite/galera_3nodes/r/galera_load_data_ist.result b/mysql-test/suite/galera_3nodes/r/galera_load_data_ist.result new file mode 100644 index 00000000000..cfb897e1076 --- /dev/null +++ b/mysql-test/suite/galera_3nodes/r/galera_load_data_ist.result @@ -0,0 +1,36 @@ +connection node_1; +connection node_2; +connection node_3; +connection node_1; +CREATE TABLE t1 (f1 INTEGER PRIMARY KEY) ENGINE=InnoDB; +connection node_2; +connection node_3; +SET GLOBAL wsrep_provider_options = 'gmcast.isolate = 1'; +SET SESSION wsrep_on = OFF; +SET SESSION wsrep_on = ON; +SET SESSION wsrep_sync_wait = 0; +connection node_2a; +SET SESSION wsrep_sync_wait = 0; +connection node_2; +SET GLOBAL wsrep_load_data_splitting = TRUE; +SET DEBUG_SYNC='intermediate_transaction_commit SIGNAL commited WAIT_FOR ist'; +connection node_2a; +SET DEBUG_SYNC='now WAIT_FOR commited'; +connection node_3; +SET GLOBAL wsrep_provider_options = 'gmcast.isolate = 0'; +connection node_2a; +SET DEBUG_SYNC='now SIGNAL ist'; +connection node_1; +connection node_2; +SET DEBUG_SYNC='RESET'; +SELECT COUNT(*) = 95000 FROM t1; +COUNT(*) = 95000 +1 +wsrep_last_committed_diff +1 +connection node_1; +SET GLOBAL wsrep_load_data_splitting = 1;; +DROP TABLE t1; +disconnect node_3; +disconnect node_2; +disconnect node_1; diff --git a/mysql-test/suite/galera_3nodes/t/galera_load_data_ist.cnf b/mysql-test/suite/galera_3nodes/t/galera_load_data_ist.cnf new file mode 100644 index 00000000000..35ecb8b5937 --- /dev/null +++ b/mysql-test/suite/galera_3nodes/t/galera_load_data_ist.cnf @@ -0,0 +1,4 @@ +!include ../galera_3nodes.cnf + +[mysqld] +wsrep-causal-reads=OFF diff --git a/mysql-test/suite/galera_3nodes/t/galera_load_data_ist.test b/mysql-test/suite/galera_3nodes/t/galera_load_data_ist.test new file mode 100644 index 00000000000..e1140da229b --- /dev/null +++ b/mysql-test/suite/galera_3nodes/t/galera_load_data_ist.test @@ -0,0 +1,124 @@ +--source include/have_debug_sync.inc +--source include/galera_cluster.inc +--source include/have_innodb.inc +--source include/big_test.inc + +# Establish connection to the third node: +--let $galera_connection_name = node_3 +--let $galera_server_number = 3 +--source include/galera_connect.inc + +# Establish additional connection to the second node +# (which is used in the test for synchronization control): +--let $galera_connection_name = node_2a +--let $galera_server_number = 2 +--source include/galera_connect.inc + +# Save original auto_increment_offset values: +--let $node_1=node_1 +--let $node_2=node_2 +--let $node_3=node_3 +--source ../galera/include/auto_increment_offset_save.inc + +# Create a file for LOAD DATA with 95K entries +--connection node_1 +--perl +open(FILE, ">", "$ENV{'MYSQLTEST_VARDIR'}/tmp/galera_var_load_data_splitting.csv") or die; +foreach my $i (1..95000) { + print FILE "$i\n"; +} +EOF + +CREATE TABLE t1 (f1 INTEGER PRIMARY KEY) ENGINE=InnoDB; + +# Let's wait for the completion of the formation of a cluster +# of three nodes: +--let $wait_condition = SELECT VARIABLE_VALUE = 3 FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME = 'wsrep_cluster_size'; +--source include/wait_condition.inc +--connection node_2 +--source include/wait_until_ready.inc +--connection node_3 +--source include/wait_until_ready.inc + +# Disconnect the third node from the cluster: +SET GLOBAL wsrep_provider_options = 'gmcast.isolate = 1'; +SET SESSION wsrep_on = OFF; +--let $wait_condition = SELECT VARIABLE_VALUE = 'non-Primary' FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME = 'wsrep_cluster_status'; +--source include/wait_condition.inc +SET SESSION wsrep_on = ON; +SET SESSION wsrep_sync_wait = 0; + +# Disable sync wait for control connection: +--connection node_2a +SET SESSION wsrep_sync_wait = 0; + +# Let's wait until the other nodes stop seeing the third +# node in the cluster: +--let $wait_condition = SELECT VARIABLE_VALUE = 2 FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME = 'wsrep_cluster_size'; +--source include/wait_condition.inc + +# Record wsrep_last_committed as it was before LOAD DATA: +--connection node_2 +--let $wsrep_last_committed_before = `SELECT VARIABLE_VALUE FROM INFORMATION_SCHEMA.SESSION_STATUS WHERE VARIABLE_NAME = 'wsrep_last_committed'` + +# Enable splitting for LOAD DATA: +--let $wsrep_load_data_splitting_orig = `SELECT @@wsrep_load_data_splitting` +SET GLOBAL wsrep_load_data_splitting = TRUE; + +# Stop after the first commit and wait for the IST signal: +SET DEBUG_SYNC='intermediate_transaction_commit SIGNAL commited WAIT_FOR ist'; + +# Perform the LOAD DATA statement: +--disable_query_log +let v1='$MYSQLTEST_VARDIR/tmp/galera_var_load_data_splitting.csv'; +--send_eval LOAD DATA INFILE $v1 INTO TABLE t1; +--enable_query_log + +# Wait for the first commit: +--connection node_2a +SET DEBUG_SYNC='now WAIT_FOR commited'; + +# Initiate the IST: +--connection node_3 +SET GLOBAL wsrep_provider_options = 'gmcast.isolate = 0'; + +# Continue the execution of LOAD DATA: +--connection node_2a +SET DEBUG_SYNC='now SIGNAL ist'; + +# Let's wait for the recovery of the cluster +# of three nodes: +--connection node_1 +--let $wait_condition = SELECT VARIABLE_VALUE = 3 FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME = 'wsrep_cluster_size'; +--source include/wait_condition.inc + +# Save the LOAD DATA results: +--connection node_2 +--reap + +# Reset all synchronization points and signals: +SET DEBUG_SYNC='RESET'; + +# Read the wsrep_last_commited after LOAD DATA: +--let $wsrep_last_committed_after = `SELECT VARIABLE_VALUE FROM INFORMATION_SCHEMA.SESSION_STATUS WHERE VARIABLE_NAME = 'wsrep_last_committed'` + +# Check the records: +SELECT COUNT(*) = 95000 FROM t1; + +# LOAD-ing 95K rows should causes 10 commits to be registered: +--disable_query_log +--eval SELECT $wsrep_last_committed_after = $wsrep_last_committed_before + 10 AS wsrep_last_committed_diff; +--enable_query_log + +# Restore the original splitting: +--connection node_1 +--eval SET GLOBAL wsrep_load_data_splitting = $wsrep_load_data_splitting_orig; + +# Drop test table: +DROP TABLE t1; + +# Restore original auto_increment_offset values: +--source ../galera/include/auto_increment_offset_restore.inc + +--let $galera_cluster_size=3 +--source include/galera_end.inc diff --git a/sql/log.cc b/sql/log.cc index 4d62c9783cd..df928e89390 100644 --- a/sql/log.cc +++ b/sql/log.cc @@ -6413,8 +6413,25 @@ err: update_binlog_end_pos(offset); signal_update(); + /* + If a transaction with the LOAD DATA statement is divided + into logical mini-transactions (of the 10K rows) and binlog + is rotated, then the last portion of data may be lost due to + wsrep handler re-registration at the boundary of the split. + Since splitting of the LOAD DATA into mini-transactions is + logical, we should not allow these mini-transactions to fall + into separate binlogs. Therefore, it is necessary to prohibit + the rotation of binlog in the middle of processing LOAD DATA: + */ +#ifdef WITH_WSREP + if (!thd->wsrep_split_flag) + { +#endif /* WITH_WSREP */ if ((error= rotate(false, &check_purge))) check_purge= false; +#ifdef WITH_WSREP + } +#endif /* WITH_WSREP */ } } } @@ -7139,8 +7156,25 @@ bool MYSQL_BIN_LOG::write_incident(THD *thd) !(error= flush_and_sync(0))) { signal_update(); + /* + If a transaction with the LOAD DATA statement is divided + into logical mini-transactions (of the 10K rows) and binlog + is rotated, then the last portion of data may be lost due to + wsrep handler re-registration at the boundary of the split. + Since splitting of the LOAD DATA into mini-transactions is + logical, we should not allow these mini-transactions to fall + into separate binlogs. Therefore, it is necessary to prohibit + the rotation of binlog in the middle of processing LOAD DATA: + */ +#ifdef WITH_WSREP + if (!thd->wsrep_split_flag) + { +#endif /* WITH_WSREP */ if ((error= rotate(false, &check_purge))) check_purge= false; +#ifdef WITH_WSREP + } +#endif /* WITH_WSREP */ } offset= my_b_tell(&log_file); @@ -7906,6 +7940,20 @@ MYSQL_BIN_LOG::trx_group_commit_leader(group_commit_entry *leader) mark_xids_active(binlog_id, xid_count); } + /* + If a transaction with the LOAD DATA statement is divided + into logical mini-transactions (of the 10K rows) and binlog + is rotated, then the last portion of data may be lost due to + wsrep handler re-registration at the boundary of the split. + Since splitting of the LOAD DATA into mini-transactions is + logical, we should not allow these mini-transactions to fall + into separate binlogs. Therefore, it is necessary to prohibit + the rotation of binlog in the middle of processing LOAD DATA: + */ +#ifdef WITH_WSREP + if (!leader->thd->wsrep_split_flag) + { +#endif /* WITH_WSREP */ if (rotate(false, &check_purge)) { /* @@ -7925,6 +7973,9 @@ MYSQL_BIN_LOG::trx_group_commit_leader(group_commit_entry *leader) my_error(ER_ERROR_ON_WRITE, MYF(ME_NOREFRESH), name, errno); check_purge= false; } +#ifdef WITH_WSREP + } +#endif /* WITH_WSREP */ /* In case of binlog rotate, update the correct current binlog offset. */ commit_offset= my_b_write_tell(&log_file); } diff --git a/sql/sql_class.cc b/sql/sql_class.cc index d6aa6456710..512f7fdfd56 100644 --- a/sql/sql_class.cc +++ b/sql/sql_class.cc @@ -784,6 +784,7 @@ THD::THD(my_thread_id id, bool is_wsrep_applier) wsrep_affected_rows = 0; wsrep_replicate_GTID = false; wsrep_skip_wsrep_GTID = false; + wsrep_split_flag = false; #endif /* Call to init() below requires fully initialized Open_tables_state. */ reset_open_tables_state(this); @@ -1218,6 +1219,7 @@ void THD::init(void) wsrep_affected_rows = 0; wsrep_replicate_GTID = false; wsrep_skip_wsrep_GTID = false; + wsrep_split_flag = false; #endif /* WITH_WSREP */ if (variables.sql_log_bin) diff --git a/sql/sql_class.h b/sql/sql_class.h index d701d4cb46c..cb182f55bf1 100644 --- a/sql/sql_class.h +++ b/sql/sql_class.h @@ -4431,6 +4431,14 @@ public: ulong wsrep_affected_rows; bool wsrep_replicate_GTID; bool wsrep_skip_wsrep_GTID; + /* This flag is set when innodb do an intermediate commit to + processing the LOAD DATA INFILE statement by splitting it into 10K + rows chunks. If flag is set, then binlog rotation is not performed + while intermediate transaction try to commit, because in this case + rotation causes unregistration of innodb handler. Later innodb handler + registered again, but replication of last chunk of rows is skipped + by the innodb engine: */ + bool wsrep_split_flag; #endif /* WITH_WSREP */ /* Handling of timeouts for commands */ diff --git a/sql/sql_load.cc b/sql/sql_load.cc index 8c2f17dac3f..8e0bdcb32b8 100644 --- a/sql/sql_load.cc +++ b/sql/sql_load.cc @@ -41,6 +41,7 @@ #include "sql_trigger.h" #include "sql_derived.h" #include "sql_show.h" +#include "debug_sync.h" extern "C" int _my_b_net_read(IO_CACHE *info, uchar *Buffer, size_t Count); @@ -119,21 +120,43 @@ static bool wsrep_load_data_split(THD *thd, const TABLE *table, if (hton->db_type != DB_TYPE_INNODB) DBUG_RETURN(false); WSREP_DEBUG("intermediate transaction commit in LOAD DATA"); + wsrep_set_load_multi_commit(thd, true); if (wsrep_run_wsrep_commit(thd, true) != WSREP_TRX_OK) DBUG_RETURN(true); if (binlog_hton->commit(binlog_hton, thd, true)) DBUG_RETURN(true); wsrep_post_commit(thd, true); hton->commit(hton, thd, true); + wsrep_set_load_multi_commit(thd, false); + DEBUG_SYNC(thd, "intermediate_transaction_commit"); table->file->extra(HA_EXTRA_FAKE_START_STMT); } DBUG_RETURN(false); } -# define WSREP_LOAD_DATA_SPLIT(thd,table,info) \ - if (wsrep_load_data_split(thd,table,info)) DBUG_RETURN(1) +/* + If the commit fails, then an early return from + the function occurs there and therefore we need + to reset the table->auto_increment_field_not_null + flag, which is usually reset after calling + the write_record(): +*/ +#define WSREP_LOAD_DATA_SPLIT(thd,table,info) \ + if (wsrep_load_data_split(thd,table,info)) \ + { \ + table->auto_increment_field_not_null= FALSE; \ + DBUG_RETURN(1); \ + } #else /* WITH_WSREP */ #define WSREP_LOAD_DATA_SPLIT(thd,table,info) /* empty */ #endif /* WITH_WSREP */ +#define WRITE_RECORD(thd,table,info) \ + do { \ + int err_= write_record(thd, table, &info); \ + table->auto_increment_field_not_null= FALSE; \ + if (err_) \ + DBUG_RETURN(1); \ + } while (0) + class READ_INFO: public Load_data_param { File file; @@ -911,7 +934,7 @@ read_fixed_length(THD *thd, COPY_INFO &info, TABLE_LIST *table_list, List_iterator_fast<Item> it(fields_vars); Item *item; TABLE *table= table_list->table; - bool err, progress_reports; + bool progress_reports; ulonglong counter, time_to_report_progress; DBUG_ENTER("read_fixed_length"); @@ -1003,11 +1026,8 @@ read_fixed_length(THD *thd, COPY_INFO &info, TABLE_LIST *table_list, } WSREP_LOAD_DATA_SPLIT(thd, table, info); - err= write_record(thd, table, &info); - table->auto_increment_field_not_null= FALSE; - if (err) - DBUG_RETURN(1); - + WRITE_RECORD(thd, table, info); + /* We don't need to reset auto-increment field since we are restoring its default value at the beginning of each loop iteration. @@ -1040,7 +1060,7 @@ read_sep_field(THD *thd, COPY_INFO &info, TABLE_LIST *table_list, Item *item; TABLE *table= table_list->table; uint enclosed_length; - bool err, progress_reports; + bool progress_reports; ulonglong counter, time_to_report_progress; DBUG_ENTER("read_sep_field"); @@ -1124,7 +1144,7 @@ read_sep_field(THD *thd, COPY_INFO &info, TABLE_LIST *table_list, { Load_data_outvar *dst= item->get_load_data_outvar_or_error(); DBUG_ASSERT(dst); - if (dst->load_data_set_no_data(thd, &read_info)) + if (unlikely(dst->load_data_set_no_data(thd, &read_info))) DBUG_RETURN(1); } } @@ -1146,10 +1166,8 @@ read_sep_field(THD *thd, COPY_INFO &info, TABLE_LIST *table_list, } WSREP_LOAD_DATA_SPLIT(thd, table, info); - err= write_record(thd, table, &info); - table->auto_increment_field_not_null= FALSE; - if (err) - DBUG_RETURN(1); + WRITE_RECORD(thd, table, info); + /* We don't need to reset auto-increment field since we are restoring its default value at the beginning of each loop iteration. @@ -1267,13 +1285,10 @@ read_xml_field(THD *thd, COPY_INFO &info, TABLE_LIST *table_list, case VIEW_CHECK_ERROR: DBUG_RETURN(-1); } - + WSREP_LOAD_DATA_SPLIT(thd, table, info); - err= write_record(thd, table, &info); - table->auto_increment_field_not_null= false; - if (err) - DBUG_RETURN(1); - + WRITE_RECORD(thd, table, info); + /* We don't need to reset auto-increment field since we are restoring its default value at the beginning of each loop iteration. diff --git a/sql/sql_plugin_services.ic b/sql/sql_plugin_services.ic index 3d6cf0a0723..e7de45b5ee2 100644 --- a/sql/sql_plugin_services.ic +++ b/sql/sql_plugin_services.ic @@ -178,6 +178,8 @@ static struct wsrep_service_st wsrep_handler = { wsrep_thd_trx_seqno, wsrep_thd_ws_handle, wsrep_thd_auto_increment_variables, + wsrep_set_load_multi_commit, + wsrep_is_load_multi_commit, wsrep_trx_is_aborting, wsrep_trx_order_before, wsrep_unlock_rollback, diff --git a/sql/wsrep_dummy.cc b/sql/wsrep_dummy.cc index 7297dbfe0fd..aff75cf7790 100644 --- a/sql/wsrep_dummy.cc +++ b/sql/wsrep_dummy.cc @@ -133,6 +133,12 @@ void wsrep_thd_auto_increment_variables(THD *thd, *increment= thd->variables.auto_increment_increment; } +void wsrep_set_load_multi_commit(THD *thd, bool split) +{ } + +bool wsrep_is_load_multi_commit(THD *thd) +{ return false; } + int wsrep_trx_is_aborting(THD *) { return 0; } diff --git a/sql/wsrep_hton.cc b/sql/wsrep_hton.cc index e3da6a79f26..3603e05fd5f 100644 --- a/sql/wsrep_hton.cc +++ b/sql/wsrep_hton.cc @@ -45,6 +45,7 @@ void wsrep_cleanup_transaction(THD *thd) thd->wsrep_exec_mode= LOCAL_STATE; thd->wsrep_affected_rows= 0; thd->wsrep_skip_wsrep_GTID= false; + thd->wsrep_split_flag= false; return; } diff --git a/sql/wsrep_thd.cc b/sql/wsrep_thd.cc index dab9f91b381..00afbec290e 100644 --- a/sql/wsrep_thd.cc +++ b/sql/wsrep_thd.cc @@ -708,3 +708,13 @@ my_bool wsrep_thd_is_applier(MYSQL_THD thd) return (is_applier); } + +void wsrep_set_load_multi_commit(THD *thd, bool split) +{ + thd->wsrep_split_flag= split; +} + +bool wsrep_is_load_multi_commit(THD *thd) +{ + return thd->wsrep_split_flag; +} diff --git a/storage/innobase/handler/ha_innodb.cc b/storage/innobase/handler/ha_innodb.cc index fc1d88a0be2..5ea6648c4a4 100644 --- a/storage/innobase/handler/ha_innodb.cc +++ b/storage/innobase/handler/ha_innodb.cc @@ -8199,6 +8199,16 @@ ha_innobase::write_row( ++trx->will_lock; } +#ifdef WITH_WSREP + if (wsrep_is_load_multi_commit(m_user_thd)) + { + /* Note that this transaction is still active. */ + trx_register_for_2pc(m_prebuilt->trx); + /* We will need an IX lock on the destination table. */ + m_prebuilt->sql_stat_start = TRUE; + } +#endif /* WITH_WSREP */ + /* Handling of Auto-Increment Columns. */ if (table->next_number_field && record == table->record[0]) { |