diff options
32 files changed, 1193 insertions, 691 deletions
diff --git a/mysql-test/extra/rpl_tests/rpl_semi_sync.inc b/mysql-test/extra/rpl_tests/rpl_semi_sync.inc index ed56e405e27..393b49372e1 100644 --- a/mysql-test/extra/rpl_tests/rpl_semi_sync.inc +++ b/mysql-test/extra/rpl_tests/rpl_semi_sync.inc @@ -16,6 +16,7 @@ connection master; call mtr.add_suppression("Timeout waiting for reply of binlog"); call mtr.add_suppression("Read semi-sync reply"); call mtr.add_suppression("Unsafe statement written to the binary log using statement format since BINLOG_FORMAT = STATEMENT."); +call mtr.add_suppression("mysqld: Got an error reading communication packets"); connection slave; call mtr.add_suppression("Master server does not support semi-sync"); call mtr.add_suppression("Semi-sync slave .* reply"); diff --git a/mysql-test/suite/binlog_encryption/rpl_semi_sync.result b/mysql-test/suite/binlog_encryption/rpl_semi_sync.result index 6d574681d73..106efb555d3 100644 --- a/mysql-test/suite/binlog_encryption/rpl_semi_sync.result +++ b/mysql-test/suite/binlog_encryption/rpl_semi_sync.result @@ -4,6 +4,7 @@ connection master; call mtr.add_suppression("Timeout waiting for reply of binlog"); call mtr.add_suppression("Read semi-sync reply"); call mtr.add_suppression("Unsafe statement written to the binary log using statement format since BINLOG_FORMAT = STATEMENT."); +call mtr.add_suppression("mysqld: Got an error reading communication packets"); connection slave; call mtr.add_suppression("Master server does not support semi-sync"); call mtr.add_suppression("Semi-sync slave .* reply"); @@ -176,7 +177,7 @@ Variable_name Value Rpl_semi_sync_master_yes_tx 14 show status like 'Rpl_semi_sync_master_clients'; Variable_name Value -Rpl_semi_sync_master_clients 1 +Rpl_semi_sync_master_clients 0 [ semi-sync replication of these transactions will fail ] insert into t1 values (500); [ master status should be OFF ] @@ -321,7 +322,6 @@ connection slave; include/stop_slave.inc reset slave; connection master; -kill query _tid; connection slave; 
include/start_slave.inc connection master; @@ -353,7 +353,6 @@ include/stop_slave.inc reset slave; connection master; reset master; -kill query _tid; set sql_log_bin=0; grant replication slave on *.* to rpl@127.0.0.1 identified by 'rpl_password'; flush privileges; @@ -404,7 +403,6 @@ SHOW STATUS LIKE 'Rpl_semi_sync_slave_status'; Variable_name Value Rpl_semi_sync_slave_status OFF connection master; -kill query _tid; [ Semi-sync status on master should be ON ] show status like 'Rpl_semi_sync_master_clients'; Variable_name Value diff --git a/mysql-test/suite/perfschema/r/dml_setup_instruments.result b/mysql-test/suite/perfschema/r/dml_setup_instruments.result index 940839b105f..3f4d6107717 100644 --- a/mysql-test/suite/perfschema/r/dml_setup_instruments.result +++ b/mysql-test/suite/perfschema/r/dml_setup_instruments.result @@ -4,6 +4,7 @@ where name like 'Wait/Synch/Mutex/sql/%' and name not in ('wait/synch/mutex/sql/DEBUG_SYNC::mutex') order by name limit 10; NAME ENABLED TIMED +wait/synch/mutex/sql/Ack_receiver::m_mutex YES YES wait/synch/mutex/sql/Cversion_lock YES YES wait/synch/mutex/sql/Delayed_insert::mutex YES YES wait/synch/mutex/sql/Event_scheduler::LOCK_scheduler_state YES YES @@ -13,7 +14,6 @@ wait/synch/mutex/sql/HA_DATA_PARTITION::LOCK_auto_inc YES YES wait/synch/mutex/sql/LOCK_active_mi YES YES wait/synch/mutex/sql/LOCK_after_binlog_sync YES YES wait/synch/mutex/sql/LOCK_audit_mask YES YES -wait/synch/mutex/sql/LOCK_binlog YES YES select * from performance_schema.setup_instruments where name like 'Wait/Synch/Rwlock/sql/%' and name not in ('wait/synch/rwlock/sql/CRYPTO_dynlock_value::lock') @@ -36,6 +36,7 @@ where name like 'Wait/Synch/Cond/sql/%' 'wait/synch/cond/sql/DEBUG_SYNC::cond') order by name limit 10; NAME ENABLED TIMED +wait/synch/cond/sql/Ack_receiver::m_cond YES YES wait/synch/cond/sql/COND_binlog_send YES YES wait/synch/cond/sql/COND_flush_thread_cache YES YES wait/synch/cond/sql/COND_group_commit_orderer YES YES @@ -45,7 +46,6 @@ 
wait/synch/cond/sql/COND_parallel_entry YES YES wait/synch/cond/sql/COND_prepare_ordered YES YES wait/synch/cond/sql/COND_queue_state YES YES wait/synch/cond/sql/COND_rpl_thread YES YES -wait/synch/cond/sql/COND_rpl_thread_pool YES YES select * from performance_schema.setup_instruments where name='Wait'; select * from performance_schema.setup_instruments diff --git a/mysql-test/suite/perfschema/r/relaylog.result b/mysql-test/suite/perfschema/r/relaylog.result index b8655a781eb..3fcf7367b53 100644 --- a/mysql-test/suite/perfschema/r/relaylog.result +++ b/mysql-test/suite/perfschema/r/relaylog.result @@ -86,6 +86,7 @@ EVENT_NAME COUNT_STAR SUM_TIMER_WAIT MIN_TIMER_WAIT AVG_TIMER_WAIT MAX_TIMER_WAI wait/synch/cond/sql/MYSQL_RELAY_LOG::COND_bin_log_updated 0 0 0 0 0 wait/synch/cond/sql/MYSQL_RELAY_LOG::COND_queue_busy 0 0 0 0 0 wait/synch/cond/sql/MYSQL_RELAY_LOG::COND_relay_log_updated 0 0 0 0 0 +wait/synch/mutex/sql/MYSQL_RELAY_LOG::LOCK_binlog_end_pos 0 0 0 0 0 wait/synch/mutex/sql/MYSQL_RELAY_LOG::LOCK_index 0 0 0 0 0 connection slave; "============ Performance schema on slave ============" @@ -193,5 +194,6 @@ EVENT_NAME COUNT_STAR wait/synch/cond/sql/MYSQL_RELAY_LOG::COND_bin_log_updated NONE wait/synch/cond/sql/MYSQL_RELAY_LOG::COND_queue_busy NONE wait/synch/cond/sql/MYSQL_RELAY_LOG::COND_relay_log_updated MANY +wait/synch/mutex/sql/MYSQL_RELAY_LOG::LOCK_binlog_end_pos NONE wait/synch/mutex/sql/MYSQL_RELAY_LOG::LOCK_index MANY include/stop_slave.inc diff --git a/mysql-test/suite/rpl/r/rpl_semi_sync.result b/mysql-test/suite/rpl/r/rpl_semi_sync.result index 6d574681d73..106efb555d3 100644 --- a/mysql-test/suite/rpl/r/rpl_semi_sync.result +++ b/mysql-test/suite/rpl/r/rpl_semi_sync.result @@ -4,6 +4,7 @@ connection master; call mtr.add_suppression("Timeout waiting for reply of binlog"); call mtr.add_suppression("Read semi-sync reply"); call mtr.add_suppression("Unsafe statement written to the binary log using statement format since BINLOG_FORMAT = STATEMENT."); 
+call mtr.add_suppression("mysqld: Got an error reading communication packets"); connection slave; call mtr.add_suppression("Master server does not support semi-sync"); call mtr.add_suppression("Semi-sync slave .* reply"); @@ -176,7 +177,7 @@ Variable_name Value Rpl_semi_sync_master_yes_tx 14 show status like 'Rpl_semi_sync_master_clients'; Variable_name Value -Rpl_semi_sync_master_clients 1 +Rpl_semi_sync_master_clients 0 [ semi-sync replication of these transactions will fail ] insert into t1 values (500); [ master status should be OFF ] @@ -321,7 +322,6 @@ connection slave; include/stop_slave.inc reset slave; connection master; -kill query _tid; connection slave; include/start_slave.inc connection master; @@ -353,7 +353,6 @@ include/stop_slave.inc reset slave; connection master; reset master; -kill query _tid; set sql_log_bin=0; grant replication slave on *.* to rpl@127.0.0.1 identified by 'rpl_password'; flush privileges; @@ -404,7 +403,6 @@ SHOW STATUS LIKE 'Rpl_semi_sync_slave_status'; Variable_name Value Rpl_semi_sync_slave_status OFF connection master; -kill query _tid; [ Semi-sync status on master should be ON ] show status like 'Rpl_semi_sync_master_clients'; Variable_name Value diff --git a/mysql-test/suite/rpl/r/rpl_semi_sync_event.result b/mysql-test/suite/rpl/r/rpl_semi_sync_event.result index c347ff410ac..917e7c2b02b 100644 --- a/mysql-test/suite/rpl/r/rpl_semi_sync_event.result +++ b/mysql-test/suite/rpl/r/rpl_semi_sync_event.result @@ -5,6 +5,7 @@ call mtr.add_suppression("Timeout waiting for reply of binlog"); call mtr.add_suppression("Semi-sync master .* waiting for slave reply"); call mtr.add_suppression("Read semi-sync reply"); call mtr.add_suppression("Unsafe statement written to the binary log using statement format since BINLOG_FORMAT = STATEMENT."); +call mtr.add_suppression("mysqld: Got an error reading communication packets"); connection slave; call mtr.add_suppression("Master server does not support semi-sync"); call 
mtr.add_suppression("Semi-sync slave .* reply"); diff --git a/mysql-test/suite/rpl/r/rpl_semi_sync_event_after_sync.result b/mysql-test/suite/rpl/r/rpl_semi_sync_event_after_sync.result index c237eb8df47..24daf0d72b5 100644 --- a/mysql-test/suite/rpl/r/rpl_semi_sync_event_after_sync.result +++ b/mysql-test/suite/rpl/r/rpl_semi_sync_event_after_sync.result @@ -6,6 +6,7 @@ call mtr.add_suppression("Timeout waiting for reply of binlog"); call mtr.add_suppression("Semi-sync master .* waiting for slave reply"); call mtr.add_suppression("Read semi-sync reply"); call mtr.add_suppression("Unsafe statement written to the binary log using statement format since BINLOG_FORMAT = STATEMENT."); +call mtr.add_suppression("mysqld: Got an error reading communication packets"); connection slave; call mtr.add_suppression("Master server does not support semi-sync"); call mtr.add_suppression("Semi-sync slave .* reply"); diff --git a/mysql-test/suite/rpl/t/rpl_semi_sync_event.test b/mysql-test/suite/rpl/t/rpl_semi_sync_event.test index d7685413a07..4d96fd694ec 100644 --- a/mysql-test/suite/rpl/t/rpl_semi_sync_event.test +++ b/mysql-test/suite/rpl/t/rpl_semi_sync_event.test @@ -10,6 +10,8 @@ call mtr.add_suppression("Timeout waiting for reply of binlog"); call mtr.add_suppression("Semi-sync master .* waiting for slave reply"); call mtr.add_suppression("Read semi-sync reply"); call mtr.add_suppression("Unsafe statement written to the binary log using statement format since BINLOG_FORMAT = STATEMENT."); +call mtr.add_suppression("mysqld: Got an error reading communication packets"); + connection slave; call mtr.add_suppression("Master server does not support semi-sync"); call mtr.add_suppression("Semi-sync slave .* reply"); diff --git a/mysql-test/suite/sys_vars/r/rpl_semi_sync_master_enabled_basic.result b/mysql-test/suite/sys_vars/r/rpl_semi_sync_master_enabled_basic.result index 7454f0b0089..0c90462df59 100644 --- a/mysql-test/suite/sys_vars/r/rpl_semi_sync_master_enabled_basic.result 
+++ b/mysql-test/suite/sys_vars/r/rpl_semi_sync_master_enabled_basic.result @@ -65,6 +65,12 @@ set global rpl_semi_sync_master_enabled=1e1; ERROR 42000: Incorrect argument type to variable 'rpl_semi_sync_master_enabled' set global rpl_semi_sync_master_enabled="some text"; ERROR 42000: Variable 'rpl_semi_sync_master_enabled' can't be set to the value of 'some text' +connect con1,localhost,root,,; +connect con2,localhost,root,,; +disconnect con1; +disconnect con2; +connection default; +SET @@global.rpl_semi_sync_master_enabled = 1; SET @@global.rpl_semi_sync_master_enabled = @start_global_value; select @@global.rpl_semi_sync_master_enabled; @@global.rpl_semi_sync_master_enabled diff --git a/mysql-test/suite/sys_vars/t/rpl_semi_sync_master_enabled_basic.test b/mysql-test/suite/sys_vars/t/rpl_semi_sync_master_enabled_basic.test index da22d0535f4..68653d3a9a7 100644 --- a/mysql-test/suite/sys_vars/t/rpl_semi_sync_master_enabled_basic.test +++ b/mysql-test/suite/sys_vars/t/rpl_semi_sync_master_enabled_basic.test @@ -51,6 +51,35 @@ set global rpl_semi_sync_master_enabled=1e1; --error ER_WRONG_VALUE_FOR_VAR set global rpl_semi_sync_master_enabled="some text"; +# +# Test conflicting concurrent setting +# +--let $val_saved= `SELECT @@global.rpl_semi_sync_master_enabled` +connect (con1,localhost,root,,); +connect (con2,localhost,root,,); +--let $iter=100 +--disable_query_log +while ($iter) +{ + --connection con1 + --send_eval SET @@global.rpl_semi_sync_master_enabled = $iter % 2 + + --connection con2 + --send_eval SET @@global.rpl_semi_sync_master_enabled = ($iter + 1) % 2 + + --connection con1 + reap; + --connection con2 + reap; + + --dec $iter +} +--enable_query_log +disconnect con1; +disconnect con2; + +--connection default +--eval SET @@global.rpl_semi_sync_master_enabled = $val_saved # # Cleanup diff --git a/sql/CMakeLists.txt b/sql/CMakeLists.txt index 6c63f3feca3..fadee80491b 100644 --- a/sql/CMakeLists.txt +++ b/sql/CMakeLists.txt @@ -139,6 +139,7 @@ SET (SQL_SOURCE 
my_json_writer.cc rpl_gtid.cc rpl_parallel.cc semisync.cc semisync_master.cc semisync_slave.cc + semisync_master_ack_receiver.cc sql_type.cc item_windowfunc.cc sql_window.cc sql_cte.cc diff --git a/sql/handler.cc b/sql/handler.cc index 47eb58e17f3..7ea02278abc 100644 --- a/sql/handler.cc +++ b/sql/handler.cc @@ -50,6 +50,7 @@ #ifdef WITH_ARIA_STORAGE_ENGINE #include "../storage/maria/ha_maria.h" #endif +#include "semisync_master.h" #include "wsrep_mysqld.h" #include "wsrep.h" @@ -1485,6 +1486,10 @@ done: mysql_mutex_assert_not_owner(&LOCK_after_binlog_sync); mysql_mutex_assert_not_owner(&LOCK_commit_ordered); (void) RUN_HOOK(transaction, after_commit, (thd, FALSE)); +#ifdef REPLICATION + repl_semisync_master.waitAfterCommit(thd, all); + DEBUG_SYNC(thd, "after_group_after_commit"); +#endif goto end; /* Come here if error and we need to rollback. */ @@ -1730,6 +1735,9 @@ int ha_rollback_trans(THD *thd, bool all) ER_WARNING_NOT_COMPLETE_ROLLBACK, ER_THD(thd, ER_WARNING_NOT_COMPLETE_ROLLBACK)); (void) RUN_HOOK(transaction, after_rollback, (thd, FALSE)); +#ifdef REPLICATION + repl_semisync_master.waitAfterRollback(thd, all); +#endif DBUG_RETURN(error); } diff --git a/sql/log.cc b/sql/log.cc index 9d4a622d400..a866d72d785 100644 --- a/sql/log.cc +++ b/sql/log.cc @@ -53,6 +53,7 @@ #include "debug_sync.h" #include "sql_show.h" #include "my_pthread.h" +#include "semisync_master.h" #include "wsrep_mysqld.h" #include "sp_rcontext.h" #include "sp_head.h" @@ -3329,7 +3330,7 @@ void MYSQL_BIN_LOG::init_pthread_objects() mysql_cond_init(key_BINLOG_COND_binlog_background_thread_end, &COND_binlog_background_thread_end, 0); - mysql_mutex_init(key_LOCK_binlog_end_pos, &LOCK_binlog_end_pos, + mysql_mutex_init(m_key_LOCK_binlog_end_pos, &LOCK_binlog_end_pos, MY_MUTEX_INIT_SLOW); } @@ -5250,6 +5251,7 @@ end: close(LOG_CLOSE_INDEX); sql_print_error(fatal_log_error, new_name_ptr, errno); } + mysql_mutex_unlock(&LOCK_index); if (need_lock) mysql_mutex_unlock(&LOCK_log); @@ -6376,7 +6378,12 
@@ err: mysql_mutex_assert_not_owner(&LOCK_commit_ordered); if ((error= RUN_HOOK(binlog_storage, after_flush, (thd, log_file_name, file->pos_in_file, - synced, true, true)))) + synced, true, true))) +#ifdef REPLICATION + || repl_semisync_master.reportBinlogUpdate(thd, log_file_name, + file->pos_in_file) +#endif + ) { sql_print_error("Failed to run 'after_flush' hooks"); error= 1; @@ -6408,7 +6415,12 @@ err: mysql_mutex_assert_not_owner(&LOCK_commit_ordered); if (RUN_HOOK(binlog_storage, after_sync, (thd, log_file_name, file->pos_in_file, - true, true))) + true, true)) +#ifdef REPLICATION + || repl_semisync_master.waitAfterSync(log_file_name, + file->pos_in_file) +#endif + ) { error=1; /* error is already printed inside hook */ @@ -7589,7 +7601,11 @@ MYSQL_BIN_LOG::write_transaction_to_binlog_events(group_commit_entry *entry) else if (is_leader) trx_group_commit_leader(entry); else if (!entry->queued_by_other) + { + DEBUG_SYNC(entry->thd, "after_semisync_queue"); + entry->thd->wait_for_wakeup_ready(); + } else { /* @@ -7840,11 +7856,20 @@ MYSQL_BIN_LOG::trx_group_commit_leader(group_commit_entry *leader) { last= current->next == NULL; if (!current->error && - RUN_HOOK(binlog_storage, after_flush, - (current->thd, - current->cache_mngr->last_commit_pos_file, - current->cache_mngr->last_commit_pos_offset, synced, - first, last))) + (RUN_HOOK(binlog_storage, after_flush, + (current->thd, + current->cache_mngr->last_commit_pos_file, + current->cache_mngr->last_commit_pos_offset, synced, + first, last)) +#ifdef REPLICATION + || (DBUG_EVALUATE_IF("failed_report_binlog_update", 1, 0) || + repl_semisync_master. 
+ reportBinlogUpdate(current->thd, + current->cache_mngr->last_commit_pos_file, + current->cache_mngr-> + last_commit_pos_offset)) +#endif + )) { current->error= ER_ERROR_ON_WRITE; current->commit_errno= -1; @@ -7927,12 +7952,22 @@ MYSQL_BIN_LOG::trx_group_commit_leader(group_commit_entry *leader) { last= current->next == NULL; if (!current->error && - RUN_HOOK(binlog_storage, after_sync, + (RUN_HOOK(binlog_storage, after_sync, (current->thd, current->cache_mngr->last_commit_pos_file, current->cache_mngr->last_commit_pos_offset, - first, last))) + first, last)) +#ifdef REPLICATION + || (DBUG_EVALUATE_IF("simulate_after_sync_hook_error", 1, 0) || + repl_semisync_master.waitAfterSync(current->cache_mngr-> + last_commit_pos_file, + current->cache_mngr-> + last_commit_pos_offset)) +#endif + )) { - /* error is already printed inside hook */ + const char *hook_name= rpl_semi_sync_master_enabled ? + "'waitAfterSync'" : "binlog_storage 'after_sync'"; + sql_print_error("Failed to call '%s'", hook_name); } first= false; } @@ -8268,24 +8303,6 @@ void MYSQL_BIN_LOG::wait_for_update_relay_log(THD* thd) LOCK_log is released by the caller. 
*/ -int MYSQL_BIN_LOG::wait_for_update_bin_log(THD* thd, - const struct timespec *timeout) -{ - int ret= 0; - DBUG_ENTER("wait_for_update_bin_log"); - - thd_wait_begin(thd, THD_WAIT_BINLOG); - mysql_mutex_assert_owner(&LOCK_binlog_end_pos); - if (!timeout) - mysql_cond_wait(&COND_bin_log_updated, &LOCK_binlog_end_pos); - else - ret= mysql_cond_timedwait(&COND_bin_log_updated, &LOCK_binlog_end_pos, - const_cast<struct timespec *>(timeout)); - thd_wait_end(thd); - DBUG_RETURN(ret); -} - - int MYSQL_BIN_LOG::wait_for_update_binlog_end_pos(THD* thd, struct timespec *timeout) { @@ -10427,7 +10444,7 @@ get_gtid_list_event(IO_CACHE *cache, Gtid_list_log_event **out_gtid_list) *out_gtid_list= NULL; - if (!(ev= Log_event::read_log_event(cache, 0, &init_fdle, + if (!(ev= Log_event::read_log_event(cache, &init_fdle, opt_master_verify_checksum)) || ev->get_type_code() != FORMAT_DESCRIPTION_EVENT) { @@ -10443,7 +10460,7 @@ get_gtid_list_event(IO_CACHE *cache, Gtid_list_log_event **out_gtid_list) { Log_event_type typ; - ev= Log_event::read_log_event(cache, 0, fdle, opt_master_verify_checksum); + ev= Log_event::read_log_event(cache, fdle, opt_master_verify_checksum); if (!ev) { errormsg= "Could not read GTID list event while looking for GTID " @@ -10473,6 +10490,7 @@ get_gtid_list_event(IO_CACHE *cache, Gtid_list_log_event **out_gtid_list) return errormsg; } + struct st_mysql_storage_engine binlog_storage_engine= { MYSQL_HANDLERTON_INTERFACE_VERSION }; diff --git a/sql/log.h b/sql/log.h index 0a82f8813f0..02ace7c7921 100644 --- a/sql/log.h +++ b/sql/log.h @@ -349,6 +349,11 @@ public: /* for documentation of mutexes held in various places in code */ }; +/* Tell the io thread if we can delay the master info sync. */ +#define SEMI_SYNC_SLAVE_DELAY_SYNC 1 +/* Tell the io thread if the current event needs a ack. 
*/ +#define SEMI_SYNC_NEED_ACK 2 + class MYSQL_QUERY_LOG: public MYSQL_LOG { public: @@ -435,6 +440,8 @@ class MYSQL_BIN_LOG: public TC_LOG, private MYSQL_LOG PSI_file_key m_key_file_log_index; PSI_file_key m_key_COND_queue_busy; + /** The instrumentation key to use for LOCK_binlog_end_pos. */ + PSI_mutex_key m_key_LOCK_binlog_end_pos; #endif struct group_commit_entry @@ -667,7 +674,8 @@ public: PSI_cond_key key_bin_log_update, PSI_file_key key_file_log, PSI_file_key key_file_log_index, - PSI_file_key key_COND_queue_busy) + PSI_file_key key_COND_queue_busy, + PSI_mutex_key key_LOCK_binlog_end_pos) { m_key_LOCK_index= key_LOCK_index; m_key_relay_log_update= key_relay_log_update; @@ -675,6 +683,7 @@ public: m_key_file_log= key_file_log; m_key_file_log_index= key_file_log_index; m_key_COND_queue_busy= key_COND_queue_busy; + m_key_LOCK_binlog_end_pos= key_LOCK_binlog_end_pos; } #endif diff --git a/sql/mysqld.cc b/sql/mysqld.cc index 71e0aeee473..86759b8ed8b 100644 --- a/sql/mysqld.cc +++ b/sql/mysqld.cc @@ -933,6 +933,7 @@ PSI_mutex_key key_BINLOG_LOCK_index, key_BINLOG_LOCK_xid_list, key_LOCK_thread_count, key_LOCK_thread_cache, key_PARTITION_LOCK_auto_inc; PSI_mutex_key key_RELAYLOG_LOCK_index; +PSI_mutex_key key_LOCK_relaylog_end_pos; PSI_mutex_key key_LOCK_slave_state, key_LOCK_binlog_state, key_LOCK_rpl_thread, key_LOCK_rpl_thread_pool, key_LOCK_parallel_entry; PSI_mutex_key key_LOCK_binlog; @@ -947,6 +948,8 @@ PSI_mutex_key key_LOCK_after_binlog_sync; PSI_mutex_key key_LOCK_prepare_ordered, key_LOCK_commit_ordered, key_LOCK_slave_background; PSI_mutex_key key_TABLE_SHARE_LOCK_share; +PSI_mutex_key key_ss_mutex_LOCK_binlog_; +PSI_mutex_key key_ss_mutex_Ack_receiver_mutex; static PSI_mutex_info all_server_mutexes[]= { @@ -967,6 +970,7 @@ static PSI_mutex_info all_server_mutexes[]= { &key_BINLOG_LOCK_binlog_background_thread, "MYSQL_BIN_LOG::LOCK_binlog_background_thread", 0}, { &key_LOCK_binlog_end_pos, "MYSQL_BIN_LOG::LOCK_binlog_end_pos", 0 }, { 
&key_RELAYLOG_LOCK_index, "MYSQL_RELAY_LOG::LOCK_index", 0}, + { &key_LOCK_relaylog_end_pos, "MYSQL_RELAY_LOG::LOCK_binlog_end_pos", 0}, { &key_delayed_insert_mutex, "Delayed_insert::mutex", 0}, { &key_hash_filo_lock, "hash_filo::lock", 0}, { &key_LOCK_active_mi, "LOCK_active_mi", PSI_FLAG_GLOBAL}, @@ -1024,6 +1028,7 @@ static PSI_mutex_info all_server_mutexes[]= { &key_LOCK_rpl_thread, "LOCK_rpl_thread", 0}, { &key_LOCK_rpl_thread_pool, "LOCK_rpl_thread_pool", 0}, { &key_LOCK_parallel_entry, "LOCK_parallel_entry", 0}, + { &key_ss_mutex_Ack_receiver_mutex, "Ack_receiver::m_mutex", 0}, { &key_LOCK_binlog, "LOCK_binlog", 0} }; @@ -1078,6 +1083,7 @@ PSI_cond_key key_COND_rpl_thread_queue, key_COND_rpl_thread, key_COND_parallel_entry, key_COND_group_commit_orderer, key_COND_prepare_ordered, key_COND_slave_background; PSI_cond_key key_COND_wait_gtid, key_COND_gtid_ignore_duplicates; +PSI_cond_key key_ss_cond_Ack_receiver_cond; static PSI_cond_info all_server_conds[]= { @@ -1131,6 +1137,7 @@ static PSI_cond_info all_server_conds[]= { &key_COND_start_thread, "COND_start_thread", PSI_FLAG_GLOBAL}, { &key_COND_wait_gtid, "COND_wait_gtid", 0}, { &key_COND_gtid_ignore_duplicates, "COND_gtid_ignore_duplicates", 0}, + { &key_ss_cond_Ack_receiver_cond, "Ack_receiver::m_cond", 0}, { &key_COND_binlog_send, "COND_binlog_send", 0} }; @@ -1138,6 +1145,7 @@ PSI_thread_key key_thread_bootstrap, key_thread_delayed_insert, key_thread_handle_manager, key_thread_main, key_thread_one_connection, key_thread_signal_hand, key_thread_slave_background, key_rpl_parallel_thread; +PSI_thread_key key_ss_thread_Ack_receiver_thread; static PSI_thread_info all_server_threads[]= { @@ -1164,6 +1172,7 @@ static PSI_thread_info all_server_threads[]= { &key_thread_one_connection, "one_connection", 0}, { &key_thread_signal_hand, "signal_handler", PSI_FLAG_GLOBAL}, { &key_thread_slave_background, "slave_background", PSI_FLAG_GLOBAL}, + { &key_ss_thread_Ack_receiver_thread, "Ack_receiver", PSI_FLAG_GLOBAL}, { 
&key_rpl_parallel_thread, "rpl_parallel_thread", 0} }; @@ -1743,6 +1752,7 @@ static void close_connections(void) Events::deinit(); slave_prepare_for_shutdown(); mysql_bin_log.stop_background_thread(); + ack_receiver.stop(); /* Give threads time to die. @@ -2226,7 +2236,6 @@ void clean_up(bool print_message) tc_log->close(); #ifdef HAVE_REPLICATION semi_sync_master_deinit(); - semi_sync_slave_deinit(); #endif delegates_destroy(); xid_cache_free(); @@ -4251,7 +4260,8 @@ static int init_common_variables() key_BINLOG_COND_bin_log_updated, key_file_binlog, key_file_binlog_index, - key_BINLOG_COND_queue_busy); + key_BINLOG_COND_queue_busy, + key_LOCK_binlog_end_pos); #endif /* @@ -5183,8 +5193,12 @@ static int init_server_components() "--log-bin option is not defined."); } - semi_sync_master_init(); - semi_sync_slave_init(); + if (repl_semisync_master.initObject() || + repl_semisync_slave.initObject()) + { + sql_print_error("Could not initialize semisync."); + unireg_abort(1); + } #endif if (opt_bin_log) @@ -8599,14 +8613,10 @@ SHOW_VAR status_vars[]= { {"Rpl_semi_sync_master_net_wait_time", (char*) &SHOW_FNAME(net_wait_time), SHOW_FUNC}, {"Rpl_semi_sync_master_net_waits", (char*) &SHOW_FNAME(net_wait_num), SHOW_FUNC}, {"Rpl_semi_sync_master_net_avg_wait_time", (char*) &SHOW_FNAME(avg_net_wait_time), SHOW_FUNC}, -#ifdef HAVE_ACC_RECEIVER {"Rpl_semi_sync_master_request_ack", (char*) &rpl_semi_sync_master_request_ack, SHOW_LONGLONG}, {"Rpl_semi_sync_master_get_ack", (char*)&rpl_semi_sync_master_get_ack, SHOW_LONGLONG}, -#endif {"Rpl_semi_sync_slave_status", (char*) &rpl_semi_sync_slave_status, SHOW_BOOL}, -#ifdef HAVE_ACC_RECEIVER {"Rpl_semi_sync_slave_send_ack", (char*) &rpl_semi_sync_slave_send_ack, SHOW_LONGLONG}, -#endif #endif /* HAVE_REPLICATION */ #ifdef HAVE_QUERY_CACHE {"Qcache_free_blocks", (char*) &query_cache.free_memory_blocks, SHOW_LONG_NOFLUSH}, @@ -8950,7 +8960,7 @@ static int mysql_init_variables(void) transactions_multi_engine= 0; 
rpl_transactions_multi_engine= 0; transactions_gtid_foreign_engine= 0; - run_hooks_enabled= 0; + run_hooks_enabled= 0; // don't run hooks, semisync does not need 'em log_bin_basename= NULL; log_bin_index= NULL; @@ -10348,6 +10358,8 @@ PSI_stage_info stage_waiting_for_master_update= { 0, "Waiting for master update" PSI_stage_info stage_waiting_for_relay_log_space= { 0, "Waiting for the slave SQL thread to free enough relay log space", 0}; PSI_stage_info stage_waiting_for_semi_sync_ack_from_slave= { 0, "Waiting for semi-sync ACK from slave", 0}; +PSI_stage_info stage_waiting_for_semi_sync_slave={ 0, "Waiting for semi-sync slave connection", 0}; +PSI_stage_info stage_reading_semi_sync_ack={ 0, "Reading semi-sync ACK from slave", 0}; PSI_stage_info stage_waiting_for_slave_mutex_on_exit= { 0, "Waiting for slave mutex on exit", 0}; PSI_stage_info stage_waiting_for_slave_thread_to_start= { 0, "Waiting for slave thread to start", 0}; PSI_stage_info stage_waiting_for_table_flush= { 0, "Waiting for table flush", 0}; @@ -10508,6 +10520,9 @@ PSI_stage_info *all_server_stages[]= & stage_gtid_wait_other_connection, & stage_slave_background_process_request, & stage_slave_background_wait_request, + & stage_waiting_for_semi_sync_ack_from_slave, + & stage_waiting_for_semi_sync_slave, + & stage_reading_semi_sync_ack, & stage_waiting_for_deadlock_kill }; diff --git a/sql/mysqld.h b/sql/mysqld.h index 1da95fd13f5..5399ec91b19 100644 --- a/sql/mysqld.h +++ b/sql/mysqld.h @@ -310,6 +310,7 @@ extern PSI_mutex_key key_BINLOG_LOCK_index, key_BINLOG_LOCK_xid_list, key_LOCK_start_thread, key_LOCK_error_messages, key_LOCK_thread_count, key_PARTITION_LOCK_auto_inc; extern PSI_mutex_key key_RELAYLOG_LOCK_index; +extern PSI_mutex_key key_LOCK_relaylog_end_pos; extern PSI_mutex_key key_LOCK_slave_state, key_LOCK_binlog_state, key_LOCK_rpl_thread, key_LOCK_rpl_thread_pool, key_LOCK_parallel_entry; diff --git a/sql/rpl_handler.h b/sql/rpl_handler.h index 62bd7d2606c..f8b11adbb69 100644 --- 
a/sql/rpl_handler.h +++ b/sql/rpl_handler.h @@ -209,6 +209,10 @@ extern Binlog_relay_IO_delegate *binlog_relay_io_delegate; if semisync replication is not enabled, we can return immediately. */ #ifdef HAVE_REPLICATION +/* + As semisync is unpluggined and its hooks are turned into static + invocations all other hooks are not run for optimization sake. +*/ #define RUN_HOOK(group, hook, args) \ (unlikely(run_hooks_enabled) ? group ##_delegate->hook args : 0) #else diff --git a/sql/rpl_mi.h b/sql/rpl_mi.h index 610bc77b683..14d74dc4bb7 100644 --- a/sql/rpl_mi.h +++ b/sql/rpl_mi.h @@ -311,6 +311,11 @@ class Master_info : public Slave_reporting_capability /* The parallel replication mode. */ enum_slave_parallel_mode parallel_mode; + /* + semi_ack is used to identify if the current binlog event needs an + ACK from slave, or if delay_master is enabled. + */ + int semi_ack; }; int init_master_info(Master_info* mi, const char* master_info_fname, diff --git a/sql/rpl_rli.cc b/sql/rpl_rli.cc index 6e8b6edb44c..321eef97700 100644 --- a/sql/rpl_rli.cc +++ b/sql/rpl_rli.cc @@ -74,7 +74,8 @@ Relay_log_info::Relay_log_info(bool is_slave_recovery) key_RELAYLOG_COND_bin_log_updated, key_file_relaylog, key_file_relaylog_index, - key_RELAYLOG_COND_queue_busy); + key_RELAYLOG_COND_queue_busy, + key_LOCK_relaylog_end_pos); #endif group_relay_log_name[0]= event_relay_log_name[0]= diff --git a/sql/semisync.h b/sql/semisync.h index 3142f920f1e..cf791b36c1b 100644 --- a/sql/semisync.h +++ b/sql/semisync.h @@ -47,6 +47,20 @@ public: return exit_code; } + inline bool function_exit(const char *func_name, bool exit_code) + { + if (trace_level_ & kTraceFunction) + sql_print_information("<--- %s exit (%s)", func_name, + exit_code ? 
"True" : "False"); + return exit_code; + } + + inline void function_exit(const char *func_name) + { + if (trace_level_ & kTraceFunction) + sql_print_information("<--- %s exit", func_name); + } + Trace() :trace_level_(0L) {} @@ -79,5 +93,7 @@ public: #define REPLY_MAGIC_NUM_OFFSET 0 #define REPLY_BINLOG_POS_OFFSET (REPLY_MAGIC_NUM_OFFSET + REPLY_MAGIC_NUM_LEN) #define REPLY_BINLOG_NAME_OFFSET (REPLY_BINLOG_POS_OFFSET + REPLY_BINLOG_POS_LEN) +#define REPLY_MESSAGE_MAX_LENGTH \ + (REPLY_MAGIC_NUM_LEN + REPLY_BINLOG_POS_LEN + REPLY_BINLOG_NAME_LEN) #endif /* SEMISYNC_H */ diff --git a/sql/semisync_master.cc b/sql/semisync_master.cc index 21c52addce9..de91e30beec 100644 --- a/sql/semisync_master.cc +++ b/sql/semisync_master.cc @@ -24,7 +24,9 @@ #define TIME_BILLION 1000000000 /* This indicates whether semi-synchronous replication is enabled. */ -my_bool rpl_semi_sync_master_enabled; +my_bool rpl_semi_sync_master_enabled= 0; +unsigned long long rpl_semi_sync_master_request_ack = 0; +unsigned long long rpl_semi_sync_master_get_ack = 0; my_bool rpl_semi_sync_master_wait_no_slave = 1; my_bool rpl_semi_sync_master_status = 0; ulong rpl_semi_sync_master_wait_point = @@ -47,6 +49,15 @@ ulonglong rpl_semi_sync_master_net_wait_time = 0; ulonglong rpl_semi_sync_master_trx_wait_time = 0; ReplSemiSyncMaster repl_semisync_master; +Ack_receiver ack_receiver; + +/* + structure to save transaction log filename and position +*/ +typedef struct Trans_binlog_info { + my_off_t log_pos; + char log_file[FN_REFLEN]; +} Trans_binlog_info; static int getWaitTime(const struct timespec& start_ts); @@ -336,7 +347,8 @@ ReplSemiSyncMaster::ReplSemiSyncMaster() wait_file_pos_(0), master_enabled_(false), wait_timeout_(0L), - state_(0) + state_(0), + wait_point_(0) { strcpy(reply_file_name_, ""); strcpy(wait_file_name_, ""); @@ -345,18 +357,13 @@ ReplSemiSyncMaster::ReplSemiSyncMaster() int ReplSemiSyncMaster::initObject() { int result; - const char *kWho = "ReplSemiSyncMaster::initObject"; - if 
(init_done_) - { - fprintf(stderr, "%s called twice\n", kWho); - return 1; - } init_done_ = true; /* References to the parameter works after set_options(). */ setWaitTimeout(rpl_semi_sync_master_timeout); setTraceLevel(rpl_semi_sync_master_trace_level); + setWaitPoint(rpl_semi_sync_master_wait_point); /* Mutex initialization can only be done after MY_INIT(). */ mysql_mutex_init(key_LOCK_binlog, @@ -365,9 +372,22 @@ int ReplSemiSyncMaster::initObject() &COND_binlog_send, NULL); if (rpl_semi_sync_master_enabled) + { result = enableMaster(); + if (!result) + result= ack_receiver.start(); /* Start the ACK thread. */ + } else + { result = disableMaster(); + } + + /* + If rpl_semi_sync_master_wait_no_slave is disabled, let's temporarily + switch off semisync to avoid hang if there's none active slave. + */ + if (!rpl_semi_sync_master_wait_no_slave) + switch_off(); return result; } @@ -390,7 +410,6 @@ int ReplSemiSyncMaster::enableMaster() set_master_enabled(true); state_ = true; - run_hooks_enabled= 1; sql_print_information("Semi-sync replication enabled on the master."); } else @@ -498,12 +517,50 @@ void ReplSemiSyncMaster::remove_slave() unlock(); } -bool ReplSemiSyncMaster::is_semi_sync_slave() +int ReplSemiSyncMaster::reportReplyPacket(uint32 server_id, const uchar *packet, + ulong packet_len) { - int null_value; - long long val= 0; - get_user_var_int("rpl_semi_sync_slave", &val, &null_value); - return val; + const char *kWho = "ReplSemiSyncMaster::reportReplyPacket"; + int result= -1; + char log_file_name[FN_REFLEN+1]; + my_off_t log_file_pos; + ulong log_file_len = 0; + + function_enter(kWho); + + if (unlikely(packet[REPLY_MAGIC_NUM_OFFSET] != ReplSemiSyncMaster::kPacketMagicNum)) + { + sql_print_error("Read semi-sync reply magic number error"); + goto l_end; + } + + if (unlikely(packet_len < REPLY_BINLOG_NAME_OFFSET)) + { + sql_print_error("Read semi-sync reply length error: packet is too small"); + goto l_end; + } + + log_file_pos = uint8korr(packet + 
REPLY_BINLOG_POS_OFFSET); + log_file_len = packet_len - REPLY_BINLOG_NAME_OFFSET; + if (unlikely(log_file_len >= FN_REFLEN)) + { + sql_print_error("Read semi-sync reply binlog file length too large"); + goto l_end; + } + strncpy(log_file_name, (const char*)packet + REPLY_BINLOG_NAME_OFFSET, log_file_len); + log_file_name[log_file_len] = 0; + + DBUG_ASSERT(dirname_length(log_file_name) == 0); + + if (trace_level_ & kTraceDetail) + sql_print_information("%s: Got reply(%s, %lu) from server %u", + kWho, log_file_name, (ulong)log_file_pos, server_id); + + rpl_semi_sync_master_get_ack++; + reportReplyBinlog(server_id, log_file_name, log_file_pos); + +l_end: + return function_exit(kWho, result); } int ReplSemiSyncMaster::reportReplyBinlog(uint32 server_id, @@ -602,6 +659,121 @@ int ReplSemiSyncMaster::reportReplyBinlog(uint32 server_id, return function_exit(kWho, 0); } +int ReplSemiSyncMaster::waitAfterSync(const char *log_file, my_off_t log_pos) +{ + if (!getMasterEnabled()) + return 0; + + int ret= 0; + if(log_pos && + waitPoint() == SEMI_SYNC_MASTER_WAIT_POINT_AFTER_BINLOG_SYNC) + ret= commitTrx(log_file + dirname_length(log_file), log_pos); + + return ret; +} + +int ReplSemiSyncMaster::waitAfterCommit(THD* thd, bool all) +{ + if (!getMasterEnabled()) + return 0; + + int ret= 0; + const char *log_file; + my_off_t log_pos; + + bool is_real_trans= + (all || thd->transaction.all.ha_list == 0); + /* + The coordinates are propagated to this point having been computed + in reportBinlogUpdate + */ + Trans_binlog_info *log_info= thd->semisync_info; + log_file= log_info && log_info->log_file[0] ? log_info->log_file : 0; + log_pos= log_info ? 
log_info->log_pos : 0; + + DBUG_ASSERT(!log_file || dirname_length(log_file) == 0); + + if (is_real_trans && + log_pos && + waitPoint() == SEMI_SYNC_MASTER_WAIT_POINT_AFTER_STORAGE_COMMIT) + ret= commitTrx(log_file, log_pos); + + if (is_real_trans && log_info) + { + log_info->log_file[0]= 0; + log_info->log_pos= 0; + } + + return ret; +} + +int ReplSemiSyncMaster::waitAfterRollback(THD *thd, bool all) +{ + return waitAfterCommit(thd, all); +} + +/** + The method runs after flush to binary log is done. +*/ +int ReplSemiSyncMaster::reportBinlogUpdate(THD* thd, const char *log_file, + my_off_t log_pos) +{ + if (getMasterEnabled()) + { + Trans_binlog_info *log_info; + + if (!(log_info= thd->semisync_info)) + { + if(!(log_info= + (Trans_binlog_info*) my_malloc(sizeof(Trans_binlog_info), MYF(0)))) + return 1; + thd->semisync_info= log_info; + } + strcpy(log_info->log_file, log_file + dirname_length(log_file)); + log_info->log_pos = log_pos; + + return writeTranxInBinlog(log_info->log_file, log_pos); + } + + return 0; +} + +void ReplSemiSyncMaster::dump_start(THD* thd, + const char *log_file, + my_off_t log_pos) +{ + if (!thd->semi_sync_slave) + return; + + if (ack_receiver.add_slave(thd)) + { + sql_print_error("Failed to register slave to semi-sync ACK receiver " + "thread. 
Turning off semisync"); + thd->semi_sync_slave= 0; + return; + } + + add_slave(); + reportReplyBinlog(thd->variables.server_id, log_file + dirname_length(log_file), log_pos); + sql_print_information("Start semi-sync binlog_dump to slave (server_id: %d), pos(%s, %lu", + thd->variables.server_id, log_file, (unsigned long)log_pos); + + return; +} + +void ReplSemiSyncMaster::dump_end(THD* thd) +{ + if (!thd->semi_sync_slave) + return; + + sql_print_information("Stop semi-sync binlog_dump to slave (server_id: %d)", thd->variables.server_id); + + remove_slave(); + ack_receiver.remove_slave(thd); + + return; +} + int ReplSemiSyncMaster::commitTrx(const char* trx_wait_binlog_name, my_off_t trx_wait_binlog_pos) { @@ -850,42 +1022,23 @@ int ReplSemiSyncMaster::try_switch_on(int server_id, return function_exit(kWho, 0); } -int ReplSemiSyncMaster::reserveSyncHeader(unsigned char *header, - ulong size) +int ReplSemiSyncMaster::reserveSyncHeader(String* packet) { const char *kWho = "ReplSemiSyncMaster::reserveSyncHeader"; function_enter(kWho); - int hlen=0; - if (!is_semi_sync_slave()) - { - hlen= 0; - } - else - { - /* No enough space for the extra header, disable semi-sync master */ - if (sizeof(kSyncHeader) > size) - { - sql_print_warning("No enough space in the packet " - "for semi-sync extra header, " - "semi-sync replication disabled"); - disableMaster(); - return 0; - } - - /* Set the magic number and the sync status. By default, no sync - * is required. - */ - memcpy(header, kSyncHeader, sizeof(kSyncHeader)); - hlen= sizeof(kSyncHeader); - } - return function_exit(kWho, hlen); + /* Set the magic number and the sync status. By default, no sync + * is required. 
+ */ + packet->append(reinterpret_cast<const char*>(kSyncHeader), + sizeof(kSyncHeader)); + return function_exit(kWho, 0); } -int ReplSemiSyncMaster::updateSyncHeader(unsigned char *packet, +int ReplSemiSyncMaster::updateSyncHeader(THD* thd, unsigned char *packet, const char *log_file_name, my_off_t log_file_pos, - uint32 server_id) + bool* need_sync) { const char *kWho = "ReplSemiSyncMaster::updateSyncHeader"; int cmp = 0; @@ -894,8 +1047,11 @@ int ReplSemiSyncMaster::updateSyncHeader(unsigned char *packet, /* If the semi-sync master is not enabled, or the slave is not a semi-sync * target, do not request replies from the slave. */ - if (!getMasterEnabled() || !is_semi_sync_slave()) + if (!getMasterEnabled() || !thd->semi_sync_slave) + { + *need_sync = false; return 0; + } function_enter(kWho); @@ -903,12 +1059,15 @@ int ReplSemiSyncMaster::updateSyncHeader(unsigned char *packet, /* This is the real check inside the mutex. */ if (!getMasterEnabled()) - goto l_end; // sync= false at this point in time + { + assert(sync == false); + goto l_end; + } if (is_on()) { /* semi-sync is ON */ - /* sync= false; No sync unless a transaction is involved. */ + sync = false; /* No sync unless a transaction is involved. 
*/ if (reply_file_name_inited_) { @@ -962,8 +1121,9 @@ int ReplSemiSyncMaster::updateSyncHeader(unsigned char *packet, if (trace_level_ & kTraceDetail) sql_print_information("%s: server(%d), (%s, %lu) sync(%d), repl(%d)", - kWho, server_id, log_file_name, + kWho, thd->variables.server_id, log_file_name, (ulong)log_file_pos, sync, (int)is_on()); + *need_sync= sync; l_end: unlock(); @@ -1033,6 +1193,10 @@ int ReplSemiSyncMaster::writeTranxInBinlog(const char* log_file_name, log_file_name, (ulong)log_file_pos); switch_off(); } + else + { + rpl_semi_sync_master_request_ack++; + } } l_end: @@ -1041,19 +1205,12 @@ int ReplSemiSyncMaster::writeTranxInBinlog(const char* log_file_name, return function_exit(kWho, result); } -int ReplSemiSyncMaster::readSlaveReply(NET *net, uint32 server_id, - const char *event_buf) +int ReplSemiSyncMaster::flushNet(THD *thd, + const char *event_buf) { - const char *kWho = "ReplSemiSyncMaster::readSlaveReply"; - const unsigned char *packet; - char log_file_name[FN_REFLEN]; - my_off_t log_file_pos; - ulong log_file_len = 0; - ulong packet_len; + const char *kWho = "ReplSemiSyncMaster::flushNet"; int result = -1; - struct timespec start_ts; - ulong trc_level = trace_level_; - LINT_INIT_STRUCT(start_ts); + NET* net= &thd->net; function_enter(kWho); @@ -1065,9 +1222,6 @@ int ReplSemiSyncMaster::readSlaveReply(NET *net, uint32 server_id, goto l_end; } - if (trc_level & kTraceNetWait) - set_timespec(start_ts, 0); - /* We flush to make sure that the current event is sent to the network, * instead of being buffered in the TCP/IP stack. */ @@ -1079,82 +1233,35 @@ int ReplSemiSyncMaster::readSlaveReply(NET *net, uint32 server_id, } net_clear(net, 0); - if (trc_level & kTraceDetail) - sql_print_information("%s: Wait for replica's reply", kWho); - - /* Wait for the network here. Though binlog dump thread can indefinitely wait - * here, transactions would not wait indefintely. - * Transactions wait on binlog replies detected by binlog dump threads. 
If - * binlog dump threads wait too long, transactions will timeout and continue. - */ - packet_len = my_net_read(net); - - if (trc_level & kTraceNetWait) - { - int wait_time = getWaitTime(start_ts); - if (wait_time < 0) - { - sql_print_error("Semi-sync master wait for reply " - "fail to get wait time."); - rpl_semi_sync_master_timefunc_fails++; - } - else - { - rpl_semi_sync_master_net_wait_num++; - rpl_semi_sync_master_net_wait_time += wait_time; - } - } - - if (packet_len == packet_error || packet_len < REPLY_BINLOG_NAME_OFFSET) - { - if (packet_len == packet_error) - sql_print_error("Read semi-sync reply network error: %s (errno: %d)", - net->last_error, net->last_errno); - else - sql_print_error("Read semi-sync reply length error: %s (errno: %d)", - net->last_error, net->last_errno); - goto l_end; - } - - packet = net->read_pos; - if (packet[REPLY_MAGIC_NUM_OFFSET] != ReplSemiSyncMaster::kPacketMagicNum) - { - sql_print_error("Read semi-sync reply magic number error"); - goto l_end; - } - - log_file_pos = uint8korr(packet + REPLY_BINLOG_POS_OFFSET); - log_file_len = packet_len - REPLY_BINLOG_NAME_OFFSET; - if (log_file_len >= FN_REFLEN) - { - sql_print_error("Read semi-sync reply binlog file length too large"); - goto l_end; - } - strncpy(log_file_name, (const char*)packet + REPLY_BINLOG_NAME_OFFSET, log_file_len); - log_file_name[log_file_len] = 0; - - if (trc_level & kTraceDetail) - sql_print_information("%s: Got reply (%s, %lu)", - kWho, log_file_name, (ulong)log_file_pos); - - result = reportReplyBinlog(server_id, log_file_name, log_file_pos); + net->pkt_nr++; + result = 0; + rpl_semi_sync_master_net_wait_num++; l_end: + thd->clear_error(); return function_exit(kWho, result); } - -int ReplSemiSyncMaster::resetMaster() +int ReplSemiSyncMaster::afterResetMaster() { - const char *kWho = "ReplSemiSyncMaster::resetMaster"; + const char *kWho = "ReplSemiSyncMaster::afterResetMaster"; int result = 0; function_enter(kWho); + if (rpl_semi_sync_master_enabled) + { + 
sql_print_information("Enable Semi-sync Master after reset master"); + enableMaster(); + } lock(); - state_ = getMasterEnabled()? 1 : 0; + if (rpl_semi_sync_master_clients == 0 && + !rpl_semi_sync_master_wait_no_slave) + state_ = 0; + else + state_ = getMasterEnabled()? 1 : 0; wait_file_name_inited_ = false; reply_file_name_inited_ = false; @@ -1176,6 +1283,31 @@ int ReplSemiSyncMaster::resetMaster() return function_exit(kWho, result); } +int ReplSemiSyncMaster::beforeResetMaster() +{ + const char *kWho = "ReplSemiSyncMaster::beforeResetMaster"; + int result = 0; + + function_enter(kWho); + + if (rpl_semi_sync_master_enabled) + disableMaster(); + + return function_exit(kWho, result); +} + +void ReplSemiSyncMaster::checkAndSwitch() +{ + lock(); + if (getMasterEnabled() && is_on()) + { + if (!rpl_semi_sync_master_wait_no_slave + && rpl_semi_sync_master_clients == 0) + switch_off(); + } + unlock(); +} + void ReplSemiSyncMaster::setExportStats() { lock(); @@ -1219,212 +1351,8 @@ static int getWaitTime(const struct timespec& start_ts) return (int)(end_usecs - start_usecs); } -/*************************************************************************** - Semisync master interface setup and deinit -***************************************************************************/ - -C_MODE_START - -int repl_semi_report_binlog_update(Binlog_storage_param *param, - const char *log_file, - my_off_t log_pos, uint32 flags) -{ - int error= 0; - - if (repl_semisync_master.getMasterEnabled()) - { - /* - Let us store the binlog file name and the position, so that - we know how long to wait for the binlog to the replicated to - the slave in synchronous replication. 
- */ - error= repl_semisync_master.writeTranxInBinlog(log_file, - log_pos); - } - - return error; -} - -int repl_semi_request_commit(Trans_param *param) -{ - return 0; -} - -int repl_semi_report_binlog_sync(Binlog_storage_param *param, - const char *log_file, - my_off_t log_pos, uint32 flags) -{ - int error= 0; - if (rpl_semi_sync_master_wait_point == - SEMI_SYNC_MASTER_WAIT_POINT_AFTER_BINLOG_SYNC) - { - error= repl_semisync_master.commitTrx(log_file, log_pos); - } - - return error; -} - -int repl_semi_report_commit(Trans_param *param) -{ - if (rpl_semi_sync_master_wait_point != - SEMI_SYNC_MASTER_WAIT_POINT_AFTER_STORAGE_COMMIT) - { - return 0; - } - - bool is_real_trans= param->flags & TRANS_IS_REAL_TRANS; - - if (is_real_trans && param->log_pos) - { - const char *binlog_name= param->log_file; - return repl_semisync_master.commitTrx(binlog_name, param->log_pos); - } - return 0; -} - -int repl_semi_report_rollback(Trans_param *param) -{ - return repl_semi_report_commit(param); -} - -int repl_semi_binlog_dump_start(Binlog_transmit_param *param, - const char *log_file, - my_off_t log_pos) -{ - bool semi_sync_slave= repl_semisync_master.is_semi_sync_slave(); - - if (semi_sync_slave) - { - /* One more semi-sync slave */ - repl_semisync_master.add_slave(); - - /* - Let's assume this semi-sync slave has already received all - binlog events before the filename and position it requests. - */ - repl_semisync_master.reportReplyBinlog(param->server_id, log_file, log_pos); - } - sql_print_information("Start %s binlog_dump to slave (server_id: %d), pos(%s, %lu)", - semi_sync_slave ? "semi-sync" : "asynchronous", - param->server_id, log_file, (ulong)log_pos); - - return 0; -} - -int repl_semi_binlog_dump_end(Binlog_transmit_param *param) -{ - bool semi_sync_slave= repl_semisync_master.is_semi_sync_slave(); - - sql_print_information("Stop %s binlog_dump to slave (server_id: %d)", - semi_sync_slave ? 
"semi-sync" : "asynchronous", - param->server_id); - if (semi_sync_slave) - { - /* One less semi-sync slave */ - repl_semisync_master.remove_slave(); - } - return 0; -} - -int repl_semi_reserve_header(Binlog_transmit_param *param, - unsigned char *header, - ulong size, ulong *len) -{ - *len += repl_semisync_master.reserveSyncHeader(header, size); - return 0; -} - -int repl_semi_before_send_event(Binlog_transmit_param *param, - unsigned char *packet, ulong len, - const char *log_file, my_off_t log_pos) -{ - return repl_semisync_master.updateSyncHeader(packet, - log_file, - log_pos, - param->server_id); -} - -int repl_semi_after_send_event(Binlog_transmit_param *param, - const char *event_buf, ulong len) -{ - if (repl_semisync_master.is_semi_sync_slave()) - { - THD *thd= current_thd; - /* - Possible errors in reading slave reply are ignored deliberately - because we do not want dump thread to quit on this. Error - messages are already reported. - */ - (void) repl_semisync_master.readSlaveReply(&thd->net, - param->server_id, event_buf); - thd->clear_error(); - } - return 0; -} - -int repl_semi_reset_master(Binlog_transmit_param *param) -{ - if (repl_semisync_master.resetMaster()) - return 1; - return 0; -} - -C_MODE_END - -Trans_observer trans_observer= -{ - sizeof(Trans_observer), // len - - repl_semi_report_commit, // after_commit - repl_semi_report_rollback, // after_rollback -}; - -Binlog_storage_observer storage_observer= -{ - sizeof(Binlog_storage_observer), // len - - repl_semi_report_binlog_update, // report_update - repl_semi_report_binlog_sync, // after_sync -}; - -Binlog_transmit_observer transmit_observer= -{ - sizeof(Binlog_transmit_observer), // len - - repl_semi_binlog_dump_start, // start - repl_semi_binlog_dump_end, // stop - repl_semi_reserve_header, // reserve_header - repl_semi_before_send_event, // before_send_event - repl_semi_after_send_event, // after_send_event - repl_semi_reset_master, // reset -}; - -static bool semi_sync_master_inited= 0; - 
-int semi_sync_master_init() -{ - void *p= 0; - if (repl_semisync_master.initObject()) - return 1; - if (register_trans_observer(&trans_observer, p)) - return 1; - if (register_binlog_storage_observer(&storage_observer, p)) - return 1; - if (register_binlog_transmit_observer(&transmit_observer, p)) - return 1; - semi_sync_master_inited= 1; - return 0; -} - void semi_sync_master_deinit() { - void *p= 0; - if (!semi_sync_master_inited) - return; - - unregister_trans_observer(&trans_observer, p); - unregister_binlog_storage_observer(&storage_observer, p); - unregister_binlog_transmit_observer(&transmit_observer, p); repl_semisync_master.cleanup(); - semi_sync_master_inited= 0; + ack_receiver.cleanup(); } diff --git a/sql/semisync_master.h b/sql/semisync_master.h index ff1e3dd48b4..66fac17cd45 100644 --- a/sql/semisync_master.h +++ b/sql/semisync_master.h @@ -20,14 +20,13 @@ #define SEMISYNC_MASTER_H #include "semisync.h" +#include "semisync_master_ack_receiver.h" #ifdef HAVE_PSI_INTERFACE extern PSI_mutex_key key_LOCK_binlog; extern PSI_cond_key key_COND_binlog_send; #endif -extern PSI_stage_info stage_waiting_for_semi_sync_ack_from_slave; - struct TranxNode { char log_name_[FN_REFLEN]; my_off_t log_pos_; @@ -432,6 +431,9 @@ class ReplSemiSyncMaster bool state_; /* whether semi-sync is switched */ + /*Waiting for ACK before/after innodb commit*/ + ulong wait_point_; + void lock(); void unlock(); void cond_broadcast(); @@ -473,6 +475,17 @@ class ReplSemiSyncMaster wait_timeout_ = wait_timeout; } + /*set the ACK point, after binlog sync or after transaction commit*/ + void setWaitPoint(unsigned long ack_point) + { + wait_point_ = ack_point; + } + + ulong waitPoint() //no cover line + { + return wait_point_; //no cover line + } + /* Initialize this class after MySQL parameters are initialized. this * function should be called once at bootstrap time. 
*/ @@ -490,8 +503,9 @@ class ReplSemiSyncMaster /* Remove a semi-sync replication slave */ void remove_slave(); - /* Is the slave servered by the thread requested semi-sync */ - bool is_semi_sync_slave(); + /* It parses a reply packet and call reportReplyBinlog to handle it. */ + int reportReplyPacket(uint32 server_id, const uchar *packet, + ulong packet_len); /* In semi-sync replication, reports up to which binlog position we have * received replies from the slave indicating that it already get the events. @@ -527,42 +541,61 @@ class ReplSemiSyncMaster int commitTrx(const char* trx_wait_binlog_name, my_off_t trx_wait_binlog_pos); + /*Wait for ACK after writing/sync binlog to file*/ + int waitAfterSync(const char* log_file, my_off_t log_pos); + + /*Wait for ACK after commting the transaction*/ + int waitAfterCommit(THD* thd, bool all); + + /*Wait after the transaction is rollback*/ + int waitAfterRollback(THD *thd, bool all); + /*Store the current binlog position in active_tranxs_. This position should + * be acked by slave*/ + int reportBinlogUpdate(THD *thd, const char *log_file,my_off_t log_pos); + + void dump_start(THD* thd, + const char *log_file, + my_off_t log_pos); + + void dump_end(THD* thd); + /* Reserve space in the replication event packet header: * . slave semi-sync off: 1 byte - (0) * . slave semi-sync on: 3 byte - (0, 0xef, 0/1} - * + * * Input: - * header - (IN) the header buffer - * size - (IN) size of the header buffer + * packet - (IN) the header buffer * * Return: * size of the bytes reserved for header */ - int reserveSyncHeader(unsigned char *header, unsigned long size); + int reserveSyncHeader(String* packet); /* Update the sync bit in the packet header to indicate to the slave whether * the master will wait for the reply of the event. If semi-sync is switched * off and we detect that the slave is catching up, we switch semi-sync on. 
* * Input: + * THD - (IN) current dump thread * packet - (IN) the packet containing the replication event * log_file_name - (IN) the event ending position's file name * log_file_pos - (IN) the event ending position's file offset + * need_sync - (IN) identify if flushNet is needed to call. * server_id - (IN) master server id number * * Return: * 0: success; non-zero: error */ - int updateSyncHeader(unsigned char *packet, + int updateSyncHeader(THD* thd, unsigned char *packet, const char *log_file_name, - my_off_t log_file_pos, - uint32 server_id); + my_off_t log_file_pos, + bool* need_sync); /* Called when a transaction finished writing binlog events. * . update the 'largest' transactions' binlog event position * . insert the ending position in the active transaction list if * semi-sync is on - * + * * Input: (the transaction events' ending binlog position) * log_file_name - (IN) transaction ending position's file name * log_file_pos - (IN) transaction ending position's file offset @@ -574,16 +607,8 @@ class ReplSemiSyncMaster /* Read the slave's reply so that we know how much progress the slave makes * on receive replication events. - * - * Input: - * net - (IN) the connection to master - * server_id - (IN) master server id number - * event_buf - (IN) pointer to the event packet - * - * Return: - * 0: success; non-zero: error */ - int readSlaveReply(NET *net, uint32 server_id, const char *event_buf); + int flushNet(THD* thd, const char *event_buf); /* Export internal statistics for semi-sync replication. */ void setExportStats(); @@ -591,7 +616,12 @@ class ReplSemiSyncMaster /* 'reset master' command is issued from the user and semi-sync need to * go off for that. 
*/ - int resetMaster(); + int afterResetMaster(); + + /*called before reset master*/ + int beforeResetMaster(); + + void checkAndSwitch(); }; enum rpl_semi_sync_master_wait_point_t { @@ -599,6 +629,9 @@ enum rpl_semi_sync_master_wait_point_t { SEMI_SYNC_MASTER_WAIT_POINT_AFTER_STORAGE_COMMIT, }; +extern ReplSemiSyncMaster repl_semisync_master; +extern Ack_receiver ack_receiver; + /* System and status variables for the master component */ extern my_bool rpl_semi_sync_master_enabled; extern my_bool rpl_semi_sync_master_status; @@ -620,6 +653,8 @@ extern ulonglong rpl_semi_sync_master_net_wait_num; extern ulonglong rpl_semi_sync_master_trx_wait_num; extern ulonglong rpl_semi_sync_master_net_wait_time; extern ulonglong rpl_semi_sync_master_trx_wait_time; +extern unsigned long long rpl_semi_sync_master_request_ack; +extern unsigned long long rpl_semi_sync_master_get_ack; /* This indicates whether we should keep waiting if no semi-sync slave @@ -630,7 +665,10 @@ extern ulonglong rpl_semi_sync_master_trx_wait_time; extern char rpl_semi_sync_master_wait_no_slave; extern ReplSemiSyncMaster repl_semisync_master; -int semi_sync_master_init(); +extern PSI_stage_info stage_waiting_for_semi_sync_ack_from_slave; +extern PSI_stage_info stage_reading_semi_sync_ack; +extern PSI_stage_info stage_waiting_for_semi_sync_slave; + void semi_sync_master_deinit(); #endif /* SEMISYNC_MASTER_H */ diff --git a/sql/semisync_master_ack_receiver.cc b/sql/semisync_master_ack_receiver.cc new file mode 100644 index 00000000000..eee35cc122f --- /dev/null +++ b/sql/semisync_master_ack_receiver.cc @@ -0,0 +1,308 @@ +/* Copyright (c) 2014, Oracle and/or its affiliates. All rights reserved. + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; version 2 of the License. 
+ + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ + +#include <my_global.h> +#include "semisync_master.h" +#include "semisync_master_ack_receiver.h" + +extern PSI_mutex_key key_ss_mutex_Ack_receiver_mutex; +extern PSI_cond_key key_ss_cond_Ack_receiver_cond; +extern PSI_thread_key key_ss_thread_Ack_receiver_thread; +extern ReplSemiSyncMaster repl_semisync; + +/* Callback function of ack receive thread */ +pthread_handler_t ack_receive_handler(void *arg) +{ + Ack_receiver *recv= reinterpret_cast<Ack_receiver *>(arg); + + my_thread_init(); + recv->run(); + my_thread_end(); + + return NULL; +} + +Ack_receiver::Ack_receiver() +{ + const char *kWho = "Ack_receiver::Ack_receiver"; + function_enter(kWho); + + m_status= ST_DOWN; + mysql_mutex_init(key_ss_mutex_Ack_receiver_mutex, &m_mutex, + MY_MUTEX_INIT_FAST); + mysql_cond_init(key_ss_cond_Ack_receiver_cond, &m_cond, NULL); + m_pid= 0; + + function_exit(kWho); +} + +void Ack_receiver::cleanup() +{ + const char *kWho = "Ack_receiver::~Ack_receiver"; + function_enter(kWho); + + stop(); + mysql_mutex_destroy(&m_mutex); + mysql_cond_destroy(&m_cond); + + function_exit(kWho); +} + +bool Ack_receiver::start() +{ + const char *kWho = "Ack_receiver::start"; + function_enter(kWho); + + mysql_mutex_lock(&m_mutex); + if(m_status == ST_DOWN) + { + pthread_attr_t attr; + + m_status= ST_UP; + + if (DBUG_EVALUATE_IF("rpl_semisync_simulate_create_thread_failure", 1, 0) || + pthread_attr_init(&attr) != 0 || + pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE) != 0 || +#ifndef _WIN32 + pthread_attr_setscope(&attr, PTHREAD_SCOPE_SYSTEM) 
!= 0 || +#endif + mysql_thread_create(key_ss_thread_Ack_receiver_thread, &m_pid, + &attr, ack_receive_handler, this)) + { + sql_print_error("Failed to start semi-sync ACK receiver thread, " + " could not create thread(errno:%d)", errno); + + m_status= ST_DOWN; + mysql_mutex_unlock(&m_mutex); + + return function_exit(kWho, true); + } + (void) pthread_attr_destroy(&attr); + } + mysql_mutex_unlock(&m_mutex); + + return function_exit(kWho, false); +} + +void Ack_receiver::stop() +{ + const char *kWho = "Ack_receiver::stop"; + function_enter(kWho); + + mysql_mutex_lock(&m_mutex); + if (m_status == ST_UP) + { + m_status= ST_STOPPING; + mysql_cond_broadcast(&m_cond); + + while (m_status == ST_STOPPING) + mysql_cond_wait(&m_cond, &m_mutex); + + DBUG_ASSERT(m_status == ST_DOWN); + + m_pid= 0; + } + mysql_mutex_unlock(&m_mutex); + + function_exit(kWho); +} + +bool Ack_receiver::add_slave(THD *thd) +{ + Slave *slave; + const char *kWho = "Ack_receiver::add_slave"; + function_enter(kWho); + + if (!(slave= new Slave)) + return function_exit(kWho, true); + + slave->thd= thd; + slave->vio= *thd->net.vio; + slave->vio.mysql_socket.m_psi= NULL; + slave->vio.read_timeout= 1; + + mysql_mutex_lock(&m_mutex); + m_slaves.push_back(slave); + m_slaves_changed= true; + mysql_cond_broadcast(&m_cond); + mysql_mutex_unlock(&m_mutex); + + return function_exit(kWho, false); +} + +void Ack_receiver::remove_slave(THD *thd) +{ + I_List_iterator<Slave> it(m_slaves); + Slave *slave; + const char *kWho = "Ack_receiver::remove_slave"; + function_enter(kWho); + + mysql_mutex_lock(&m_mutex); + + while ((slave= it++)) + { + if (slave->thd == thd) + { + delete slave; + m_slaves_changed= true; + break; + } + } + mysql_mutex_unlock(&m_mutex); + function_exit(kWho); +} + +inline void Ack_receiver::set_stage_info(const PSI_stage_info &stage) +{ + MYSQL_SET_STAGE(stage.m_key, __FILE__, __LINE__); +} + +inline void Ack_receiver::wait_for_slave_connection() +{ + set_stage_info(stage_waiting_for_semi_sync_slave); 
+ mysql_cond_wait(&m_cond, &m_mutex); +} + +my_socket Ack_receiver::get_slave_sockets(fd_set *fds, uint *count) +{ + my_socket max_fd= INVALID_SOCKET; + Slave *slave; + I_List_iterator<Slave> it(m_slaves); + + *count= 0; + FD_ZERO(fds); + while ((slave= it++)) + { + (*count)++; + my_socket fd= slave->sock_fd(); + max_fd= (fd > max_fd ? fd : max_fd); + FD_SET(fd, fds); + } + + return max_fd; +} + +/* Auxilary function to initialize a NET object with given net buffer. */ +static void init_net(NET *net, unsigned char *buff, unsigned int buff_len) +{ + memset(net, 0, sizeof(NET)); + net->max_packet= buff_len; + net->buff= buff; + net->buff_end= buff + buff_len; + net->read_pos= net->buff; +} + +void Ack_receiver::run() +{ + // skip LOCK_global_system_variables due to the 3rd arg + THD *thd= new THD(next_thread_id(), false, true); + NET net; + unsigned char net_buff[REPLY_MESSAGE_MAX_LENGTH]; + fd_set read_fds; + my_socket max_fd= INVALID_SOCKET; + Slave *slave; + + my_thread_init(); + + DBUG_ENTER("Ack_receiver::run"); + + sql_print_information("Starting ack receiver thread"); + thd->system_thread= SYSTEM_THREAD_SEMISYNC_MASTER_BACKGROUND; + thd->thread_stack= (char*) &thd; + thd->store_globals(); + thd->security_ctx->skip_grants(); + thread_safe_increment32(&service_thread_count); + thd->set_command(COM_DAEMON); + init_net(&net, net_buff, REPLY_MESSAGE_MAX_LENGTH); + + mysql_mutex_lock(&m_mutex); + m_slaves_changed= true; + mysql_mutex_unlock(&m_mutex); + + while (1) + { + fd_set fds; + int ret; + uint slave_count; + + mysql_mutex_lock(&m_mutex); + if (unlikely(m_status == ST_STOPPING)) + goto end; + + set_stage_info(stage_waiting_for_semi_sync_ack_from_slave); + if (unlikely(m_slaves_changed)) + { + if (unlikely(m_slaves.is_empty())) + { + wait_for_slave_connection(); + mysql_mutex_unlock(&m_mutex); + continue; + } + + max_fd= get_slave_sockets(&read_fds, &slave_count); + m_slaves_changed= false; + DBUG_PRINT("info", ("fd count %u, max_fd %d", slave_count, max_fd)); 
+ } + + struct timeval tv= {1, 0}; + fds= read_fds; + /* select requires max fd + 1 for the first argument */ + ret= select(max_fd+1, &fds, NULL, NULL, &tv); + if (ret <= 0) + { + mysql_mutex_unlock(&m_mutex); + + ret= DBUG_EVALUATE_IF("rpl_semisync_simulate_select_error", -1, ret); + + if (ret == -1) + sql_print_information("Failed to select() on semi-sync dump sockets, " + "error: errno=%d", socket_errno); + /* Sleep 1us, so other threads can catch the m_mutex easily. */ + my_sleep(1); + continue; + } + + set_stage_info(stage_reading_semi_sync_ack); + I_List_iterator<Slave> it(m_slaves); + + while ((slave= it++)) + { + if (FD_ISSET(slave->sock_fd(), &fds)) + { + ulong len; + + net_clear(&net, 0); + net.vio= &slave->vio; + + len= my_net_read(&net); + if (likely(len != packet_error)) + repl_semisync_master.reportReplyPacket(slave->server_id(), + net.read_pos, len); + else if (net.last_errno == ER_NET_READ_ERROR) + FD_CLR(slave->sock_fd(), &read_fds); + } + } + mysql_mutex_unlock(&m_mutex); + } +end: + sql_print_information("Stopping ack receiver thread"); + m_status= ST_DOWN; + delete thd; + thread_safe_decrement32(&service_thread_count); + signal_thd_deleted(); + mysql_cond_broadcast(&m_cond); + mysql_mutex_unlock(&m_mutex); + DBUG_VOID_RETURN; +} diff --git a/sql/semisync_master_ack_receiver.h b/sql/semisync_master_ack_receiver.h new file mode 100644 index 00000000000..25307131bad --- /dev/null +++ b/sql/semisync_master_ack_receiver.h @@ -0,0 +1,119 @@ +/* Copyright (c) 2014, Oracle and/or its affiliates. All rights reserved. + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; version 2 of the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. 
+ + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ + +#ifndef SEMISYNC_MASTER_ACK_RECEIVER_DEFINED +#define SEMISYNC_MASTER_ACK_RECEIVER_DEFINED + +#include "my_global.h" +#include "my_pthread.h" +#include "sql_class.h" +#include "semisync.h" +/** + Ack_receiver is responsible to control ack receive thread and maintain + slave information used by ack receive thread. + + There are mainly four operations on ack receive thread: + start: start ack receive thread + stop: stop ack receive thread + add_slave: maintain a new semisync slave's information + remove_slave: remove a semisync slave's information + */ +class Ack_receiver : public ReplSemiSyncBase +{ +public: + Ack_receiver(); + ~Ack_receiver() {} + void cleanup(); + /** + Notify ack receiver to receive acks on the dump session. + + It adds the given dump thread into the slave list and wakes + up ack thread if it is waiting for any slave coming. + + @param[in] thd THD of a dump thread. + + @return it return false if succeeds, otherwise true is returned. + */ + bool add_slave(THD *thd); + + /** + Notify ack receiver not to receive ack on the dump session. + + it removes the given dump thread from slave list. + + @param[in] thd THD of a dump thread. + */ + void remove_slave(THD *thd); + + /** + Start ack receive thread + + @return it return false if succeeds, otherwise true is returned. + */ + bool start(); + + /** + Stop ack receive thread + */ + void stop(); + + /** + The core of ack receive thread. + + It monitors all slaves' sockets and receives acks when they come. + */ + void run(); + + void setTraceLevel(unsigned long trace_level) + { + trace_level_= trace_level; + } +private: + enum status {ST_UP, ST_DOWN, ST_STOPPING}; + uint8 m_status; + /* + Protect m_status, m_slaves_changed and m_slaves. 
ack thread and other + session may access the variables at the same time. + */ + mysql_mutex_t m_mutex; + mysql_cond_t m_cond; + /* If slave list is updated(add or remove). */ + bool m_slaves_changed; + + class Slave :public ilink + { +public: + THD *thd; + Vio vio; + + my_socket sock_fd() { return vio.mysql_socket.fd; } + uint server_id() { return thd->variables.server_id; } + }; + + I_List<Slave> m_slaves; + + pthread_t m_pid; + +/* Declare them private, so no one can copy the object. */ + Ack_receiver(const Ack_receiver &ack_receiver); + Ack_receiver& operator=(const Ack_receiver &ack_receiver); + + void set_stage_info(const PSI_stage_info &stage); + void wait_for_slave_connection(); + my_socket get_slave_sockets(fd_set *fds, uint *count); +}; + +extern Ack_receiver ack_receiver; +#endif diff --git a/sql/semisync_slave.cc b/sql/semisync_slave.cc index 63bf9dca0e8..012f807d28e 100644 --- a/sql/semisync_slave.cc +++ b/sql/semisync_slave.cc @@ -18,10 +18,13 @@ #include <my_global.h> #include "semisync_slave.h" -my_bool rpl_semi_sync_slave_enabled; +ReplSemiSyncSlave repl_semisync_slave; + +my_bool rpl_semi_sync_slave_enabled= 0; + +char rpl_semi_sync_slave_delay_master; my_bool rpl_semi_sync_slave_status= 0; ulong rpl_semi_sync_slave_trace_level; -ReplSemiSyncSlave repl_semisync_slave; /* indicate whether or not the slave should send a reply to the master. @@ -31,30 +34,27 @@ ReplSemiSyncSlave repl_semisync_slave; checked in repl_semi_slave_queue_event. */ bool semi_sync_need_reply= false; - +unsigned int rpl_semi_sync_slave_kill_conn_timeout; +unsigned long long rpl_semi_sync_slave_send_ack = 0; int ReplSemiSyncSlave::initObject() { int result= 0; - const char *kWho = "ReplSemiSyncSlave::initObject"; - if (init_done_) - { - fprintf(stderr, "%s called twice\n", kWho); - return 1; - } init_done_ = true; /* References to the parameter works after set_options(). 
*/ setSlaveEnabled(rpl_semi_sync_slave_enabled); setTraceLevel(rpl_semi_sync_slave_trace_level); + setDelayMaster(rpl_semi_sync_slave_delay_master); + setKillConnTimeout(rpl_semi_sync_slave_kill_conn_timeout); return result; } int ReplSemiSyncSlave::slaveReadSyncHeader(const char *header, unsigned long total_len, - bool *need_reply, + int *semi_flags, const char **payload, unsigned long *payload_len) { @@ -62,128 +62,119 @@ int ReplSemiSyncSlave::slaveReadSyncHeader(const char *header, int read_res = 0; function_enter(kWho); - if ((unsigned char)(header[0]) == kPacketMagicNum) - { - *need_reply = (header[1] & kPacketFlagSync); - *payload_len = total_len - 2; - *payload = header + 2; - - if (trace_level_ & kTraceDetail) - sql_print_information("%s: reply - %d", kWho, *need_reply); - } - else + if (rpl_semi_sync_slave_status) { - sql_print_error("Missing magic number for semi-sync packet, packet " - "len: %lu", total_len); - read_res = -1; + if (DBUG_EVALUATE_IF("semislave_corrupt_log", 0, 1) + && (unsigned char)(header[0]) == kPacketMagicNum) + { + semi_sync_need_reply = (header[1] & kPacketFlagSync); + *payload_len = total_len - 2; + *payload = header + 2; + + if (trace_level_ & kTraceDetail) + sql_print_information("%s: reply - %d", kWho, semi_sync_need_reply); + + if (semi_sync_need_reply) + *semi_flags |= SEMI_SYNC_NEED_ACK; + if (isDelayMaster()) + *semi_flags |= SEMI_SYNC_SLAVE_DELAY_SYNC; + } + else + { + sql_print_error("Missing magic number for semi-sync packet, packet " + "len: %lu", total_len); + read_res = -1; + } + } else { + *payload= header; + *payload_len= total_len; } return function_exit(kWho, read_res); } -int ReplSemiSyncSlave::slaveStart(Binlog_relay_IO_param *param) +int ReplSemiSyncSlave::slaveStart(Master_info *mi) { bool semi_sync= getSlaveEnabled(); sql_print_information("Slave I/O thread: Start %s replication to\ master '%s@%s:%d' in log '%s' at position %lu", semi_sync ? 
"semi-sync" : "asynchronous", - param->user, param->host, param->port, - param->master_log_name[0] ? param->master_log_name : "FIRST", - (unsigned long)param->master_log_pos); + const_cast<char *>(mi->user), mi->host, mi->port, + const_cast<char *>(mi->master_log_name), + (unsigned long)(mi->master_log_pos)); if (semi_sync && !rpl_semi_sync_slave_status) rpl_semi_sync_slave_status= 1; + + /*clear the counter*/ + rpl_semi_sync_slave_send_ack= 0; return 0; } -int ReplSemiSyncSlave::slaveStop(Binlog_relay_IO_param *param) +int ReplSemiSyncSlave::slaveStop(Master_info *mi) { if (rpl_semi_sync_slave_status) rpl_semi_sync_slave_status= 0; - if (mysql_reply) - mysql_close(mysql_reply); - mysql_reply= 0; + if (getSlaveEnabled()) + killConnection(mi->mysql); return 0; } -int ReplSemiSyncSlave::slaveReply(MYSQL *mysql, - const char *binlog_filename, - my_off_t binlog_filepos) +int ReplSemiSyncSlave::resetSlave(Master_info *mi) { - const char *kWho = "ReplSemiSyncSlave::slaveReply"; - NET *net= &mysql->net; - uchar reply_buffer[REPLY_MAGIC_NUM_LEN - + REPLY_BINLOG_POS_LEN - + REPLY_BINLOG_NAME_LEN]; - int reply_res, name_len = strlen(binlog_filename); - - function_enter(kWho); - - /* Prepare the buffer of the reply. */ - reply_buffer[REPLY_MAGIC_NUM_OFFSET] = kPacketMagicNum; - int8store(reply_buffer + REPLY_BINLOG_POS_OFFSET, binlog_filepos); - memcpy(reply_buffer + REPLY_BINLOG_NAME_OFFSET, - binlog_filename, - name_len + 1 /* including trailing '\0' */); - - if (trace_level_ & kTraceDetail) - sql_print_information("%s: reply (%s, %lu)", kWho, - binlog_filename, (ulong)binlog_filepos); - - net_clear(net, 0); - /* Send the reply. 
*/ - reply_res = my_net_write(net, reply_buffer, - name_len + REPLY_BINLOG_NAME_OFFSET); - if (!reply_res) - { - reply_res = net_flush(net); - if (reply_res) - sql_print_error("Semi-sync slave net_flush() reply failed"); - } - else - { - sql_print_error("Semi-sync slave send reply failed: %s (%d)", - net->last_error, net->last_errno); - } - - return function_exit(kWho, reply_res); + return 0; } -/*************************************************************************** - Semisync slave interface setup and deinit -***************************************************************************/ +void ReplSemiSyncSlave::killConnection(MYSQL *mysql) +{ + if (!mysql) + return; -C_MODE_START + char kill_buffer[30]; + MYSQL *kill_mysql = NULL; + kill_mysql = mysql_init(kill_mysql); + mysql_options(kill_mysql, MYSQL_OPT_CONNECT_TIMEOUT, &kill_conn_timeout_); + mysql_options(kill_mysql, MYSQL_OPT_READ_TIMEOUT, &kill_conn_timeout_); + mysql_options(kill_mysql, MYSQL_OPT_WRITE_TIMEOUT, &kill_conn_timeout_); -int repl_semi_reset_slave(Binlog_relay_IO_param *param) -{ - // TODO: reset semi-sync slave status here - return 0; + bool ret= (!mysql_real_connect(kill_mysql, mysql->host, + mysql->user, mysql->passwd,0, mysql->port, mysql->unix_socket, 0)); + if (DBUG_EVALUATE_IF("semisync_slave_failed_kill", 1, 0) || ret) + { + sql_print_information("cannot connect to master to kill slave io_thread's " + "connection"); + if (!ret) + mysql_close(kill_mysql); + return; + } + uint kill_buffer_length = my_snprintf(kill_buffer, 30, "KILL %lu", + mysql->thread_id); + mysql_real_query(kill_mysql, kill_buffer, kill_buffer_length); + mysql_close(kill_mysql); } -int repl_semi_slave_request_dump(Binlog_relay_IO_param *param, - uint32 flags) +int ReplSemiSyncSlave::requestTransmit(Master_info *mi) { - MYSQL *mysql= param->mysql; + MYSQL *mysql= mi->mysql; MYSQL_RES *res= 0; MYSQL_ROW row; const char *query; - if (!repl_semisync_slave.getSlaveEnabled()) + if (!getSlaveEnabled()) return 0; - /* Check 
if master server has semi-sync plugin installed */ query= "SHOW VARIABLES LIKE 'rpl_semi_sync_master_enabled'"; if (mysql_real_query(mysql, query, strlen(query)) || !(res= mysql_store_result(mysql))) { - sql_print_error("Execution failed on master: %s", query); + sql_print_error("Execution failed on master: %s, error :%s", query, mysql_error(mysql)); return 1; } row= mysql_fetch_row(res); - if (!row) + if (DBUG_EVALUATE_IF("master_not_support_semisync", 1, 0) + || !row) { /* Master does not support semi-sync */ sql_print_warning("Master server does not support semi-sync, " @@ -195,8 +186,8 @@ int repl_semi_slave_request_dump(Binlog_relay_IO_param *param, mysql_free_result(res); /* - Tell master dump thread that we want to do semi-sync - replication + Tell master dump thread that we want to do semi-sync + replication */ query= "SET @rpl_semi_sync_slave= 1"; if (mysql_real_query(mysql, query, strlen(query))) @@ -206,83 +197,56 @@ int repl_semi_slave_request_dump(Binlog_relay_IO_param *param, } mysql_free_result(mysql_store_result(mysql)); rpl_semi_sync_slave_status= 1; - return 0; -} -int repl_semi_slave_read_event(Binlog_relay_IO_param *param, - const char *packet, unsigned long len, - const char **event_buf, unsigned long *event_len) -{ - if (rpl_semi_sync_slave_status) - return repl_semisync_slave.slaveReadSyncHeader(packet, len, - &semi_sync_need_reply, - event_buf, event_len); - *event_buf= packet; - *event_len= len; - return 0; -} - -int repl_semi_slave_queue_event(Binlog_relay_IO_param *param, - const char *event_buf, - unsigned long event_len, - uint32 flags) -{ - if (rpl_semi_sync_slave_status && semi_sync_need_reply) - { - /* - We deliberately ignore the error in slaveReply, such error - should not cause the slave IO thread to stop, and the error - messages are already reported. 
- */ - (void) repl_semisync_slave.slaveReply(param->mysql, - param->master_log_name, - param->master_log_pos); - } return 0; } -int repl_semi_slave_io_start(Binlog_relay_IO_param *param) -{ - return repl_semisync_slave.slaveStart(param); -} - -int repl_semi_slave_io_end(Binlog_relay_IO_param *param) +int ReplSemiSyncSlave::slaveReply(Master_info *mi) { - return repl_semisync_slave.slaveStop(param); -} - -C_MODE_END + const char *kWho = "ReplSemiSyncSlave::slaveReply"; + MYSQL* mysql= mi->mysql; + const char *binlog_filename= const_cast<char *>(mi->master_log_name); + my_off_t binlog_filepos= mi->master_log_pos; -Binlog_relay_IO_observer relay_io_observer= -{ - sizeof(Binlog_relay_IO_observer), // len + NET *net= &mysql->net; + uchar reply_buffer[REPLY_MAGIC_NUM_LEN + + REPLY_BINLOG_POS_LEN + + REPLY_BINLOG_NAME_LEN]; + int reply_res = 0; + int name_len = strlen(binlog_filename); - repl_semi_slave_io_start, // start - repl_semi_slave_io_end, // stop - repl_semi_slave_request_dump, // request_transmit - repl_semi_slave_read_event, // after_read_event - repl_semi_slave_queue_event, // after_queue_event - repl_semi_reset_slave, // reset -}; + function_enter(kWho); -static bool semi_sync_slave_inited= 0; + if (rpl_semi_sync_slave_status && semi_sync_need_reply) + { + /* Prepare the buffer of the reply. */ + reply_buffer[REPLY_MAGIC_NUM_OFFSET] = kPacketMagicNum; + int8store(reply_buffer + REPLY_BINLOG_POS_OFFSET, binlog_filepos); + memcpy(reply_buffer + REPLY_BINLOG_NAME_OFFSET, + binlog_filename, + name_len + 1 /* including trailing '\0' */); -int semi_sync_slave_init() -{ - void *p= 0; - if (repl_semisync_slave.initObject()) - return 1; - if (register_binlog_relay_io_observer(&relay_io_observer, p)) - return 1; - semi_sync_slave_inited= 1; - return 0; -} + if (trace_level_ & kTraceDetail) + sql_print_information("%s: reply (%s, %lu)", kWho, + binlog_filename, (ulong)binlog_filepos); + + net_clear(net, 0); + /* Send the reply. 
*/ + reply_res = my_net_write(net, reply_buffer, + name_len + REPLY_BINLOG_NAME_OFFSET); + if (!reply_res) + { + reply_res = DBUG_EVALUATE_IF("semislave_failed_net_flush", 1, net_flush(net)); + if (reply_res) + sql_print_error("Semi-sync slave net_flush() reply failed"); + rpl_semi_sync_slave_send_ack++; + } + else + { + sql_print_error("Semi-sync slave send reply failed: %s (%d)", + net->last_error, net->last_errno); + } + } -void semi_sync_slave_deinit() -{ - void *p= 0; - if (!semi_sync_slave_inited) - return; - unregister_binlog_relay_io_observer(&relay_io_observer, p); - semi_sync_slave_inited= 0; + return function_exit(kWho, reply_res); } diff --git a/sql/semisync_slave.h b/sql/semisync_slave.h index 6bc10b0d479..0df4445ee4a 100644 --- a/sql/semisync_slave.h +++ b/sql/semisync_slave.h @@ -19,6 +19,10 @@ #define SEMISYNC_SLAVE_H #include "semisync.h" +#include "my_global.h" +#include "sql_priv.h" +#include "rpl_mi.h" +#include "mysql.h" /** The extension class for the slave of semi-synchronous replication @@ -44,49 +48,54 @@ public: return slave_enabled_; } void setSlaveEnabled(bool enabled) { - run_hooks_enabled|= enabled; slave_enabled_ = enabled; } + + bool isDelayMaster(){ + return delay_master_; + } + + void setDelayMaster(bool enabled) { + delay_master_ = enabled; + } + + void setKillConnTimeout(unsigned int timeout) { + kill_conn_timeout_ = timeout; + } /* A slave reads the semi-sync packet header and separate the metadata * from the payload data. 
- * + * * Input: * header - (IN) packet header pointer * total_len - (IN) total packet length: metadata + payload - * need_reply - (IN) whether the master is waiting for the reply + * semi_flags - (IN/OUT) flags OR-ed in by this call: SEMI_SYNC_SLAVE_DELAY_SYNC and/or SEMI_SYNC_NEED_ACK * payload - (IN) payload: the replication event * payload_len - (IN) payload length * * Return: * 0: success; non-zero: error */ - int slaveReadSyncHeader(const char *header, unsigned long total_len, bool *need_reply, + int slaveReadSyncHeader(const char *header, unsigned long total_len, int *semi_flags, const char **payload, unsigned long *payload_len); /* A slave replies to the master indicating its replication process. It * indicates that the slave has received all events before the specified * binlog position. - * - * Input: - * mysql - (IN) the mysql network connection - * binlog_filename - (IN) the reply point's binlog file name - * binlog_filepos - (IN) the reply point's binlog file offset - * - * Return: - * 0: success; non-zero: error */ - int slaveReply(MYSQL *mysql, const char *binlog_filename, - my_off_t binlog_filepos); - - int slaveStart(Binlog_relay_IO_param *param); - int slaveStop(Binlog_relay_IO_param *param); + int slaveReply(Master_info* mi); + int slaveStart(Master_info *mi); + int slaveStop(Master_info *mi); + int requestTransmit(Master_info*); + void killConnection(MYSQL *mysql); + int resetSlave(Master_info *mi); private: /* True when initObject has been called */ bool init_done_; bool slave_enabled_; /* semi-sycn is enabled on the slave */ - MYSQL *mysql_reply; /* connection to send reply */ + bool delay_master_; + unsigned int kill_conn_timeout_; }; @@ -96,7 +105,8 @@ extern my_bool rpl_semi_sync_slave_status; extern ulong rpl_semi_sync_slave_trace_level; extern ReplSemiSyncSlave repl_semisync_slave; -int semi_sync_slave_init(); -void semi_sync_slave_deinit(); +extern char rpl_semi_sync_slave_delay_master; +extern unsigned int rpl_semi_sync_slave_kill_conn_timeout; +extern 
unsigned long long rpl_semi_sync_slave_send_ack; #endif /* SEMISYNC_SLAVE_H */ diff --git a/sql/slave.cc b/sql/slave.cc index a8334525345..b4a817d6ecd 100644 --- a/sql/slave.cc +++ b/sql/slave.cc @@ -61,6 +61,7 @@ #include "debug_sync.h" #include "rpl_parallel.h" #include "sql_show.h" +#include "semisync_slave.h" #define FLAGSTR(V,F) ((V)&(F)?#F" ":"") @@ -3590,7 +3591,9 @@ static int request_dump(THD *thd, MYSQL* mysql, Master_info* mi, before_request_transmit, (thd, mi, binlog_flags))) DBUG_RETURN(1); - + if (repl_semisync_slave.requestTransmit(mi)) + DBUG_RETURN(1); + // TODO if big log files: Change next to int8store() int4store(buf, (ulong) mi->master_log_pos); int2store(buf + 4, binlog_flags); @@ -4615,7 +4618,9 @@ pthread_handler_t handle_slave_io(void *arg) } - if (RUN_HOOK(binlog_relay_io, thread_start, (thd, mi))) + if (RUN_HOOK(binlog_relay_io, thread_start, (thd, mi)) || + (DBUG_EVALUATE_IF("failed_slave_start", 1, 0) + || repl_semisync_slave.slaveStart(mi))) { mi->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR, NULL, ER_THD(thd, ER_SLAVE_FATAL_ERROR), @@ -4805,9 +4810,13 @@ Stopping slave I/O thread due to out-of-memory error from master"); retry_count=0; // ok event, reset retry counter THD_STAGE_INFO(thd, stage_queueing_master_event_to_the_relay_log); event_buf= (const char*)mysql->net.read_pos + 1; + mi->semi_ack= 0; if (RUN_HOOK(binlog_relay_io, after_read_event, (thd, mi,(const char*)mysql->net.read_pos + 1, - event_len, &event_buf, &event_len))) + event_len, &event_buf, &event_len)) || + repl_semisync_slave. 
+ slaveReadSyncHeader((const char*)mysql->net.read_pos + 1, event_len, + &(mi->semi_ack), &event_buf, &event_len)) { mi->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR, NULL, ER_THD(thd, ER_SLAVE_FATAL_ERROR), @@ -4868,7 +4877,10 @@ Stopping slave I/O thread due to out-of-memory error from master"); } if (RUN_HOOK(binlog_relay_io, after_queue_event, - (thd, mi, event_buf, event_len, synced))) + (thd, mi, event_buf, event_len, synced)) || + (rpl_semi_sync_slave_status && + (mi->semi_ack & SEMI_SYNC_NEED_ACK) && + repl_semisync_slave.slaveReply(mi))) { mi->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR, NULL, ER_THD(thd, ER_SLAVE_FATAL_ERROR), @@ -4877,7 +4889,16 @@ Stopping slave I/O thread due to out-of-memory error from master"); } if (mi->using_gtid == Master_info::USE_GTID_NO && - flush_master_info(mi, TRUE, TRUE)) + /* + If rpl_semi_sync_slave_delay_master is enabled, we will flush + master info only when ack is needed. This may lead to at least one + group transaction delay but affords better performance improvement. 
+ */ + (!repl_semisync_slave.getSlaveEnabled() || + (!(mi->semi_ack & SEMI_SYNC_SLAVE_DELAY_SYNC) || + (mi->semi_ack & (SEMI_SYNC_NEED_ACK)))) && + (DBUG_EVALUATE_IF("failed_flush_master_info", 1, 0) || + flush_master_info(mi, TRUE, TRUE))) { sql_print_error("Failed to flush master info file"); goto err; @@ -4931,7 +4952,8 @@ err: IO_RPL_LOG_NAME, mi->master_log_pos, tmp.c_ptr_safe()); } - RUN_HOOK(binlog_relay_io, thread_stop, (thd, mi)); + (void) RUN_HOOK(binlog_relay_io, thread_stop, (thd, mi)); + repl_semisync_slave.slaveStop(mi); thd->reset_query(); thd->reset_db(NULL, 0); if (mysql) diff --git a/sql/sql_class.cc b/sql/sql_class.cc index 35ed9be74f9..d0aa0818e99 100644 --- a/sql/sql_class.cc +++ b/sql/sql_class.cc @@ -706,7 +706,7 @@ extern "C" void thd_kill_timeout(THD* thd) } -THD::THD(my_thread_id id, bool is_wsrep_applier) +THD::THD(my_thread_id id, bool is_wsrep_applier, bool skip_global_sys_var_lock) :Statement(&main_lex, &main_mem_root, STMT_CONVENTIONAL_EXECUTION, /* statement id */ 0), rli_fake(0), rgi_fake(0), rgi_slave(NULL), @@ -893,7 +893,7 @@ THD::THD(my_thread_id id, bool is_wsrep_applier) /* Call to init() below requires fully initialized Open_tables_state. 
*/ reset_open_tables_state(this); - init(); + init(skip_global_sys_var_lock); #if defined(ENABLED_PROFILING) profiling.set_thd(this); #endif @@ -1264,10 +1264,11 @@ const Type_handler *THD::type_handler_for_date() const Init common variables that has to be reset on start and on change_user */ -void THD::init(void) +void THD::init(bool skip_lock) { DBUG_ENTER("thd::init"); - mysql_mutex_lock(&LOCK_global_system_variables); + if (!skip_lock) + mysql_mutex_lock(&LOCK_global_system_variables); plugin_thdvar_init(this); /* plugin_thd_var_init() sets variables= global_system_variables, which @@ -1280,8 +1281,8 @@ void THD::init(void) ::strmake(default_master_connection_buff, global_system_variables.default_master_connection.str, variables.default_master_connection.length); - - mysql_mutex_unlock(&LOCK_global_system_variables); + if (!skip_lock) + mysql_mutex_unlock(&LOCK_global_system_variables); user_time.val= start_time= start_time_sec_part= 0; @@ -4193,7 +4194,8 @@ my_bool thd_net_is_killed() void thd_increment_bytes_received(void *thd, ulong length) { - ((THD*) thd)->status_var.bytes_received+= length; + if (thd != NULL) // MDEV-13073 Ack collector having NULL + ((THD*) thd)->status_var.bytes_received+= length; } diff --git a/sql/sql_class.h b/sql/sql_class.h index 4249bc6bb5b..66c490c6acf 100644 --- a/sql/sql_class.h +++ b/sql/sql_class.h @@ -1569,7 +1569,8 @@ enum enum_thread_type SYSTEM_THREAD_EVENT_WORKER= 16, SYSTEM_THREAD_BINLOG_BACKGROUND= 32, SYSTEM_THREAD_SLAVE_BACKGROUND= 64, - SYSTEM_THREAD_GENERIC= 128 + SYSTEM_THREAD_GENERIC= 128, + SYSTEM_THREAD_SEMISYNC_MASTER_BACKGROUND= 256 }; inline char const * @@ -1585,6 +1586,7 @@ show_system_thread(enum_thread_type thread) RETURN_NAME_AS_STRING(SYSTEM_THREAD_EVENT_SCHEDULER); RETURN_NAME_AS_STRING(SYSTEM_THREAD_EVENT_WORKER); RETURN_NAME_AS_STRING(SYSTEM_THREAD_SLAVE_BACKGROUND); + RETURN_NAME_AS_STRING(SYSTEM_THREAD_SEMISYNC_MASTER_BACKGROUND); default: sprintf(buf, "<UNKNOWN SYSTEM THREAD: %d>", thread); 
return buf; @@ -2261,7 +2263,8 @@ public: /* Needed by MariaDB semi sync replication */ Trans_binlog_info *semisync_info; - + /* If this is a semisync slave connection. */ + bool semi_sync_slave; ulonglong client_capabilities; /* What the client supports */ ulong max_client_packet_length; @@ -3147,11 +3150,20 @@ public: /* Debug Sync facility. See debug_sync.cc. */ struct st_debug_sync_control *debug_sync_control; #endif /* defined(ENABLED_DEBUG_SYNC) */ - THD(my_thread_id id, bool is_wsrep_applier= false); + /** + @param id thread identifier + @param is_wsrep_applier thread type + @param skip_lock instruct whether @c LOCK_global_system_variables + is already locked, to not acquire it then. + */ + THD(my_thread_id id, bool is_wsrep_applier= false, bool skip_lock= false); ~THD(); - - void init(void); + /** + @param skip_lock instruct whether @c LOCK_global_system_variables + is already locked, to not acquire it then. + */ + void init(bool skip_lock= false); /* Initialize memory roots necessary for query processing and (!) pre-allocate memory for it. 
We can't do that in THD constructor because diff --git a/sql/sql_repl.cc b/sql/sql_repl.cc index fcbf0ce1bd0..36fa6120584 100644 --- a/sql/sql_repl.cc +++ b/sql/sql_repl.cc @@ -30,7 +30,8 @@ #include <my_dir.h> #include "rpl_handler.h" #include "debug_sync.h" -#include "log.h" // get_gtid_list_event +#include "semisync_master.h" +#include "semisync_slave.h" enum enum_gtid_until_state { GTID_UNTIL_NOT_DONE, @@ -160,6 +161,7 @@ struct binlog_send_info { bool clear_initial_log_pos; bool should_stop; + size_t dirlen; binlog_send_info(THD *thd_arg, String *packet_arg, ushort flags_arg, char *lfn) @@ -315,14 +317,30 @@ static int reset_transmit_packet(binlog_send_info *info, ushort flags, if (RUN_HOOK(binlog_transmit, reserve_header, (info->thd, flags, packet))) { + /* RUN_HOOK() must return zero when thd->semi_sync_slave */ + DBUG_ASSERT(!info->thd->semi_sync_slave); + info->error= ER_UNKNOWN_ERROR; *errmsg= "Failed to run hook 'reserve_header'"; ret= 1; } + if (info->thd->semi_sync_slave) + { + repl_semisync_master.reserveSyncHeader(packet); + } + *ev_offset= packet->length(); return ret; } +inline bool is_semi_sync_slave() +{ + int null_value; + long long val= 0; + get_user_var_int("rpl_semi_sync_slave", &val, &null_value); + return val; +} + static int send_file(THD *thd) { NET* net = &thd->net; @@ -875,73 +893,6 @@ get_binlog_list(MEM_ROOT *memroot) DBUG_RETURN(current_list); } -/* - Find the Gtid_list_log_event at the start of a binlog. - - NULL for ok, non-NULL error message for error. - - If ok, then the event is returned in *out_gtid_list. This can be NULL if we - get back to binlogs written by old server version without GTID support. If - so, it means we have reached the point to start from, as no GTID events can - exist in earlier binlogs. 
-*/ - -static const char * -get_gtid_list_event(IO_CACHE *cache, Gtid_list_log_event **out_gtid_list) -{ - Format_description_log_event init_fdle(BINLOG_VERSION); - Format_description_log_event *fdle; - Log_event *ev; - const char *errormsg = NULL; - - *out_gtid_list= NULL; - - if (!(ev= Log_event::read_log_event(cache, &init_fdle, - opt_master_verify_checksum)) || - ev->get_type_code() != FORMAT_DESCRIPTION_EVENT) - { - if (ev) - delete ev; - return "Could not read format description log event while looking for " - "GTID position in binlog"; - } - - fdle= static_cast<Format_description_log_event *>(ev); - - for (;;) - { - Log_event_type typ; - - ev= Log_event::read_log_event(cache, fdle, opt_master_verify_checksum); - if (!ev) - { - errormsg= "Could not read GTID list event while looking for GTID " - "position in binlog"; - break; - } - typ= ev->get_type_code(); - if (typ == GTID_LIST_EVENT) - break; /* Done, found it */ - if (typ == START_ENCRYPTION_EVENT) - { - if (fdle->start_decryption((Start_encryption_log_event*) ev)) - errormsg= "Could not set up decryption for binlog."; - } - delete ev; - if (typ == ROTATE_EVENT || typ == STOP_EVENT || - typ == FORMAT_DESCRIPTION_EVENT || typ == START_ENCRYPTION_EVENT) - continue; /* Continue looking */ - - /* We did not find any Gtid_list_log_event, must be old binlog. 
*/ - ev= NULL; - break; - } - - delete fdle; - *out_gtid_list= static_cast<Gtid_list_log_event *>(ev); - return errormsg; -} - /* Check if every GTID requested by the slave is contained in this (or a later) @@ -1673,6 +1624,7 @@ send_event_to_slave(binlog_send_info *info, Log_event_type event_type, enum enum_binlog_checksum_alg current_checksum_alg= info->current_checksum_alg; slave_connection_state *gtid_state= &info->gtid_state; slave_connection_state *until_gtid_state= info->until_gtid_state; + bool need_sync= false; if (event_type == GTID_LIST_EVENT && info->using_gtid_state && until_gtid_state) @@ -1984,7 +1936,10 @@ send_event_to_slave(binlog_send_info *info, Log_event_type event_type, pos= my_b_tell(log); if (RUN_HOOK(binlog_transmit, before_send_event, - (info->thd, info->flags, packet, info->log_file_name, pos))) + (info->thd, info->flags, packet, info->log_file_name, pos)) || + repl_semisync_master.updateSyncHeader(info->thd, (uchar *)packet->c_ptr(), + info->log_file_name + info->dirlen, + pos, &need_sync)) { info->error= ER_UNKNOWN_ERROR; return "run 'before_send_event' hook failed"; @@ -2012,6 +1967,8 @@ send_event_to_slave(binlog_send_info *info, Log_event_type event_type, info->error= ER_UNKNOWN_ERROR; return "Failed to run hook 'after_send_event'"; } + if (need_sync) + repl_semisync_master.flushNet(info->thd, packet->c_ptr()); return NULL; /* Success */ } @@ -2748,7 +2705,7 @@ static int send_one_binlog_file(binlog_send_info *info, /** end of file or error */ return (int)end_pos; } - + info->dirlen= dirname_length(info->log_file_name); /** * send events from current position up to end_pos */ @@ -2770,6 +2727,7 @@ void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos, binlog_send_info infoobj(thd, packet, flags, linfo.log_file_name); binlog_send_info *info= &infoobj; + bool has_transmit_started= false; int old_max_allowed_packet= thd->variables.max_allowed_packet; thd->variables.max_allowed_packet= MAX_MAX_ALLOWED_PACKET; @@ -2792,6 +2750,11 
@@ void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos, info->error= ER_UNKNOWN_ERROR; goto err; } + has_transmit_started= true; + + /* Check if the dump thread is created by a slave with semisync enabled. */ + thd->semi_sync_slave = is_semi_sync_slave(); + repl_semisync_master.dump_start(thd, log_ident, pos); /* heartbeat_period from @master_heartbeat_period user variable @@ -2908,7 +2871,11 @@ void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos, err: THD_STAGE_INFO(thd, stage_waiting_to_finalize_termination); - RUN_HOOK(binlog_transmit, transmit_stop, (thd, flags)); + (void) RUN_HOOK(binlog_transmit, transmit_stop, (thd, flags)); + if (has_transmit_started) + { + repl_semisync_master.dump_end(thd); + } if (info->thd->killed == KILL_SLAVE_SAME_ID) { @@ -3374,7 +3341,9 @@ int reset_slave(THD *thd, Master_info* mi) else if (global_system_variables.log_warnings > 1) sql_print_information("Deleted Master_info file '%s'.", fname); - RUN_HOOK(binlog_relay_io, after_reset_slave, (thd, mi)); + (void) RUN_HOOK(binlog_relay_io, after_reset_slave, (thd, mi)); + if (rpl_semi_sync_slave_enabled) + repl_semisync_slave.resetSlave(mi); err: mi->unlock_slave_threads(); if (error) @@ -3876,11 +3845,14 @@ int reset_master(THD* thd, rpl_gtid *init_state, uint32 init_state_len, return 1; } - if (mysql_bin_log.reset_logs(thd, 1, init_state, init_state_len, - next_log_number)) - return 1; - RUN_HOOK(binlog_transmit, after_reset_master, (thd, 0 /* flags */)); - return 0; + bool ret= 0; + /* Temporarily disable master semisync before resetting master. 
*/ + repl_semisync_master.beforeResetMaster(); + ret= mysql_bin_log.reset_logs(thd, 1, init_state, init_state_len, + next_log_number); + (void) RUN_HOOK(binlog_transmit, after_reset_master, (thd, 0 /* flags */)); + repl_semisync_master.afterResetMaster(); + return ret; } diff --git a/sql/sys_vars.cc b/sql/sys_vars.cc index e897b6c21ce..9eac6017c7c 100644 --- a/sql/sys_vars.cc +++ b/sql/sys_vars.cc @@ -3051,22 +3051,18 @@ static bool fix_rpl_semi_sync_master_enabled(sys_var *self, THD *thd, { if (repl_semisync_master.enableMaster() != 0) rpl_semi_sync_master_enabled= false; -#ifdef HAVE_ACC_RECEIVER else if (ack_receiver.start()) { repl_semisync_master.disableMaster(); rpl_semi_sync_master_enabled= false; } -#endif } else { if (repl_semisync_master.disableMaster() != 0) rpl_semi_sync_master_enabled= true; -#ifdef HAVE_ACC_RECEIVER if (!rpl_semi_sync_master_enabled) ack_receiver.stop(); -#endif } return false; } @@ -3082,27 +3078,21 @@ static bool fix_rpl_semi_sync_master_trace_level(sys_var *self, THD *thd, enum_var_type type) { repl_semisync_master.setTraceLevel(rpl_semi_sync_master_trace_level); -#ifdef HAVE_ACC_RECEIVER ack_receiver.setTraceLevel(rpl_semi_sync_master_trace_level); -#endif return false; } static bool fix_rpl_semi_sync_master_wait_point(sys_var *self, THD *thd, enum_var_type type) { -#ifdef HAVE_ACC_RECEIVER repl_semisync_master.setWaitPoint(rpl_semi_sync_master_wait_point); -#endif return false; } static bool fix_rpl_semi_sync_master_wait_no_slave(sys_var *self, THD *thd, enum_var_type type) { -#ifdef HAVE_ACC_RECEIVER repl_semisync_master.checkAndSwitch(); -#endif return false; } @@ -3168,7 +3158,6 @@ static bool fix_rpl_semi_sync_slave_trace_level(sys_var *self, THD *thd, return false; } -#ifdef HAVE_ACC_RECEIVER static bool fix_rpl_semi_sync_slave_delay_master(sys_var *self, THD *thd, enum_var_type type) { @@ -3182,8 +3171,6 @@ static bool fix_rpl_semi_sync_slave_kill_conn_timeout(sys_var *self, THD *thd, 
repl_semisync_slave.setKillConnTimeout(rpl_semi_sync_slave_kill_conn_timeout); return false; } -#endif - static Sys_var_mybool Sys_semisync_slave_enabled( "rpl_semi_sync_slave_enabled", @@ -3202,7 +3189,6 @@ static Sys_var_ulong Sys_semisync_slave_trace_level( NO_MUTEX_GUARD, NOT_IN_BINLOG, ON_CHECK(0), ON_UPDATE(fix_rpl_semi_sync_slave_trace_level)); -#ifdef HAVE_ACC_RECEIVER static Sys_var_mybool Sys_semisync_slave_delay_master( "rpl_semi_sync_slave_delay_master", "Only write master info file when ack is needed.", @@ -3221,7 +3207,6 @@ static Sys_var_uint Sys_semisync_slave_kill_conn_timeout( VALID_RANGE(0, UINT_MAX), DEFAULT(5), BLOCK_SIZE(1), NO_MUTEX_GUARD, NOT_IN_BINLOG, ON_CHECK(0), ON_UPDATE(fix_rpl_semi_sync_slave_kill_conn_timeout)); -#endif #endif /* HAVE_REPLICATION */ static Sys_var_ulong Sys_slow_launch_time( diff --git a/sql/transaction.cc b/sql/transaction.cc index cbd875e3114..f6ccf5a1930 100644 --- a/sql/transaction.cc +++ b/sql/transaction.cc @@ -24,7 +24,7 @@ #include "rpl_handler.h" #include "debug_sync.h" // DEBUG_SYNC #include "sql_acl.h" - +#include "semisync_master.h" #ifndef EMBEDDED_LIBRARY /** @@ -318,9 +318,19 @@ bool trans_commit(THD *thd) transaction, so the hooks for rollback will be called. 
*/ if (res) + { (void) RUN_HOOK(transaction, after_rollback, (thd, FALSE)); +#ifdef HAVE_REPLICATION + repl_semisync_master.waitAfterRollback(thd, FALSE); +#endif + } else + { (void) RUN_HOOK(transaction, after_commit, (thd, FALSE)); +#ifdef HAVE_REPLICATION + repl_semisync_master.waitAfterCommit(thd, FALSE); +#endif + } thd->variables.option_bits&= ~(OPTION_BEGIN | OPTION_KEEP_LOG); thd->transaction.all.reset(); thd->lex->start_transaction_opt= 0; @@ -414,6 +424,9 @@ bool trans_rollback(THD *thd) DBUG_PRINT("info", ("clearing SERVER_STATUS_IN_TRANS")); res= ha_rollback_trans(thd, TRUE); (void) RUN_HOOK(transaction, after_rollback, (thd, FALSE)); +#ifdef HAVE_REPLICATION + repl_semisync_master.waitAfterRollback(thd, FALSE); +#endif thd->variables.option_bits&= ~(OPTION_BEGIN | OPTION_KEEP_LOG); /* Reset the binlog transaction marker */ thd->variables.option_bits&= ~OPTION_GTID_BEGIN; @@ -526,9 +539,19 @@ bool trans_commit_stmt(THD *thd) transaction, so the hooks for rollback will be called. */ if (res) + { (void) RUN_HOOK(transaction, after_rollback, (thd, FALSE)); +#ifdef HAVE_REPLICATION + repl_semisync_master.waitAfterRollback(thd, FALSE); +#endif + } else + { (void) RUN_HOOK(transaction, after_commit, (thd, FALSE)); +#ifdef HAVE_REPLICATION + repl_semisync_master.waitAfterCommit(thd, FALSE); +#endif + } thd->transaction.stmt.reset(); @@ -568,6 +591,9 @@ bool trans_rollback_stmt(THD *thd) } (void) RUN_HOOK(transaction, after_rollback, (thd, FALSE)); +#ifdef HAVE_REPLICATION + repl_semisync_master.waitAfterRollback(thd, FALSE); +#endif thd->transaction.stmt.reset(); |