summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--mysql-test/extra/rpl_tests/rpl_semi_sync.inc1
-rw-r--r--mysql-test/suite/binlog_encryption/rpl_semi_sync.result6
-rw-r--r--mysql-test/suite/perfschema/r/dml_setup_instruments.result4
-rw-r--r--mysql-test/suite/perfschema/r/relaylog.result2
-rw-r--r--mysql-test/suite/rpl/r/rpl_semi_sync.result6
-rw-r--r--mysql-test/suite/rpl/r/rpl_semi_sync_event.result1
-rw-r--r--mysql-test/suite/rpl/r/rpl_semi_sync_event_after_sync.result1
-rw-r--r--mysql-test/suite/rpl/t/rpl_semi_sync_event.test2
-rw-r--r--mysql-test/suite/sys_vars/r/rpl_semi_sync_master_enabled_basic.result6
-rw-r--r--mysql-test/suite/sys_vars/t/rpl_semi_sync_master_enabled_basic.test29
-rw-r--r--sql/CMakeLists.txt1
-rw-r--r--sql/handler.cc8
-rw-r--r--sql/log.cc80
-rw-r--r--sql/log.h11
-rw-r--r--sql/mysqld.cc33
-rw-r--r--sql/mysqld.h1
-rw-r--r--sql/rpl_handler.h4
-rw-r--r--sql/rpl_mi.h5
-rw-r--r--sql/rpl_rli.cc3
-rw-r--r--sql/semisync.h16
-rw-r--r--sql/semisync_master.cc584
-rw-r--r--sql/semisync_master.h84
-rw-r--r--sql/semisync_master_ack_receiver.cc308
-rw-r--r--sql/semisync_master_ack_receiver.h119
-rw-r--r--sql/semisync_slave.cc278
-rw-r--r--sql/semisync_slave.h50
-rw-r--r--sql/slave.cc34
-rw-r--r--sql/sql_class.cc16
-rw-r--r--sql/sql_class.h22
-rw-r--r--sql/sql_repl.cc126
-rw-r--r--sql/sys_vars.cc15
-rw-r--r--sql/transaction.cc28
32 files changed, 1193 insertions, 691 deletions
diff --git a/mysql-test/extra/rpl_tests/rpl_semi_sync.inc b/mysql-test/extra/rpl_tests/rpl_semi_sync.inc
index ed56e405e27..393b49372e1 100644
--- a/mysql-test/extra/rpl_tests/rpl_semi_sync.inc
+++ b/mysql-test/extra/rpl_tests/rpl_semi_sync.inc
@@ -16,6 +16,7 @@ connection master;
call mtr.add_suppression("Timeout waiting for reply of binlog");
call mtr.add_suppression("Read semi-sync reply");
call mtr.add_suppression("Unsafe statement written to the binary log using statement format since BINLOG_FORMAT = STATEMENT.");
+call mtr.add_suppression("mysqld: Got an error reading communication packets");
connection slave;
call mtr.add_suppression("Master server does not support semi-sync");
call mtr.add_suppression("Semi-sync slave .* reply");
diff --git a/mysql-test/suite/binlog_encryption/rpl_semi_sync.result b/mysql-test/suite/binlog_encryption/rpl_semi_sync.result
index 6d574681d73..106efb555d3 100644
--- a/mysql-test/suite/binlog_encryption/rpl_semi_sync.result
+++ b/mysql-test/suite/binlog_encryption/rpl_semi_sync.result
@@ -4,6 +4,7 @@ connection master;
call mtr.add_suppression("Timeout waiting for reply of binlog");
call mtr.add_suppression("Read semi-sync reply");
call mtr.add_suppression("Unsafe statement written to the binary log using statement format since BINLOG_FORMAT = STATEMENT.");
+call mtr.add_suppression("mysqld: Got an error reading communication packets");
connection slave;
call mtr.add_suppression("Master server does not support semi-sync");
call mtr.add_suppression("Semi-sync slave .* reply");
@@ -176,7 +177,7 @@ Variable_name Value
Rpl_semi_sync_master_yes_tx 14
show status like 'Rpl_semi_sync_master_clients';
Variable_name Value
-Rpl_semi_sync_master_clients 1
+Rpl_semi_sync_master_clients 0
[ semi-sync replication of these transactions will fail ]
insert into t1 values (500);
[ master status should be OFF ]
@@ -321,7 +322,6 @@ connection slave;
include/stop_slave.inc
reset slave;
connection master;
-kill query _tid;
connection slave;
include/start_slave.inc
connection master;
@@ -353,7 +353,6 @@ include/stop_slave.inc
reset slave;
connection master;
reset master;
-kill query _tid;
set sql_log_bin=0;
grant replication slave on *.* to rpl@127.0.0.1 identified by 'rpl_password';
flush privileges;
@@ -404,7 +403,6 @@ SHOW STATUS LIKE 'Rpl_semi_sync_slave_status';
Variable_name Value
Rpl_semi_sync_slave_status OFF
connection master;
-kill query _tid;
[ Semi-sync status on master should be ON ]
show status like 'Rpl_semi_sync_master_clients';
Variable_name Value
diff --git a/mysql-test/suite/perfschema/r/dml_setup_instruments.result b/mysql-test/suite/perfschema/r/dml_setup_instruments.result
index 940839b105f..3f4d6107717 100644
--- a/mysql-test/suite/perfschema/r/dml_setup_instruments.result
+++ b/mysql-test/suite/perfschema/r/dml_setup_instruments.result
@@ -4,6 +4,7 @@ where name like 'Wait/Synch/Mutex/sql/%'
and name not in ('wait/synch/mutex/sql/DEBUG_SYNC::mutex')
order by name limit 10;
NAME ENABLED TIMED
+wait/synch/mutex/sql/Ack_receiver::m_mutex YES YES
wait/synch/mutex/sql/Cversion_lock YES YES
wait/synch/mutex/sql/Delayed_insert::mutex YES YES
wait/synch/mutex/sql/Event_scheduler::LOCK_scheduler_state YES YES
@@ -13,7 +14,6 @@ wait/synch/mutex/sql/HA_DATA_PARTITION::LOCK_auto_inc YES YES
wait/synch/mutex/sql/LOCK_active_mi YES YES
wait/synch/mutex/sql/LOCK_after_binlog_sync YES YES
wait/synch/mutex/sql/LOCK_audit_mask YES YES
-wait/synch/mutex/sql/LOCK_binlog YES YES
select * from performance_schema.setup_instruments
where name like 'Wait/Synch/Rwlock/sql/%'
and name not in ('wait/synch/rwlock/sql/CRYPTO_dynlock_value::lock')
@@ -36,6 +36,7 @@ where name like 'Wait/Synch/Cond/sql/%'
'wait/synch/cond/sql/DEBUG_SYNC::cond')
order by name limit 10;
NAME ENABLED TIMED
+wait/synch/cond/sql/Ack_receiver::m_cond YES YES
wait/synch/cond/sql/COND_binlog_send YES YES
wait/synch/cond/sql/COND_flush_thread_cache YES YES
wait/synch/cond/sql/COND_group_commit_orderer YES YES
@@ -45,7 +46,6 @@ wait/synch/cond/sql/COND_parallel_entry YES YES
wait/synch/cond/sql/COND_prepare_ordered YES YES
wait/synch/cond/sql/COND_queue_state YES YES
wait/synch/cond/sql/COND_rpl_thread YES YES
-wait/synch/cond/sql/COND_rpl_thread_pool YES YES
select * from performance_schema.setup_instruments
where name='Wait';
select * from performance_schema.setup_instruments
diff --git a/mysql-test/suite/perfschema/r/relaylog.result b/mysql-test/suite/perfschema/r/relaylog.result
index b8655a781eb..3fcf7367b53 100644
--- a/mysql-test/suite/perfschema/r/relaylog.result
+++ b/mysql-test/suite/perfschema/r/relaylog.result
@@ -86,6 +86,7 @@ EVENT_NAME COUNT_STAR SUM_TIMER_WAIT MIN_TIMER_WAIT AVG_TIMER_WAIT MAX_TIMER_WAI
wait/synch/cond/sql/MYSQL_RELAY_LOG::COND_bin_log_updated 0 0 0 0 0
wait/synch/cond/sql/MYSQL_RELAY_LOG::COND_queue_busy 0 0 0 0 0
wait/synch/cond/sql/MYSQL_RELAY_LOG::COND_relay_log_updated 0 0 0 0 0
+wait/synch/mutex/sql/MYSQL_RELAY_LOG::LOCK_binlog_end_pos 0 0 0 0 0
wait/synch/mutex/sql/MYSQL_RELAY_LOG::LOCK_index 0 0 0 0 0
connection slave;
"============ Performance schema on slave ============"
@@ -193,5 +194,6 @@ EVENT_NAME COUNT_STAR
wait/synch/cond/sql/MYSQL_RELAY_LOG::COND_bin_log_updated NONE
wait/synch/cond/sql/MYSQL_RELAY_LOG::COND_queue_busy NONE
wait/synch/cond/sql/MYSQL_RELAY_LOG::COND_relay_log_updated MANY
+wait/synch/mutex/sql/MYSQL_RELAY_LOG::LOCK_binlog_end_pos NONE
wait/synch/mutex/sql/MYSQL_RELAY_LOG::LOCK_index MANY
include/stop_slave.inc
diff --git a/mysql-test/suite/rpl/r/rpl_semi_sync.result b/mysql-test/suite/rpl/r/rpl_semi_sync.result
index 6d574681d73..106efb555d3 100644
--- a/mysql-test/suite/rpl/r/rpl_semi_sync.result
+++ b/mysql-test/suite/rpl/r/rpl_semi_sync.result
@@ -4,6 +4,7 @@ connection master;
call mtr.add_suppression("Timeout waiting for reply of binlog");
call mtr.add_suppression("Read semi-sync reply");
call mtr.add_suppression("Unsafe statement written to the binary log using statement format since BINLOG_FORMAT = STATEMENT.");
+call mtr.add_suppression("mysqld: Got an error reading communication packets");
connection slave;
call mtr.add_suppression("Master server does not support semi-sync");
call mtr.add_suppression("Semi-sync slave .* reply");
@@ -176,7 +177,7 @@ Variable_name Value
Rpl_semi_sync_master_yes_tx 14
show status like 'Rpl_semi_sync_master_clients';
Variable_name Value
-Rpl_semi_sync_master_clients 1
+Rpl_semi_sync_master_clients 0
[ semi-sync replication of these transactions will fail ]
insert into t1 values (500);
[ master status should be OFF ]
@@ -321,7 +322,6 @@ connection slave;
include/stop_slave.inc
reset slave;
connection master;
-kill query _tid;
connection slave;
include/start_slave.inc
connection master;
@@ -353,7 +353,6 @@ include/stop_slave.inc
reset slave;
connection master;
reset master;
-kill query _tid;
set sql_log_bin=0;
grant replication slave on *.* to rpl@127.0.0.1 identified by 'rpl_password';
flush privileges;
@@ -404,7 +403,6 @@ SHOW STATUS LIKE 'Rpl_semi_sync_slave_status';
Variable_name Value
Rpl_semi_sync_slave_status OFF
connection master;
-kill query _tid;
[ Semi-sync status on master should be ON ]
show status like 'Rpl_semi_sync_master_clients';
Variable_name Value
diff --git a/mysql-test/suite/rpl/r/rpl_semi_sync_event.result b/mysql-test/suite/rpl/r/rpl_semi_sync_event.result
index c347ff410ac..917e7c2b02b 100644
--- a/mysql-test/suite/rpl/r/rpl_semi_sync_event.result
+++ b/mysql-test/suite/rpl/r/rpl_semi_sync_event.result
@@ -5,6 +5,7 @@ call mtr.add_suppression("Timeout waiting for reply of binlog");
call mtr.add_suppression("Semi-sync master .* waiting for slave reply");
call mtr.add_suppression("Read semi-sync reply");
call mtr.add_suppression("Unsafe statement written to the binary log using statement format since BINLOG_FORMAT = STATEMENT.");
+call mtr.add_suppression("mysqld: Got an error reading communication packets");
connection slave;
call mtr.add_suppression("Master server does not support semi-sync");
call mtr.add_suppression("Semi-sync slave .* reply");
diff --git a/mysql-test/suite/rpl/r/rpl_semi_sync_event_after_sync.result b/mysql-test/suite/rpl/r/rpl_semi_sync_event_after_sync.result
index c237eb8df47..24daf0d72b5 100644
--- a/mysql-test/suite/rpl/r/rpl_semi_sync_event_after_sync.result
+++ b/mysql-test/suite/rpl/r/rpl_semi_sync_event_after_sync.result
@@ -6,6 +6,7 @@ call mtr.add_suppression("Timeout waiting for reply of binlog");
call mtr.add_suppression("Semi-sync master .* waiting for slave reply");
call mtr.add_suppression("Read semi-sync reply");
call mtr.add_suppression("Unsafe statement written to the binary log using statement format since BINLOG_FORMAT = STATEMENT.");
+call mtr.add_suppression("mysqld: Got an error reading communication packets");
connection slave;
call mtr.add_suppression("Master server does not support semi-sync");
call mtr.add_suppression("Semi-sync slave .* reply");
diff --git a/mysql-test/suite/rpl/t/rpl_semi_sync_event.test b/mysql-test/suite/rpl/t/rpl_semi_sync_event.test
index d7685413a07..4d96fd694ec 100644
--- a/mysql-test/suite/rpl/t/rpl_semi_sync_event.test
+++ b/mysql-test/suite/rpl/t/rpl_semi_sync_event.test
@@ -10,6 +10,8 @@ call mtr.add_suppression("Timeout waiting for reply of binlog");
call mtr.add_suppression("Semi-sync master .* waiting for slave reply");
call mtr.add_suppression("Read semi-sync reply");
call mtr.add_suppression("Unsafe statement written to the binary log using statement format since BINLOG_FORMAT = STATEMENT.");
+call mtr.add_suppression("mysqld: Got an error reading communication packets");
+
connection slave;
call mtr.add_suppression("Master server does not support semi-sync");
call mtr.add_suppression("Semi-sync slave .* reply");
diff --git a/mysql-test/suite/sys_vars/r/rpl_semi_sync_master_enabled_basic.result b/mysql-test/suite/sys_vars/r/rpl_semi_sync_master_enabled_basic.result
index 7454f0b0089..0c90462df59 100644
--- a/mysql-test/suite/sys_vars/r/rpl_semi_sync_master_enabled_basic.result
+++ b/mysql-test/suite/sys_vars/r/rpl_semi_sync_master_enabled_basic.result
@@ -65,6 +65,12 @@ set global rpl_semi_sync_master_enabled=1e1;
ERROR 42000: Incorrect argument type to variable 'rpl_semi_sync_master_enabled'
set global rpl_semi_sync_master_enabled="some text";
ERROR 42000: Variable 'rpl_semi_sync_master_enabled' can't be set to the value of 'some text'
+connect con1,localhost,root,,;
+connect con2,localhost,root,,;
+disconnect con1;
+disconnect con2;
+connection default;
+SET @@global.rpl_semi_sync_master_enabled = 1;
SET @@global.rpl_semi_sync_master_enabled = @start_global_value;
select @@global.rpl_semi_sync_master_enabled;
@@global.rpl_semi_sync_master_enabled
diff --git a/mysql-test/suite/sys_vars/t/rpl_semi_sync_master_enabled_basic.test b/mysql-test/suite/sys_vars/t/rpl_semi_sync_master_enabled_basic.test
index da22d0535f4..68653d3a9a7 100644
--- a/mysql-test/suite/sys_vars/t/rpl_semi_sync_master_enabled_basic.test
+++ b/mysql-test/suite/sys_vars/t/rpl_semi_sync_master_enabled_basic.test
@@ -51,6 +51,35 @@ set global rpl_semi_sync_master_enabled=1e1;
--error ER_WRONG_VALUE_FOR_VAR
set global rpl_semi_sync_master_enabled="some text";
+#
+# Test conflicting concurrent setting
+#
+--let $val_saved= `SELECT @@global.rpl_semi_sync_master_enabled`
+connect (con1,localhost,root,,);
+connect (con2,localhost,root,,);
+--let $iter=100
+--disable_query_log
+while ($iter)
+{
+ --connection con1
+ --send_eval SET @@global.rpl_semi_sync_master_enabled = $iter % 2
+
+ --connection con2
+ --send_eval SET @@global.rpl_semi_sync_master_enabled = ($iter + 1) % 2
+
+ --connection con1
+ reap;
+ --connection con2
+ reap;
+
+ --dec $iter
+}
+--enable_query_log
+disconnect con1;
+disconnect con2;
+
+--connection default
+--eval SET @@global.rpl_semi_sync_master_enabled = $val_saved
#
# Cleanup
diff --git a/sql/CMakeLists.txt b/sql/CMakeLists.txt
index 6c63f3feca3..fadee80491b 100644
--- a/sql/CMakeLists.txt
+++ b/sql/CMakeLists.txt
@@ -139,6 +139,7 @@ SET (SQL_SOURCE
my_json_writer.cc
rpl_gtid.cc rpl_parallel.cc
semisync.cc semisync_master.cc semisync_slave.cc
+ semisync_master_ack_receiver.cc
sql_type.cc
item_windowfunc.cc sql_window.cc
sql_cte.cc
diff --git a/sql/handler.cc b/sql/handler.cc
index 47eb58e17f3..7ea02278abc 100644
--- a/sql/handler.cc
+++ b/sql/handler.cc
@@ -50,6 +50,7 @@
#ifdef WITH_ARIA_STORAGE_ENGINE
#include "../storage/maria/ha_maria.h"
#endif
+#include "semisync_master.h"
#include "wsrep_mysqld.h"
#include "wsrep.h"
@@ -1485,6 +1486,10 @@ done:
mysql_mutex_assert_not_owner(&LOCK_after_binlog_sync);
mysql_mutex_assert_not_owner(&LOCK_commit_ordered);
(void) RUN_HOOK(transaction, after_commit, (thd, FALSE));
+#ifdef REPLICATION
+ repl_semisync_master.waitAfterCommit(thd, all);
+ DEBUG_SYNC(thd, "after_group_after_commit");
+#endif
goto end;
/* Come here if error and we need to rollback. */
@@ -1730,6 +1735,9 @@ int ha_rollback_trans(THD *thd, bool all)
ER_WARNING_NOT_COMPLETE_ROLLBACK,
ER_THD(thd, ER_WARNING_NOT_COMPLETE_ROLLBACK));
(void) RUN_HOOK(transaction, after_rollback, (thd, FALSE));
+#ifdef REPLICATION
+ repl_semisync_master.waitAfterRollback(thd, all);
+#endif
DBUG_RETURN(error);
}
diff --git a/sql/log.cc b/sql/log.cc
index 9d4a622d400..a866d72d785 100644
--- a/sql/log.cc
+++ b/sql/log.cc
@@ -53,6 +53,7 @@
#include "debug_sync.h"
#include "sql_show.h"
#include "my_pthread.h"
+#include "semisync_master.h"
#include "wsrep_mysqld.h"
#include "sp_rcontext.h"
#include "sp_head.h"
@@ -3329,7 +3330,7 @@ void MYSQL_BIN_LOG::init_pthread_objects()
mysql_cond_init(key_BINLOG_COND_binlog_background_thread_end,
&COND_binlog_background_thread_end, 0);
- mysql_mutex_init(key_LOCK_binlog_end_pos, &LOCK_binlog_end_pos,
+ mysql_mutex_init(m_key_LOCK_binlog_end_pos, &LOCK_binlog_end_pos,
MY_MUTEX_INIT_SLOW);
}
@@ -5250,6 +5251,7 @@ end:
close(LOG_CLOSE_INDEX);
sql_print_error(fatal_log_error, new_name_ptr, errno);
}
+
mysql_mutex_unlock(&LOCK_index);
if (need_lock)
mysql_mutex_unlock(&LOCK_log);
@@ -6376,7 +6378,12 @@ err:
mysql_mutex_assert_not_owner(&LOCK_commit_ordered);
if ((error= RUN_HOOK(binlog_storage, after_flush,
(thd, log_file_name, file->pos_in_file,
- synced, true, true))))
+ synced, true, true)))
+#ifdef REPLICATION
+ || repl_semisync_master.reportBinlogUpdate(thd, log_file_name,
+ file->pos_in_file)
+#endif
+ )
{
sql_print_error("Failed to run 'after_flush' hooks");
error= 1;
@@ -6408,7 +6415,12 @@ err:
mysql_mutex_assert_not_owner(&LOCK_commit_ordered);
if (RUN_HOOK(binlog_storage, after_sync,
(thd, log_file_name, file->pos_in_file,
- true, true)))
+ true, true))
+#ifdef REPLICATION
+ || repl_semisync_master.waitAfterSync(log_file_name,
+ file->pos_in_file)
+#endif
+ )
{
error=1;
/* error is already printed inside hook */
@@ -7589,7 +7601,11 @@ MYSQL_BIN_LOG::write_transaction_to_binlog_events(group_commit_entry *entry)
else if (is_leader)
trx_group_commit_leader(entry);
else if (!entry->queued_by_other)
+ {
+ DEBUG_SYNC(entry->thd, "after_semisync_queue");
+
entry->thd->wait_for_wakeup_ready();
+ }
else
{
/*
@@ -7840,11 +7856,20 @@ MYSQL_BIN_LOG::trx_group_commit_leader(group_commit_entry *leader)
{
last= current->next == NULL;
if (!current->error &&
- RUN_HOOK(binlog_storage, after_flush,
- (current->thd,
- current->cache_mngr->last_commit_pos_file,
- current->cache_mngr->last_commit_pos_offset, synced,
- first, last)))
+ (RUN_HOOK(binlog_storage, after_flush,
+ (current->thd,
+ current->cache_mngr->last_commit_pos_file,
+ current->cache_mngr->last_commit_pos_offset, synced,
+ first, last))
+#ifdef REPLICATION
+ || (DBUG_EVALUATE_IF("failed_report_binlog_update", 1, 0) ||
+ repl_semisync_master.
+ reportBinlogUpdate(current->thd,
+ current->cache_mngr->last_commit_pos_file,
+ current->cache_mngr->
+ last_commit_pos_offset))
+#endif
+ ))
{
current->error= ER_ERROR_ON_WRITE;
current->commit_errno= -1;
@@ -7927,12 +7952,22 @@ MYSQL_BIN_LOG::trx_group_commit_leader(group_commit_entry *leader)
{
last= current->next == NULL;
if (!current->error &&
- RUN_HOOK(binlog_storage, after_sync,
+ (RUN_HOOK(binlog_storage, after_sync,
(current->thd, current->cache_mngr->last_commit_pos_file,
current->cache_mngr->last_commit_pos_offset,
- first, last)))
+ first, last))
+#ifdef REPLICATION
+ || (DBUG_EVALUATE_IF("simulate_after_sync_hook_error", 1, 0) ||
+ repl_semisync_master.waitAfterSync(current->cache_mngr->
+ last_commit_pos_file,
+ current->cache_mngr->
+ last_commit_pos_offset))
+#endif
+ ))
{
- /* error is already printed inside hook */
+ const char *hook_name= rpl_semi_sync_master_enabled ?
+ "'waitAfterSync'" : "binlog_storage 'after_sync'";
+ sql_print_error("Failed to call '%s'", hook_name);
}
first= false;
}
@@ -8268,24 +8303,6 @@ void MYSQL_BIN_LOG::wait_for_update_relay_log(THD* thd)
LOCK_log is released by the caller.
*/
-int MYSQL_BIN_LOG::wait_for_update_bin_log(THD* thd,
- const struct timespec *timeout)
-{
- int ret= 0;
- DBUG_ENTER("wait_for_update_bin_log");
-
- thd_wait_begin(thd, THD_WAIT_BINLOG);
- mysql_mutex_assert_owner(&LOCK_binlog_end_pos);
- if (!timeout)
- mysql_cond_wait(&COND_bin_log_updated, &LOCK_binlog_end_pos);
- else
- ret= mysql_cond_timedwait(&COND_bin_log_updated, &LOCK_binlog_end_pos,
- const_cast<struct timespec *>(timeout));
- thd_wait_end(thd);
- DBUG_RETURN(ret);
-}
-
-
int MYSQL_BIN_LOG::wait_for_update_binlog_end_pos(THD* thd,
struct timespec *timeout)
{
@@ -10427,7 +10444,7 @@ get_gtid_list_event(IO_CACHE *cache, Gtid_list_log_event **out_gtid_list)
*out_gtid_list= NULL;
- if (!(ev= Log_event::read_log_event(cache, 0, &init_fdle,
+ if (!(ev= Log_event::read_log_event(cache, &init_fdle,
opt_master_verify_checksum)) ||
ev->get_type_code() != FORMAT_DESCRIPTION_EVENT)
{
@@ -10443,7 +10460,7 @@ get_gtid_list_event(IO_CACHE *cache, Gtid_list_log_event **out_gtid_list)
{
Log_event_type typ;
- ev= Log_event::read_log_event(cache, 0, fdle, opt_master_verify_checksum);
+ ev= Log_event::read_log_event(cache, fdle, opt_master_verify_checksum);
if (!ev)
{
errormsg= "Could not read GTID list event while looking for GTID "
@@ -10473,6 +10490,7 @@ get_gtid_list_event(IO_CACHE *cache, Gtid_list_log_event **out_gtid_list)
return errormsg;
}
+
struct st_mysql_storage_engine binlog_storage_engine=
{ MYSQL_HANDLERTON_INTERFACE_VERSION };
diff --git a/sql/log.h b/sql/log.h
index 0a82f8813f0..02ace7c7921 100644
--- a/sql/log.h
+++ b/sql/log.h
@@ -349,6 +349,11 @@ public:
/* for documentation of mutexes held in various places in code */
};
+/* Tell the io thread if we can delay the master info sync. */
+#define SEMI_SYNC_SLAVE_DELAY_SYNC 1
+/* Tell the io thread if the current event needs a ack. */
+#define SEMI_SYNC_NEED_ACK 2
+
class MYSQL_QUERY_LOG: public MYSQL_LOG
{
public:
@@ -435,6 +440,8 @@ class MYSQL_BIN_LOG: public TC_LOG, private MYSQL_LOG
PSI_file_key m_key_file_log_index;
PSI_file_key m_key_COND_queue_busy;
+ /** The instrumentation key to use for LOCK_binlog_end_pos. */
+ PSI_mutex_key m_key_LOCK_binlog_end_pos;
#endif
struct group_commit_entry
@@ -667,7 +674,8 @@ public:
PSI_cond_key key_bin_log_update,
PSI_file_key key_file_log,
PSI_file_key key_file_log_index,
- PSI_file_key key_COND_queue_busy)
+ PSI_file_key key_COND_queue_busy,
+ PSI_mutex_key key_LOCK_binlog_end_pos)
{
m_key_LOCK_index= key_LOCK_index;
m_key_relay_log_update= key_relay_log_update;
@@ -675,6 +683,7 @@ public:
m_key_file_log= key_file_log;
m_key_file_log_index= key_file_log_index;
m_key_COND_queue_busy= key_COND_queue_busy;
+ m_key_LOCK_binlog_end_pos= key_LOCK_binlog_end_pos;
}
#endif
diff --git a/sql/mysqld.cc b/sql/mysqld.cc
index 71e0aeee473..86759b8ed8b 100644
--- a/sql/mysqld.cc
+++ b/sql/mysqld.cc
@@ -933,6 +933,7 @@ PSI_mutex_key key_BINLOG_LOCK_index, key_BINLOG_LOCK_xid_list,
key_LOCK_thread_count, key_LOCK_thread_cache,
key_PARTITION_LOCK_auto_inc;
PSI_mutex_key key_RELAYLOG_LOCK_index;
+PSI_mutex_key key_LOCK_relaylog_end_pos;
PSI_mutex_key key_LOCK_slave_state, key_LOCK_binlog_state,
key_LOCK_rpl_thread, key_LOCK_rpl_thread_pool, key_LOCK_parallel_entry;
PSI_mutex_key key_LOCK_binlog;
@@ -947,6 +948,8 @@ PSI_mutex_key key_LOCK_after_binlog_sync;
PSI_mutex_key key_LOCK_prepare_ordered, key_LOCK_commit_ordered,
key_LOCK_slave_background;
PSI_mutex_key key_TABLE_SHARE_LOCK_share;
+PSI_mutex_key key_ss_mutex_LOCK_binlog_;
+PSI_mutex_key key_ss_mutex_Ack_receiver_mutex;
static PSI_mutex_info all_server_mutexes[]=
{
@@ -967,6 +970,7 @@ static PSI_mutex_info all_server_mutexes[]=
{ &key_BINLOG_LOCK_binlog_background_thread, "MYSQL_BIN_LOG::LOCK_binlog_background_thread", 0},
{ &key_LOCK_binlog_end_pos, "MYSQL_BIN_LOG::LOCK_binlog_end_pos", 0 },
{ &key_RELAYLOG_LOCK_index, "MYSQL_RELAY_LOG::LOCK_index", 0},
+ { &key_LOCK_relaylog_end_pos, "MYSQL_RELAY_LOG::LOCK_binlog_end_pos", 0},
{ &key_delayed_insert_mutex, "Delayed_insert::mutex", 0},
{ &key_hash_filo_lock, "hash_filo::lock", 0},
{ &key_LOCK_active_mi, "LOCK_active_mi", PSI_FLAG_GLOBAL},
@@ -1024,6 +1028,7 @@ static PSI_mutex_info all_server_mutexes[]=
{ &key_LOCK_rpl_thread, "LOCK_rpl_thread", 0},
{ &key_LOCK_rpl_thread_pool, "LOCK_rpl_thread_pool", 0},
{ &key_LOCK_parallel_entry, "LOCK_parallel_entry", 0},
+ { &key_ss_mutex_Ack_receiver_mutex, "Ack_receiver::m_mutex", 0},
{ &key_LOCK_binlog, "LOCK_binlog", 0}
};
@@ -1078,6 +1083,7 @@ PSI_cond_key key_COND_rpl_thread_queue, key_COND_rpl_thread,
key_COND_parallel_entry, key_COND_group_commit_orderer,
key_COND_prepare_ordered, key_COND_slave_background;
PSI_cond_key key_COND_wait_gtid, key_COND_gtid_ignore_duplicates;
+PSI_cond_key key_ss_cond_Ack_receiver_cond;
static PSI_cond_info all_server_conds[]=
{
@@ -1131,6 +1137,7 @@ static PSI_cond_info all_server_conds[]=
{ &key_COND_start_thread, "COND_start_thread", PSI_FLAG_GLOBAL},
{ &key_COND_wait_gtid, "COND_wait_gtid", 0},
{ &key_COND_gtid_ignore_duplicates, "COND_gtid_ignore_duplicates", 0},
+ { &key_ss_cond_Ack_receiver_cond, "Ack_receiver::m_cond", 0},
{ &key_COND_binlog_send, "COND_binlog_send", 0}
};
@@ -1138,6 +1145,7 @@ PSI_thread_key key_thread_bootstrap, key_thread_delayed_insert,
key_thread_handle_manager, key_thread_main,
key_thread_one_connection, key_thread_signal_hand,
key_thread_slave_background, key_rpl_parallel_thread;
+PSI_thread_key key_ss_thread_Ack_receiver_thread;
static PSI_thread_info all_server_threads[]=
{
@@ -1164,6 +1172,7 @@ static PSI_thread_info all_server_threads[]=
{ &key_thread_one_connection, "one_connection", 0},
{ &key_thread_signal_hand, "signal_handler", PSI_FLAG_GLOBAL},
{ &key_thread_slave_background, "slave_background", PSI_FLAG_GLOBAL},
+ { &key_ss_thread_Ack_receiver_thread, "Ack_receiver", PSI_FLAG_GLOBAL},
{ &key_rpl_parallel_thread, "rpl_parallel_thread", 0}
};
@@ -1743,6 +1752,7 @@ static void close_connections(void)
Events::deinit();
slave_prepare_for_shutdown();
mysql_bin_log.stop_background_thread();
+ ack_receiver.stop();
/*
Give threads time to die.
@@ -2226,7 +2236,6 @@ void clean_up(bool print_message)
tc_log->close();
#ifdef HAVE_REPLICATION
semi_sync_master_deinit();
- semi_sync_slave_deinit();
#endif
delegates_destroy();
xid_cache_free();
@@ -4251,7 +4260,8 @@ static int init_common_variables()
key_BINLOG_COND_bin_log_updated,
key_file_binlog,
key_file_binlog_index,
- key_BINLOG_COND_queue_busy);
+ key_BINLOG_COND_queue_busy,
+ key_LOCK_binlog_end_pos);
#endif
/*
@@ -5183,8 +5193,12 @@ static int init_server_components()
"--log-bin option is not defined.");
}
- semi_sync_master_init();
- semi_sync_slave_init();
+ if (repl_semisync_master.initObject() ||
+ repl_semisync_slave.initObject())
+ {
+ sql_print_error("Could not initialize semisync.");
+ unireg_abort(1);
+ }
#endif
if (opt_bin_log)
@@ -8599,14 +8613,10 @@ SHOW_VAR status_vars[]= {
{"Rpl_semi_sync_master_net_wait_time", (char*) &SHOW_FNAME(net_wait_time), SHOW_FUNC},
{"Rpl_semi_sync_master_net_waits", (char*) &SHOW_FNAME(net_wait_num), SHOW_FUNC},
{"Rpl_semi_sync_master_net_avg_wait_time", (char*) &SHOW_FNAME(avg_net_wait_time), SHOW_FUNC},
-#ifdef HAVE_ACC_RECEIVER
{"Rpl_semi_sync_master_request_ack", (char*) &rpl_semi_sync_master_request_ack, SHOW_LONGLONG},
{"Rpl_semi_sync_master_get_ack", (char*)&rpl_semi_sync_master_get_ack, SHOW_LONGLONG},
-#endif
{"Rpl_semi_sync_slave_status", (char*) &rpl_semi_sync_slave_status, SHOW_BOOL},
-#ifdef HAVE_ACC_RECEIVER
{"Rpl_semi_sync_slave_send_ack", (char*) &rpl_semi_sync_slave_send_ack, SHOW_LONGLONG},
-#endif
#endif /* HAVE_REPLICATION */
#ifdef HAVE_QUERY_CACHE
{"Qcache_free_blocks", (char*) &query_cache.free_memory_blocks, SHOW_LONG_NOFLUSH},
@@ -8950,7 +8960,7 @@ static int mysql_init_variables(void)
transactions_multi_engine= 0;
rpl_transactions_multi_engine= 0;
transactions_gtid_foreign_engine= 0;
- run_hooks_enabled= 0;
+ run_hooks_enabled= 0; // don't run hooks, semisync does not need 'em
log_bin_basename= NULL;
log_bin_index= NULL;
@@ -10348,6 +10358,8 @@ PSI_stage_info stage_waiting_for_master_update= { 0, "Waiting for master update"
PSI_stage_info stage_waiting_for_relay_log_space= { 0, "Waiting for the slave SQL thread to free enough relay log space", 0};
PSI_stage_info stage_waiting_for_semi_sync_ack_from_slave=
{ 0, "Waiting for semi-sync ACK from slave", 0};
+PSI_stage_info stage_waiting_for_semi_sync_slave={ 0, "Waiting for semi-sync slave connection", 0};
+PSI_stage_info stage_reading_semi_sync_ack={ 0, "Reading semi-sync ACK from slave", 0};
PSI_stage_info stage_waiting_for_slave_mutex_on_exit= { 0, "Waiting for slave mutex on exit", 0};
PSI_stage_info stage_waiting_for_slave_thread_to_start= { 0, "Waiting for slave thread to start", 0};
PSI_stage_info stage_waiting_for_table_flush= { 0, "Waiting for table flush", 0};
@@ -10508,6 +10520,9 @@ PSI_stage_info *all_server_stages[]=
& stage_gtid_wait_other_connection,
& stage_slave_background_process_request,
& stage_slave_background_wait_request,
+ & stage_waiting_for_semi_sync_ack_from_slave,
+ & stage_waiting_for_semi_sync_slave,
+ & stage_reading_semi_sync_ack,
& stage_waiting_for_deadlock_kill
};
diff --git a/sql/mysqld.h b/sql/mysqld.h
index 1da95fd13f5..5399ec91b19 100644
--- a/sql/mysqld.h
+++ b/sql/mysqld.h
@@ -310,6 +310,7 @@ extern PSI_mutex_key key_BINLOG_LOCK_index, key_BINLOG_LOCK_xid_list,
key_LOCK_start_thread,
key_LOCK_error_messages, key_LOCK_thread_count, key_PARTITION_LOCK_auto_inc;
extern PSI_mutex_key key_RELAYLOG_LOCK_index;
+extern PSI_mutex_key key_LOCK_relaylog_end_pos;
extern PSI_mutex_key key_LOCK_slave_state, key_LOCK_binlog_state,
key_LOCK_rpl_thread, key_LOCK_rpl_thread_pool, key_LOCK_parallel_entry;
diff --git a/sql/rpl_handler.h b/sql/rpl_handler.h
index 62bd7d2606c..f8b11adbb69 100644
--- a/sql/rpl_handler.h
+++ b/sql/rpl_handler.h
@@ -209,6 +209,10 @@ extern Binlog_relay_IO_delegate *binlog_relay_io_delegate;
if semisync replication is not enabled, we can return immediately.
*/
#ifdef HAVE_REPLICATION
+/*
+ As semisync is unpluggined and its hooks are turned into static
+ invocations all other hooks are not run for optimization sake.
+*/
#define RUN_HOOK(group, hook, args) \
(unlikely(run_hooks_enabled) ? group ##_delegate->hook args : 0)
#else
diff --git a/sql/rpl_mi.h b/sql/rpl_mi.h
index 610bc77b683..14d74dc4bb7 100644
--- a/sql/rpl_mi.h
+++ b/sql/rpl_mi.h
@@ -311,6 +311,11 @@ class Master_info : public Slave_reporting_capability
/* The parallel replication mode. */
enum_slave_parallel_mode parallel_mode;
+ /*
+ semi_ack is used to identify if the current binlog event needs an
+ ACK from slave, or if delay_master is enabled.
+ */
+ int semi_ack;
};
int init_master_info(Master_info* mi, const char* master_info_fname,
diff --git a/sql/rpl_rli.cc b/sql/rpl_rli.cc
index 6e8b6edb44c..321eef97700 100644
--- a/sql/rpl_rli.cc
+++ b/sql/rpl_rli.cc
@@ -74,7 +74,8 @@ Relay_log_info::Relay_log_info(bool is_slave_recovery)
key_RELAYLOG_COND_bin_log_updated,
key_file_relaylog,
key_file_relaylog_index,
- key_RELAYLOG_COND_queue_busy);
+ key_RELAYLOG_COND_queue_busy,
+ key_LOCK_relaylog_end_pos);
#endif
group_relay_log_name[0]= event_relay_log_name[0]=
diff --git a/sql/semisync.h b/sql/semisync.h
index 3142f920f1e..cf791b36c1b 100644
--- a/sql/semisync.h
+++ b/sql/semisync.h
@@ -47,6 +47,20 @@ public:
return exit_code;
}
+ inline bool function_exit(const char *func_name, bool exit_code)
+ {
+ if (trace_level_ & kTraceFunction)
+ sql_print_information("<--- %s exit (%s)", func_name,
+ exit_code ? "True" : "False");
+ return exit_code;
+ }
+
+ inline void function_exit(const char *func_name)
+ {
+ if (trace_level_ & kTraceFunction)
+ sql_print_information("<--- %s exit", func_name);
+ }
+
Trace()
:trace_level_(0L)
{}
@@ -79,5 +93,7 @@ public:
#define REPLY_MAGIC_NUM_OFFSET 0
#define REPLY_BINLOG_POS_OFFSET (REPLY_MAGIC_NUM_OFFSET + REPLY_MAGIC_NUM_LEN)
#define REPLY_BINLOG_NAME_OFFSET (REPLY_BINLOG_POS_OFFSET + REPLY_BINLOG_POS_LEN)
+#define REPLY_MESSAGE_MAX_LENGTH \
+ (REPLY_MAGIC_NUM_LEN + REPLY_BINLOG_POS_LEN + REPLY_BINLOG_NAME_LEN)
#endif /* SEMISYNC_H */
diff --git a/sql/semisync_master.cc b/sql/semisync_master.cc
index 21c52addce9..de91e30beec 100644
--- a/sql/semisync_master.cc
+++ b/sql/semisync_master.cc
@@ -24,7 +24,9 @@
#define TIME_BILLION 1000000000
/* This indicates whether semi-synchronous replication is enabled. */
-my_bool rpl_semi_sync_master_enabled;
+my_bool rpl_semi_sync_master_enabled= 0;
+unsigned long long rpl_semi_sync_master_request_ack = 0;
+unsigned long long rpl_semi_sync_master_get_ack = 0;
my_bool rpl_semi_sync_master_wait_no_slave = 1;
my_bool rpl_semi_sync_master_status = 0;
ulong rpl_semi_sync_master_wait_point =
@@ -47,6 +49,15 @@ ulonglong rpl_semi_sync_master_net_wait_time = 0;
ulonglong rpl_semi_sync_master_trx_wait_time = 0;
ReplSemiSyncMaster repl_semisync_master;
+Ack_receiver ack_receiver;
+
+/*
+ structure to save transaction log filename and position
+*/
+typedef struct Trans_binlog_info {
+ my_off_t log_pos;
+ char log_file[FN_REFLEN];
+} Trans_binlog_info;
static int getWaitTime(const struct timespec& start_ts);
@@ -336,7 +347,8 @@ ReplSemiSyncMaster::ReplSemiSyncMaster()
wait_file_pos_(0),
master_enabled_(false),
wait_timeout_(0L),
- state_(0)
+ state_(0),
+ wait_point_(0)
{
strcpy(reply_file_name_, "");
strcpy(wait_file_name_, "");
@@ -345,18 +357,13 @@ ReplSemiSyncMaster::ReplSemiSyncMaster()
int ReplSemiSyncMaster::initObject()
{
int result;
- const char *kWho = "ReplSemiSyncMaster::initObject";
- if (init_done_)
- {
- fprintf(stderr, "%s called twice\n", kWho);
- return 1;
- }
init_done_ = true;
/* References to the parameter works after set_options(). */
setWaitTimeout(rpl_semi_sync_master_timeout);
setTraceLevel(rpl_semi_sync_master_trace_level);
+ setWaitPoint(rpl_semi_sync_master_wait_point);
/* Mutex initialization can only be done after MY_INIT(). */
mysql_mutex_init(key_LOCK_binlog,
@@ -365,9 +372,22 @@ int ReplSemiSyncMaster::initObject()
&COND_binlog_send, NULL);
if (rpl_semi_sync_master_enabled)
+ {
result = enableMaster();
+ if (!result)
+ result= ack_receiver.start(); /* Start the ACK thread. */
+ }
else
+ {
result = disableMaster();
+ }
+
+ /*
+ If rpl_semi_sync_master_wait_no_slave is disabled, let's temporarily
+ switch off semisync to avoid hang if there's none active slave.
+ */
+ if (!rpl_semi_sync_master_wait_no_slave)
+ switch_off();
return result;
}
@@ -390,7 +410,6 @@ int ReplSemiSyncMaster::enableMaster()
set_master_enabled(true);
state_ = true;
- run_hooks_enabled= 1;
sql_print_information("Semi-sync replication enabled on the master.");
}
else
@@ -498,12 +517,50 @@ void ReplSemiSyncMaster::remove_slave()
unlock();
}
-bool ReplSemiSyncMaster::is_semi_sync_slave()
+int ReplSemiSyncMaster::reportReplyPacket(uint32 server_id, const uchar *packet,
+ ulong packet_len)
{
- int null_value;
- long long val= 0;
- get_user_var_int("rpl_semi_sync_slave", &val, &null_value);
- return val;
+ const char *kWho = "ReplSemiSyncMaster::reportReplyPacket";
+ int result= -1;
+ char log_file_name[FN_REFLEN+1];
+ my_off_t log_file_pos;
+ ulong log_file_len = 0;
+
+ function_enter(kWho);
+
+ if (unlikely(packet[REPLY_MAGIC_NUM_OFFSET] != ReplSemiSyncMaster::kPacketMagicNum))
+ {
+ sql_print_error("Read semi-sync reply magic number error");
+ goto l_end;
+ }
+
+ if (unlikely(packet_len < REPLY_BINLOG_NAME_OFFSET))
+ {
+ sql_print_error("Read semi-sync reply length error: packet is too small");
+ goto l_end;
+ }
+
+ log_file_pos = uint8korr(packet + REPLY_BINLOG_POS_OFFSET);
+ log_file_len = packet_len - REPLY_BINLOG_NAME_OFFSET;
+ if (unlikely(log_file_len >= FN_REFLEN))
+ {
+ sql_print_error("Read semi-sync reply binlog file length too large");
+ goto l_end;
+ }
+ strncpy(log_file_name, (const char*)packet + REPLY_BINLOG_NAME_OFFSET, log_file_len);
+ log_file_name[log_file_len] = 0;
+
+ DBUG_ASSERT(dirname_length(log_file_name) == 0);
+
+ if (trace_level_ & kTraceDetail)
+ sql_print_information("%s: Got reply(%s, %lu) from server %u",
+ kWho, log_file_name, (ulong)log_file_pos, server_id);
+
+ rpl_semi_sync_master_get_ack++;
+ reportReplyBinlog(server_id, log_file_name, log_file_pos);
+
+l_end:
+ return function_exit(kWho, result);
}
int ReplSemiSyncMaster::reportReplyBinlog(uint32 server_id,
@@ -602,6 +659,121 @@ int ReplSemiSyncMaster::reportReplyBinlog(uint32 server_id,
return function_exit(kWho, 0);
}
+int ReplSemiSyncMaster::waitAfterSync(const char *log_file, my_off_t log_pos)
+{
+ if (!getMasterEnabled())
+ return 0;
+
+ int ret= 0;
+ if(log_pos &&
+ waitPoint() == SEMI_SYNC_MASTER_WAIT_POINT_AFTER_BINLOG_SYNC)
+ ret= commitTrx(log_file + dirname_length(log_file), log_pos);
+
+ return ret;
+}
+
+int ReplSemiSyncMaster::waitAfterCommit(THD* thd, bool all)
+{
+ if (!getMasterEnabled())
+ return 0;
+
+ int ret= 0;
+ const char *log_file;
+ my_off_t log_pos;
+
+ bool is_real_trans=
+ (all || thd->transaction.all.ha_list == 0);
+ /*
+ The coordinates are propagated to this point having been computed
+ in reportBinlogUpdate
+ */
+ Trans_binlog_info *log_info= thd->semisync_info;
+ log_file= log_info && log_info->log_file[0] ? log_info->log_file : 0;
+ log_pos= log_info ? log_info->log_pos : 0;
+
+ DBUG_ASSERT(!log_file || dirname_length(log_file) == 0);
+
+ if (is_real_trans &&
+ log_pos &&
+ waitPoint() == SEMI_SYNC_MASTER_WAIT_POINT_AFTER_STORAGE_COMMIT)
+ ret= commitTrx(log_file, log_pos);
+
+ if (is_real_trans && log_info)
+ {
+ log_info->log_file[0]= 0;
+ log_info->log_pos= 0;
+ }
+
+ return ret;
+}
+
+int ReplSemiSyncMaster::waitAfterRollback(THD *thd, bool all)
+{
+ return waitAfterCommit(thd, all);
+}
+
+/**
+ The method runs after flush to binary log is done.
+*/
+int ReplSemiSyncMaster::reportBinlogUpdate(THD* thd, const char *log_file,
+ my_off_t log_pos)
+{
+ if (getMasterEnabled())
+ {
+ Trans_binlog_info *log_info;
+
+ if (!(log_info= thd->semisync_info))
+ {
+ if(!(log_info=
+ (Trans_binlog_info*) my_malloc(sizeof(Trans_binlog_info), MYF(0))))
+ return 1;
+ thd->semisync_info= log_info;
+ }
+ strcpy(log_info->log_file, log_file + dirname_length(log_file));
+ log_info->log_pos = log_pos;
+
+ return writeTranxInBinlog(log_info->log_file, log_pos);
+ }
+
+ return 0;
+}
+
+void ReplSemiSyncMaster::dump_start(THD* thd,
+ const char *log_file,
+ my_off_t log_pos)
+{
+ if (!thd->semi_sync_slave)
+ return;
+
+ if (ack_receiver.add_slave(thd))
+ {
+ sql_print_error("Failed to register slave to semi-sync ACK receiver "
+ "thread. Turning off semisync");
+ thd->semi_sync_slave= 0;
+ return;
+ }
+
+ add_slave();
+ reportReplyBinlog(thd->variables.server_id, log_file + dirname_length(log_file), log_pos);
+ sql_print_information("Start semi-sync binlog_dump to slave (server_id: %d), pos(%s, %lu",
+ thd->variables.server_id, log_file, (unsigned long)log_pos);
+
+ return;
+}
+
+void ReplSemiSyncMaster::dump_end(THD* thd)
+{
+ if (!thd->semi_sync_slave)
+ return;
+
+ sql_print_information("Stop semi-sync binlog_dump to slave (server_id: %d)", thd->variables.server_id);
+
+ remove_slave();
+ ack_receiver.remove_slave(thd);
+
+ return;
+}
+
int ReplSemiSyncMaster::commitTrx(const char* trx_wait_binlog_name,
my_off_t trx_wait_binlog_pos)
{
@@ -850,42 +1022,23 @@ int ReplSemiSyncMaster::try_switch_on(int server_id,
return function_exit(kWho, 0);
}
-int ReplSemiSyncMaster::reserveSyncHeader(unsigned char *header,
- ulong size)
+int ReplSemiSyncMaster::reserveSyncHeader(String* packet)
{
const char *kWho = "ReplSemiSyncMaster::reserveSyncHeader";
function_enter(kWho);
- int hlen=0;
- if (!is_semi_sync_slave())
- {
- hlen= 0;
- }
- else
- {
- /* No enough space for the extra header, disable semi-sync master */
- if (sizeof(kSyncHeader) > size)
- {
- sql_print_warning("No enough space in the packet "
- "for semi-sync extra header, "
- "semi-sync replication disabled");
- disableMaster();
- return 0;
- }
-
- /* Set the magic number and the sync status. By default, no sync
- * is required.
- */
- memcpy(header, kSyncHeader, sizeof(kSyncHeader));
- hlen= sizeof(kSyncHeader);
- }
- return function_exit(kWho, hlen);
+ /* Set the magic number and the sync status. By default, no sync
+ * is required.
+ */
+ packet->append(reinterpret_cast<const char*>(kSyncHeader),
+ sizeof(kSyncHeader));
+ return function_exit(kWho, 0);
}
-int ReplSemiSyncMaster::updateSyncHeader(unsigned char *packet,
+int ReplSemiSyncMaster::updateSyncHeader(THD* thd, unsigned char *packet,
const char *log_file_name,
my_off_t log_file_pos,
- uint32 server_id)
+ bool* need_sync)
{
const char *kWho = "ReplSemiSyncMaster::updateSyncHeader";
int cmp = 0;
@@ -894,8 +1047,11 @@ int ReplSemiSyncMaster::updateSyncHeader(unsigned char *packet,
/* If the semi-sync master is not enabled, or the slave is not a semi-sync
* target, do not request replies from the slave.
*/
- if (!getMasterEnabled() || !is_semi_sync_slave())
+ if (!getMasterEnabled() || !thd->semi_sync_slave)
+ {
+ *need_sync = false;
return 0;
+ }
function_enter(kWho);
@@ -903,12 +1059,15 @@ int ReplSemiSyncMaster::updateSyncHeader(unsigned char *packet,
/* This is the real check inside the mutex. */
if (!getMasterEnabled())
- goto l_end; // sync= false at this point in time
+ {
+ assert(sync == false);
+ goto l_end;
+ }
if (is_on())
{
/* semi-sync is ON */
- /* sync= false; No sync unless a transaction is involved. */
+ sync = false; /* No sync unless a transaction is involved. */
if (reply_file_name_inited_)
{
@@ -962,8 +1121,9 @@ int ReplSemiSyncMaster::updateSyncHeader(unsigned char *packet,
if (trace_level_ & kTraceDetail)
sql_print_information("%s: server(%d), (%s, %lu) sync(%d), repl(%d)",
- kWho, server_id, log_file_name,
+ kWho, thd->variables.server_id, log_file_name,
(ulong)log_file_pos, sync, (int)is_on());
+ *need_sync= sync;
l_end:
unlock();
@@ -1033,6 +1193,10 @@ int ReplSemiSyncMaster::writeTranxInBinlog(const char* log_file_name,
log_file_name, (ulong)log_file_pos);
switch_off();
}
+ else
+ {
+ rpl_semi_sync_master_request_ack++;
+ }
}
l_end:
@@ -1041,19 +1205,12 @@ int ReplSemiSyncMaster::writeTranxInBinlog(const char* log_file_name,
return function_exit(kWho, result);
}
-int ReplSemiSyncMaster::readSlaveReply(NET *net, uint32 server_id,
- const char *event_buf)
+int ReplSemiSyncMaster::flushNet(THD *thd,
+ const char *event_buf)
{
- const char *kWho = "ReplSemiSyncMaster::readSlaveReply";
- const unsigned char *packet;
- char log_file_name[FN_REFLEN];
- my_off_t log_file_pos;
- ulong log_file_len = 0;
- ulong packet_len;
+ const char *kWho = "ReplSemiSyncMaster::flushNet";
int result = -1;
- struct timespec start_ts;
- ulong trc_level = trace_level_;
- LINT_INIT_STRUCT(start_ts);
+ NET* net= &thd->net;
function_enter(kWho);
@@ -1065,9 +1222,6 @@ int ReplSemiSyncMaster::readSlaveReply(NET *net, uint32 server_id,
goto l_end;
}
- if (trc_level & kTraceNetWait)
- set_timespec(start_ts, 0);
-
/* We flush to make sure that the current event is sent to the network,
* instead of being buffered in the TCP/IP stack.
*/
@@ -1079,82 +1233,35 @@ int ReplSemiSyncMaster::readSlaveReply(NET *net, uint32 server_id,
}
net_clear(net, 0);
- if (trc_level & kTraceDetail)
- sql_print_information("%s: Wait for replica's reply", kWho);
-
- /* Wait for the network here. Though binlog dump thread can indefinitely wait
- * here, transactions would not wait indefintely.
- * Transactions wait on binlog replies detected by binlog dump threads. If
- * binlog dump threads wait too long, transactions will timeout and continue.
- */
- packet_len = my_net_read(net);
-
- if (trc_level & kTraceNetWait)
- {
- int wait_time = getWaitTime(start_ts);
- if (wait_time < 0)
- {
- sql_print_error("Semi-sync master wait for reply "
- "fail to get wait time.");
- rpl_semi_sync_master_timefunc_fails++;
- }
- else
- {
- rpl_semi_sync_master_net_wait_num++;
- rpl_semi_sync_master_net_wait_time += wait_time;
- }
- }
-
- if (packet_len == packet_error || packet_len < REPLY_BINLOG_NAME_OFFSET)
- {
- if (packet_len == packet_error)
- sql_print_error("Read semi-sync reply network error: %s (errno: %d)",
- net->last_error, net->last_errno);
- else
- sql_print_error("Read semi-sync reply length error: %s (errno: %d)",
- net->last_error, net->last_errno);
- goto l_end;
- }
-
- packet = net->read_pos;
- if (packet[REPLY_MAGIC_NUM_OFFSET] != ReplSemiSyncMaster::kPacketMagicNum)
- {
- sql_print_error("Read semi-sync reply magic number error");
- goto l_end;
- }
-
- log_file_pos = uint8korr(packet + REPLY_BINLOG_POS_OFFSET);
- log_file_len = packet_len - REPLY_BINLOG_NAME_OFFSET;
- if (log_file_len >= FN_REFLEN)
- {
- sql_print_error("Read semi-sync reply binlog file length too large");
- goto l_end;
- }
- strncpy(log_file_name, (const char*)packet + REPLY_BINLOG_NAME_OFFSET, log_file_len);
- log_file_name[log_file_len] = 0;
-
- if (trc_level & kTraceDetail)
- sql_print_information("%s: Got reply (%s, %lu)",
- kWho, log_file_name, (ulong)log_file_pos);
-
- result = reportReplyBinlog(server_id, log_file_name, log_file_pos);
+ net->pkt_nr++;
+ result = 0;
+ rpl_semi_sync_master_net_wait_num++;
l_end:
+ thd->clear_error();
return function_exit(kWho, result);
}
-
-int ReplSemiSyncMaster::resetMaster()
+int ReplSemiSyncMaster::afterResetMaster()
{
- const char *kWho = "ReplSemiSyncMaster::resetMaster";
+ const char *kWho = "ReplSemiSyncMaster::afterResetMaster";
int result = 0;
function_enter(kWho);
+ if (rpl_semi_sync_master_enabled)
+ {
+ sql_print_information("Enable Semi-sync Master after reset master");
+ enableMaster();
+ }
lock();
- state_ = getMasterEnabled()? 1 : 0;
+ if (rpl_semi_sync_master_clients == 0 &&
+ !rpl_semi_sync_master_wait_no_slave)
+ state_ = 0;
+ else
+ state_ = getMasterEnabled()? 1 : 0;
wait_file_name_inited_ = false;
reply_file_name_inited_ = false;
@@ -1176,6 +1283,31 @@ int ReplSemiSyncMaster::resetMaster()
return function_exit(kWho, result);
}
+int ReplSemiSyncMaster::beforeResetMaster()
+{
+ const char *kWho = "ReplSemiSyncMaster::beforeResetMaster";
+ int result = 0;
+
+ function_enter(kWho);
+
+ if (rpl_semi_sync_master_enabled)
+ disableMaster();
+
+ return function_exit(kWho, result);
+}
+
+void ReplSemiSyncMaster::checkAndSwitch()
+{
+ lock();
+ if (getMasterEnabled() && is_on())
+ {
+ if (!rpl_semi_sync_master_wait_no_slave
+ && rpl_semi_sync_master_clients == 0)
+ switch_off();
+ }
+ unlock();
+}
+
void ReplSemiSyncMaster::setExportStats()
{
lock();
@@ -1219,212 +1351,8 @@ static int getWaitTime(const struct timespec& start_ts)
return (int)(end_usecs - start_usecs);
}
-/***************************************************************************
- Semisync master interface setup and deinit
-***************************************************************************/
-
-C_MODE_START
-
-int repl_semi_report_binlog_update(Binlog_storage_param *param,
- const char *log_file,
- my_off_t log_pos, uint32 flags)
-{
- int error= 0;
-
- if (repl_semisync_master.getMasterEnabled())
- {
- /*
- Let us store the binlog file name and the position, so that
- we know how long to wait for the binlog to the replicated to
- the slave in synchronous replication.
- */
- error= repl_semisync_master.writeTranxInBinlog(log_file,
- log_pos);
- }
-
- return error;
-}
-
-int repl_semi_request_commit(Trans_param *param)
-{
- return 0;
-}
-
-int repl_semi_report_binlog_sync(Binlog_storage_param *param,
- const char *log_file,
- my_off_t log_pos, uint32 flags)
-{
- int error= 0;
- if (rpl_semi_sync_master_wait_point ==
- SEMI_SYNC_MASTER_WAIT_POINT_AFTER_BINLOG_SYNC)
- {
- error= repl_semisync_master.commitTrx(log_file, log_pos);
- }
-
- return error;
-}
-
-int repl_semi_report_commit(Trans_param *param)
-{
- if (rpl_semi_sync_master_wait_point !=
- SEMI_SYNC_MASTER_WAIT_POINT_AFTER_STORAGE_COMMIT)
- {
- return 0;
- }
-
- bool is_real_trans= param->flags & TRANS_IS_REAL_TRANS;
-
- if (is_real_trans && param->log_pos)
- {
- const char *binlog_name= param->log_file;
- return repl_semisync_master.commitTrx(binlog_name, param->log_pos);
- }
- return 0;
-}
-
-int repl_semi_report_rollback(Trans_param *param)
-{
- return repl_semi_report_commit(param);
-}
-
-int repl_semi_binlog_dump_start(Binlog_transmit_param *param,
- const char *log_file,
- my_off_t log_pos)
-{
- bool semi_sync_slave= repl_semisync_master.is_semi_sync_slave();
-
- if (semi_sync_slave)
- {
- /* One more semi-sync slave */
- repl_semisync_master.add_slave();
-
- /*
- Let's assume this semi-sync slave has already received all
- binlog events before the filename and position it requests.
- */
- repl_semisync_master.reportReplyBinlog(param->server_id, log_file, log_pos);
- }
- sql_print_information("Start %s binlog_dump to slave (server_id: %d), pos(%s, %lu)",
- semi_sync_slave ? "semi-sync" : "asynchronous",
- param->server_id, log_file, (ulong)log_pos);
-
- return 0;
-}
-
-int repl_semi_binlog_dump_end(Binlog_transmit_param *param)
-{
- bool semi_sync_slave= repl_semisync_master.is_semi_sync_slave();
-
- sql_print_information("Stop %s binlog_dump to slave (server_id: %d)",
- semi_sync_slave ? "semi-sync" : "asynchronous",
- param->server_id);
- if (semi_sync_slave)
- {
- /* One less semi-sync slave */
- repl_semisync_master.remove_slave();
- }
- return 0;
-}
-
-int repl_semi_reserve_header(Binlog_transmit_param *param,
- unsigned char *header,
- ulong size, ulong *len)
-{
- *len += repl_semisync_master.reserveSyncHeader(header, size);
- return 0;
-}
-
-int repl_semi_before_send_event(Binlog_transmit_param *param,
- unsigned char *packet, ulong len,
- const char *log_file, my_off_t log_pos)
-{
- return repl_semisync_master.updateSyncHeader(packet,
- log_file,
- log_pos,
- param->server_id);
-}
-
-int repl_semi_after_send_event(Binlog_transmit_param *param,
- const char *event_buf, ulong len)
-{
- if (repl_semisync_master.is_semi_sync_slave())
- {
- THD *thd= current_thd;
- /*
- Possible errors in reading slave reply are ignored deliberately
- because we do not want dump thread to quit on this. Error
- messages are already reported.
- */
- (void) repl_semisync_master.readSlaveReply(&thd->net,
- param->server_id, event_buf);
- thd->clear_error();
- }
- return 0;
-}
-
-int repl_semi_reset_master(Binlog_transmit_param *param)
-{
- if (repl_semisync_master.resetMaster())
- return 1;
- return 0;
-}
-
-C_MODE_END
-
-Trans_observer trans_observer=
-{
- sizeof(Trans_observer), // len
-
- repl_semi_report_commit, // after_commit
- repl_semi_report_rollback, // after_rollback
-};
-
-Binlog_storage_observer storage_observer=
-{
- sizeof(Binlog_storage_observer), // len
-
- repl_semi_report_binlog_update, // report_update
- repl_semi_report_binlog_sync, // after_sync
-};
-
-Binlog_transmit_observer transmit_observer=
-{
- sizeof(Binlog_transmit_observer), // len
-
- repl_semi_binlog_dump_start, // start
- repl_semi_binlog_dump_end, // stop
- repl_semi_reserve_header, // reserve_header
- repl_semi_before_send_event, // before_send_event
- repl_semi_after_send_event, // after_send_event
- repl_semi_reset_master, // reset
-};
-
-static bool semi_sync_master_inited= 0;
-
-int semi_sync_master_init()
-{
- void *p= 0;
- if (repl_semisync_master.initObject())
- return 1;
- if (register_trans_observer(&trans_observer, p))
- return 1;
- if (register_binlog_storage_observer(&storage_observer, p))
- return 1;
- if (register_binlog_transmit_observer(&transmit_observer, p))
- return 1;
- semi_sync_master_inited= 1;
- return 0;
-}
-
void semi_sync_master_deinit()
{
- void *p= 0;
- if (!semi_sync_master_inited)
- return;
-
- unregister_trans_observer(&trans_observer, p);
- unregister_binlog_storage_observer(&storage_observer, p);
- unregister_binlog_transmit_observer(&transmit_observer, p);
repl_semisync_master.cleanup();
- semi_sync_master_inited= 0;
+ ack_receiver.cleanup();
}
diff --git a/sql/semisync_master.h b/sql/semisync_master.h
index ff1e3dd48b4..66fac17cd45 100644
--- a/sql/semisync_master.h
+++ b/sql/semisync_master.h
@@ -20,14 +20,13 @@
#define SEMISYNC_MASTER_H
#include "semisync.h"
+#include "semisync_master_ack_receiver.h"
#ifdef HAVE_PSI_INTERFACE
extern PSI_mutex_key key_LOCK_binlog;
extern PSI_cond_key key_COND_binlog_send;
#endif
-extern PSI_stage_info stage_waiting_for_semi_sync_ack_from_slave;
-
struct TranxNode {
char log_name_[FN_REFLEN];
my_off_t log_pos_;
@@ -432,6 +431,9 @@ class ReplSemiSyncMaster
bool state_; /* whether semi-sync is switched */
+ /*Waiting for ACK before/after innodb commit*/
+ ulong wait_point_;
+
void lock();
void unlock();
void cond_broadcast();
@@ -473,6 +475,17 @@ class ReplSemiSyncMaster
wait_timeout_ = wait_timeout;
}
+ /*set the ACK point, after binlog sync or after transaction commit*/
+ void setWaitPoint(unsigned long ack_point)
+ {
+ wait_point_ = ack_point;
+ }
+
+ ulong waitPoint() //no cover line
+ {
+ return wait_point_; //no cover line
+ }
+
/* Initialize this class after MySQL parameters are initialized. this
* function should be called once at bootstrap time.
*/
@@ -490,8 +503,9 @@ class ReplSemiSyncMaster
/* Remove a semi-sync replication slave */
void remove_slave();
- /* Is the slave servered by the thread requested semi-sync */
- bool is_semi_sync_slave();
+ /* It parses a reply packet and call reportReplyBinlog to handle it. */
+ int reportReplyPacket(uint32 server_id, const uchar *packet,
+ ulong packet_len);
/* In semi-sync replication, reports up to which binlog position we have
* received replies from the slave indicating that it already get the events.
@@ -527,42 +541,61 @@ class ReplSemiSyncMaster
int commitTrx(const char* trx_wait_binlog_name,
my_off_t trx_wait_binlog_pos);
+ /*Wait for ACK after writing/sync binlog to file*/
+ int waitAfterSync(const char* log_file, my_off_t log_pos);
+
+ /*Wait for ACK after commting the transaction*/
+ int waitAfterCommit(THD* thd, bool all);
+
+ /*Wait after the transaction is rollback*/
+ int waitAfterRollback(THD *thd, bool all);
+ /*Store the current binlog position in active_tranxs_. This position should
+ * be acked by slave*/
+ int reportBinlogUpdate(THD *thd, const char *log_file,my_off_t log_pos);
+
+ void dump_start(THD* thd,
+ const char *log_file,
+ my_off_t log_pos);
+
+ void dump_end(THD* thd);
+
/* Reserve space in the replication event packet header:
* . slave semi-sync off: 1 byte - (0)
* . slave semi-sync on: 3 byte - (0, 0xef, 0/1}
- *
+ *
* Input:
- * header - (IN) the header buffer
- * size - (IN) size of the header buffer
+ * packet - (IN) the header buffer
*
* Return:
* size of the bytes reserved for header
*/
- int reserveSyncHeader(unsigned char *header, unsigned long size);
+ int reserveSyncHeader(String* packet);
/* Update the sync bit in the packet header to indicate to the slave whether
* the master will wait for the reply of the event. If semi-sync is switched
* off and we detect that the slave is catching up, we switch semi-sync on.
*
* Input:
+ * THD - (IN) current dump thread
* packet - (IN) the packet containing the replication event
* log_file_name - (IN) the event ending position's file name
* log_file_pos - (IN) the event ending position's file offset
+ * need_sync - (IN) identify if flushNet is needed to call.
* server_id - (IN) master server id number
*
* Return:
* 0: success; non-zero: error
*/
- int updateSyncHeader(unsigned char *packet,
+ int updateSyncHeader(THD* thd, unsigned char *packet,
const char *log_file_name,
- my_off_t log_file_pos,
- uint32 server_id);
+ my_off_t log_file_pos,
+ bool* need_sync);
/* Called when a transaction finished writing binlog events.
* . update the 'largest' transactions' binlog event position
* . insert the ending position in the active transaction list if
* semi-sync is on
- *
+ *
* Input: (the transaction events' ending binlog position)
* log_file_name - (IN) transaction ending position's file name
* log_file_pos - (IN) transaction ending position's file offset
@@ -574,16 +607,8 @@ class ReplSemiSyncMaster
/* Read the slave's reply so that we know how much progress the slave makes
* on receive replication events.
- *
- * Input:
- * net - (IN) the connection to master
- * server_id - (IN) master server id number
- * event_buf - (IN) pointer to the event packet
- *
- * Return:
- * 0: success; non-zero: error
*/
- int readSlaveReply(NET *net, uint32 server_id, const char *event_buf);
+ int flushNet(THD* thd, const char *event_buf);
/* Export internal statistics for semi-sync replication. */
void setExportStats();
@@ -591,7 +616,12 @@ class ReplSemiSyncMaster
/* 'reset master' command is issued from the user and semi-sync need to
* go off for that.
*/
- int resetMaster();
+ int afterResetMaster();
+
+ /*called before reset master*/
+ int beforeResetMaster();
+
+ void checkAndSwitch();
};
enum rpl_semi_sync_master_wait_point_t {
@@ -599,6 +629,9 @@ enum rpl_semi_sync_master_wait_point_t {
SEMI_SYNC_MASTER_WAIT_POINT_AFTER_STORAGE_COMMIT,
};
+extern ReplSemiSyncMaster repl_semisync_master;
+extern Ack_receiver ack_receiver;
+
/* System and status variables for the master component */
extern my_bool rpl_semi_sync_master_enabled;
extern my_bool rpl_semi_sync_master_status;
@@ -620,6 +653,8 @@ extern ulonglong rpl_semi_sync_master_net_wait_num;
extern ulonglong rpl_semi_sync_master_trx_wait_num;
extern ulonglong rpl_semi_sync_master_net_wait_time;
extern ulonglong rpl_semi_sync_master_trx_wait_time;
+extern unsigned long long rpl_semi_sync_master_request_ack;
+extern unsigned long long rpl_semi_sync_master_get_ack;
/*
This indicates whether we should keep waiting if no semi-sync slave
@@ -630,7 +665,10 @@ extern ulonglong rpl_semi_sync_master_trx_wait_time;
extern char rpl_semi_sync_master_wait_no_slave;
extern ReplSemiSyncMaster repl_semisync_master;
-int semi_sync_master_init();
+extern PSI_stage_info stage_waiting_for_semi_sync_ack_from_slave;
+extern PSI_stage_info stage_reading_semi_sync_ack;
+extern PSI_stage_info stage_waiting_for_semi_sync_slave;
+
void semi_sync_master_deinit();
#endif /* SEMISYNC_MASTER_H */
diff --git a/sql/semisync_master_ack_receiver.cc b/sql/semisync_master_ack_receiver.cc
new file mode 100644
index 00000000000..eee35cc122f
--- /dev/null
+++ b/sql/semisync_master_ack_receiver.cc
@@ -0,0 +1,308 @@
+/* Copyright (c) 2014, Oracle and/or its affiliates. All rights reserved.
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; version 2 of the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
+
+#include <my_global.h>
+#include "semisync_master.h"
+#include "semisync_master_ack_receiver.h"
+
+extern PSI_mutex_key key_ss_mutex_Ack_receiver_mutex;
+extern PSI_cond_key key_ss_cond_Ack_receiver_cond;
+extern PSI_thread_key key_ss_thread_Ack_receiver_thread;
+extern ReplSemiSyncMaster repl_semisync;
+
+/* Callback function of ack receive thread */
+pthread_handler_t ack_receive_handler(void *arg)
+{
+ Ack_receiver *recv= reinterpret_cast<Ack_receiver *>(arg);
+
+ my_thread_init();
+ recv->run();
+ my_thread_end();
+
+ return NULL;
+}
+
+Ack_receiver::Ack_receiver()
+{
+ const char *kWho = "Ack_receiver::Ack_receiver";
+ function_enter(kWho);
+
+ m_status= ST_DOWN;
+ mysql_mutex_init(key_ss_mutex_Ack_receiver_mutex, &m_mutex,
+ MY_MUTEX_INIT_FAST);
+ mysql_cond_init(key_ss_cond_Ack_receiver_cond, &m_cond, NULL);
+ m_pid= 0;
+
+ function_exit(kWho);
+}
+
+void Ack_receiver::cleanup()
+{
+ const char *kWho = "Ack_receiver::~Ack_receiver";
+ function_enter(kWho);
+
+ stop();
+ mysql_mutex_destroy(&m_mutex);
+ mysql_cond_destroy(&m_cond);
+
+ function_exit(kWho);
+}
+
+bool Ack_receiver::start()
+{
+ const char *kWho = "Ack_receiver::start";
+ function_enter(kWho);
+
+ mysql_mutex_lock(&m_mutex);
+ if(m_status == ST_DOWN)
+ {
+ pthread_attr_t attr;
+
+ m_status= ST_UP;
+
+ if (DBUG_EVALUATE_IF("rpl_semisync_simulate_create_thread_failure", 1, 0) ||
+ pthread_attr_init(&attr) != 0 ||
+ pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE) != 0 ||
+#ifndef _WIN32
+ pthread_attr_setscope(&attr, PTHREAD_SCOPE_SYSTEM) != 0 ||
+#endif
+ mysql_thread_create(key_ss_thread_Ack_receiver_thread, &m_pid,
+ &attr, ack_receive_handler, this))
+ {
+ sql_print_error("Failed to start semi-sync ACK receiver thread, "
+ " could not create thread(errno:%d)", errno);
+
+ m_status= ST_DOWN;
+ mysql_mutex_unlock(&m_mutex);
+
+ return function_exit(kWho, true);
+ }
+ (void) pthread_attr_destroy(&attr);
+ }
+ mysql_mutex_unlock(&m_mutex);
+
+ return function_exit(kWho, false);
+}
+
+void Ack_receiver::stop()
+{
+ const char *kWho = "Ack_receiver::stop";
+ function_enter(kWho);
+
+ mysql_mutex_lock(&m_mutex);
+ if (m_status == ST_UP)
+ {
+ m_status= ST_STOPPING;
+ mysql_cond_broadcast(&m_cond);
+
+ while (m_status == ST_STOPPING)
+ mysql_cond_wait(&m_cond, &m_mutex);
+
+ DBUG_ASSERT(m_status == ST_DOWN);
+
+ m_pid= 0;
+ }
+ mysql_mutex_unlock(&m_mutex);
+
+ function_exit(kWho);
+}
+
+bool Ack_receiver::add_slave(THD *thd)
+{
+ Slave *slave;
+ const char *kWho = "Ack_receiver::add_slave";
+ function_enter(kWho);
+
+ if (!(slave= new Slave))
+ return function_exit(kWho, true);
+
+ slave->thd= thd;
+ slave->vio= *thd->net.vio;
+ slave->vio.mysql_socket.m_psi= NULL;
+ slave->vio.read_timeout= 1;
+
+ mysql_mutex_lock(&m_mutex);
+ m_slaves.push_back(slave);
+ m_slaves_changed= true;
+ mysql_cond_broadcast(&m_cond);
+ mysql_mutex_unlock(&m_mutex);
+
+ return function_exit(kWho, false);
+}
+
+void Ack_receiver::remove_slave(THD *thd)
+{
+ I_List_iterator<Slave> it(m_slaves);
+ Slave *slave;
+ const char *kWho = "Ack_receiver::remove_slave";
+ function_enter(kWho);
+
+ mysql_mutex_lock(&m_mutex);
+
+ while ((slave= it++))
+ {
+ if (slave->thd == thd)
+ {
+ delete slave;
+ m_slaves_changed= true;
+ break;
+ }
+ }
+ mysql_mutex_unlock(&m_mutex);
+ function_exit(kWho);
+}
+
+inline void Ack_receiver::set_stage_info(const PSI_stage_info &stage)
+{
+ MYSQL_SET_STAGE(stage.m_key, __FILE__, __LINE__);
+}
+
+inline void Ack_receiver::wait_for_slave_connection()
+{
+ set_stage_info(stage_waiting_for_semi_sync_slave);
+ mysql_cond_wait(&m_cond, &m_mutex);
+}
+
+my_socket Ack_receiver::get_slave_sockets(fd_set *fds, uint *count)
+{
+ my_socket max_fd= INVALID_SOCKET;
+ Slave *slave;
+ I_List_iterator<Slave> it(m_slaves);
+
+ *count= 0;
+ FD_ZERO(fds);
+ while ((slave= it++))
+ {
+ (*count)++;
+ my_socket fd= slave->sock_fd();
+ max_fd= (fd > max_fd ? fd : max_fd);
+ FD_SET(fd, fds);
+ }
+
+ return max_fd;
+}
+
+/* Auxilary function to initialize a NET object with given net buffer. */
+static void init_net(NET *net, unsigned char *buff, unsigned int buff_len)
+{
+ memset(net, 0, sizeof(NET));
+ net->max_packet= buff_len;
+ net->buff= buff;
+ net->buff_end= buff + buff_len;
+ net->read_pos= net->buff;
+}
+
+void Ack_receiver::run()
+{
+ // skip LOCK_global_system_variables due to the 3rd arg
+ THD *thd= new THD(next_thread_id(), false, true);
+ NET net;
+ unsigned char net_buff[REPLY_MESSAGE_MAX_LENGTH];
+ fd_set read_fds;
+ my_socket max_fd= INVALID_SOCKET;
+ Slave *slave;
+
+ my_thread_init();
+
+ DBUG_ENTER("Ack_receiver::run");
+
+ sql_print_information("Starting ack receiver thread");
+ thd->system_thread= SYSTEM_THREAD_SEMISYNC_MASTER_BACKGROUND;
+ thd->thread_stack= (char*) &thd;
+ thd->store_globals();
+ thd->security_ctx->skip_grants();
+ thread_safe_increment32(&service_thread_count);
+ thd->set_command(COM_DAEMON);
+ init_net(&net, net_buff, REPLY_MESSAGE_MAX_LENGTH);
+
+ mysql_mutex_lock(&m_mutex);
+ m_slaves_changed= true;
+ mysql_mutex_unlock(&m_mutex);
+
+ while (1)
+ {
+ fd_set fds;
+ int ret;
+ uint slave_count;
+
+ mysql_mutex_lock(&m_mutex);
+ if (unlikely(m_status == ST_STOPPING))
+ goto end;
+
+ set_stage_info(stage_waiting_for_semi_sync_ack_from_slave);
+ if (unlikely(m_slaves_changed))
+ {
+ if (unlikely(m_slaves.is_empty()))
+ {
+ wait_for_slave_connection();
+ mysql_mutex_unlock(&m_mutex);
+ continue;
+ }
+
+ max_fd= get_slave_sockets(&read_fds, &slave_count);
+ m_slaves_changed= false;
+ DBUG_PRINT("info", ("fd count %u, max_fd %d", slave_count, max_fd));
+ }
+
+ struct timeval tv= {1, 0};
+ fds= read_fds;
+ /* select requires max fd + 1 for the first argument */
+ ret= select(max_fd+1, &fds, NULL, NULL, &tv);
+ if (ret <= 0)
+ {
+ mysql_mutex_unlock(&m_mutex);
+
+ ret= DBUG_EVALUATE_IF("rpl_semisync_simulate_select_error", -1, ret);
+
+ if (ret == -1)
+ sql_print_information("Failed to select() on semi-sync dump sockets, "
+ "error: errno=%d", socket_errno);
+ /* Sleep 1us, so other threads can catch the m_mutex easily. */
+ my_sleep(1);
+ continue;
+ }
+
+ set_stage_info(stage_reading_semi_sync_ack);
+ I_List_iterator<Slave> it(m_slaves);
+
+ while ((slave= it++))
+ {
+ if (FD_ISSET(slave->sock_fd(), &fds))
+ {
+ ulong len;
+
+ net_clear(&net, 0);
+ net.vio= &slave->vio;
+
+ len= my_net_read(&net);
+ if (likely(len != packet_error))
+ repl_semisync_master.reportReplyPacket(slave->server_id(),
+ net.read_pos, len);
+ else if (net.last_errno == ER_NET_READ_ERROR)
+ FD_CLR(slave->sock_fd(), &read_fds);
+ }
+ }
+ mysql_mutex_unlock(&m_mutex);
+ }
+end:
+ sql_print_information("Stopping ack receiver thread");
+ m_status= ST_DOWN;
+ delete thd;
+ thread_safe_decrement32(&service_thread_count);
+ signal_thd_deleted();
+ mysql_cond_broadcast(&m_cond);
+ mysql_mutex_unlock(&m_mutex);
+ DBUG_VOID_RETURN;
+}
diff --git a/sql/semisync_master_ack_receiver.h b/sql/semisync_master_ack_receiver.h
new file mode 100644
index 00000000000..25307131bad
--- /dev/null
+++ b/sql/semisync_master_ack_receiver.h
@@ -0,0 +1,119 @@
+/* Copyright (c) 2014, Oracle and/or its affiliates. All rights reserved.
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; version 2 of the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
+
+#ifndef SEMISYNC_MASTER_ACK_RECEIVER_DEFINED
+#define SEMISYNC_MASTER_ACK_RECEIVER_DEFINED
+
+#include "my_global.h"
+#include "my_pthread.h"
+#include "sql_class.h"
+#include "semisync.h"
+/**
+ Ack_receiver is responsible to control ack receive thread and maintain
+ slave information used by ack receive thread.
+
+ There are mainly four operations on ack receive thread:
+ start: start ack receive thread
+ stop: stop ack receive thread
+ add_slave: maintain a new semisync slave's information
+ remove_slave: remove a semisync slave's information
+ */
+class Ack_receiver : public ReplSemiSyncBase
+{
+public:
+ Ack_receiver();
+ ~Ack_receiver() {}
+ void cleanup();
+ /**
+ Notify ack receiver to receive acks on the dump session.
+
+ It adds the given dump thread into the slave list and wakes
+ up ack thread if it is waiting for any slave coming.
+
+ @param[in] thd THD of a dump thread.
+
+ @return it return false if succeeds, otherwise true is returned.
+ */
+ bool add_slave(THD *thd);
+
+ /**
+ Notify ack receiver not to receive ack on the dump session.
+
+ it removes the given dump thread from slave list.
+
+ @param[in] thd THD of a dump thread.
+ */
+ void remove_slave(THD *thd);
+
+ /**
+ Start ack receive thread
+
+ @return it return false if succeeds, otherwise true is returned.
+ */
+ bool start();
+
+ /**
+ Stop ack receive thread
+ */
+ void stop();
+
+ /**
+ The core of ack receive thread.
+
+ It monitors all slaves' sockets and receives acks when they come.
+ */
+ void run();
+
+ void setTraceLevel(unsigned long trace_level)
+ {
+ trace_level_= trace_level;
+ }
+private:
+ enum status {ST_UP, ST_DOWN, ST_STOPPING};
+ uint8 m_status;
+ /*
+ Protect m_status, m_slaves_changed and m_slaves. ack thread and other
+ session may access the variables at the same time.
+ */
+ mysql_mutex_t m_mutex;
+ mysql_cond_t m_cond;
+ /* If slave list is updated(add or remove). */
+ bool m_slaves_changed;
+
+ class Slave :public ilink
+ {
+public:
+ THD *thd;
+ Vio vio;
+
+ my_socket sock_fd() { return vio.mysql_socket.fd; }
+ uint server_id() { return thd->variables.server_id; }
+ };
+
+ I_List<Slave> m_slaves;
+
+ pthread_t m_pid;
+
+/* Declare them private, so no one can copy the object. */
+ Ack_receiver(const Ack_receiver &ack_receiver);
+ Ack_receiver& operator=(const Ack_receiver &ack_receiver);
+
+ void set_stage_info(const PSI_stage_info &stage);
+ void wait_for_slave_connection();
+ my_socket get_slave_sockets(fd_set *fds, uint *count);
+};
+
+extern Ack_receiver ack_receiver;
+#endif
diff --git a/sql/semisync_slave.cc b/sql/semisync_slave.cc
index 63bf9dca0e8..012f807d28e 100644
--- a/sql/semisync_slave.cc
+++ b/sql/semisync_slave.cc
@@ -18,10 +18,13 @@
#include <my_global.h>
#include "semisync_slave.h"
-my_bool rpl_semi_sync_slave_enabled;
+ReplSemiSyncSlave repl_semisync_slave;
+
+my_bool rpl_semi_sync_slave_enabled= 0;
+
+char rpl_semi_sync_slave_delay_master;
my_bool rpl_semi_sync_slave_status= 0;
ulong rpl_semi_sync_slave_trace_level;
-ReplSemiSyncSlave repl_semisync_slave;
/*
indicate whether or not the slave should send a reply to the master.
@@ -31,30 +34,27 @@ ReplSemiSyncSlave repl_semisync_slave;
checked in repl_semi_slave_queue_event.
*/
bool semi_sync_need_reply= false;
-
+unsigned int rpl_semi_sync_slave_kill_conn_timeout;
+unsigned long long rpl_semi_sync_slave_send_ack = 0;
int ReplSemiSyncSlave::initObject()
{
int result= 0;
- const char *kWho = "ReplSemiSyncSlave::initObject";
- if (init_done_)
- {
- fprintf(stderr, "%s called twice\n", kWho);
- return 1;
- }
init_done_ = true;
/* References to the parameter works after set_options(). */
setSlaveEnabled(rpl_semi_sync_slave_enabled);
setTraceLevel(rpl_semi_sync_slave_trace_level);
+ setDelayMaster(rpl_semi_sync_slave_delay_master);
+ setKillConnTimeout(rpl_semi_sync_slave_kill_conn_timeout);
return result;
}
int ReplSemiSyncSlave::slaveReadSyncHeader(const char *header,
unsigned long total_len,
- bool *need_reply,
+ int *semi_flags,
const char **payload,
unsigned long *payload_len)
{
@@ -62,128 +62,119 @@ int ReplSemiSyncSlave::slaveReadSyncHeader(const char *header,
int read_res = 0;
function_enter(kWho);
- if ((unsigned char)(header[0]) == kPacketMagicNum)
- {
- *need_reply = (header[1] & kPacketFlagSync);
- *payload_len = total_len - 2;
- *payload = header + 2;
-
- if (trace_level_ & kTraceDetail)
- sql_print_information("%s: reply - %d", kWho, *need_reply);
- }
- else
+ if (rpl_semi_sync_slave_status)
{
- sql_print_error("Missing magic number for semi-sync packet, packet "
- "len: %lu", total_len);
- read_res = -1;
+ if (DBUG_EVALUATE_IF("semislave_corrupt_log", 0, 1)
+ && (unsigned char)(header[0]) == kPacketMagicNum)
+ {
+ semi_sync_need_reply = (header[1] & kPacketFlagSync);
+ *payload_len = total_len - 2;
+ *payload = header + 2;
+
+ if (trace_level_ & kTraceDetail)
+ sql_print_information("%s: reply - %d", kWho, semi_sync_need_reply);
+
+ if (semi_sync_need_reply)
+ *semi_flags |= SEMI_SYNC_NEED_ACK;
+ if (isDelayMaster())
+ *semi_flags |= SEMI_SYNC_SLAVE_DELAY_SYNC;
+ }
+ else
+ {
+ sql_print_error("Missing magic number for semi-sync packet, packet "
+ "len: %lu", total_len);
+ read_res = -1;
+ }
+ } else {
+ *payload= header;
+ *payload_len= total_len;
}
return function_exit(kWho, read_res);
}
-int ReplSemiSyncSlave::slaveStart(Binlog_relay_IO_param *param)
+int ReplSemiSyncSlave::slaveStart(Master_info *mi)
{
bool semi_sync= getSlaveEnabled();
sql_print_information("Slave I/O thread: Start %s replication to\
master '%s@%s:%d' in log '%s' at position %lu",
semi_sync ? "semi-sync" : "asynchronous",
- param->user, param->host, param->port,
- param->master_log_name[0] ? param->master_log_name : "FIRST",
- (unsigned long)param->master_log_pos);
+ const_cast<char *>(mi->user), mi->host, mi->port,
+ const_cast<char *>(mi->master_log_name),
+ (unsigned long)(mi->master_log_pos));
if (semi_sync && !rpl_semi_sync_slave_status)
rpl_semi_sync_slave_status= 1;
+
+ /*clear the counter*/
+ rpl_semi_sync_slave_send_ack= 0;
return 0;
}
-int ReplSemiSyncSlave::slaveStop(Binlog_relay_IO_param *param)
+int ReplSemiSyncSlave::slaveStop(Master_info *mi)
{
if (rpl_semi_sync_slave_status)
rpl_semi_sync_slave_status= 0;
- if (mysql_reply)
- mysql_close(mysql_reply);
- mysql_reply= 0;
+ if (getSlaveEnabled())
+ killConnection(mi->mysql);
return 0;
}
-int ReplSemiSyncSlave::slaveReply(MYSQL *mysql,
- const char *binlog_filename,
- my_off_t binlog_filepos)
+int ReplSemiSyncSlave::resetSlave(Master_info *mi)
{
- const char *kWho = "ReplSemiSyncSlave::slaveReply";
- NET *net= &mysql->net;
- uchar reply_buffer[REPLY_MAGIC_NUM_LEN
- + REPLY_BINLOG_POS_LEN
- + REPLY_BINLOG_NAME_LEN];
- int reply_res, name_len = strlen(binlog_filename);
-
- function_enter(kWho);
-
- /* Prepare the buffer of the reply. */
- reply_buffer[REPLY_MAGIC_NUM_OFFSET] = kPacketMagicNum;
- int8store(reply_buffer + REPLY_BINLOG_POS_OFFSET, binlog_filepos);
- memcpy(reply_buffer + REPLY_BINLOG_NAME_OFFSET,
- binlog_filename,
- name_len + 1 /* including trailing '\0' */);
-
- if (trace_level_ & kTraceDetail)
- sql_print_information("%s: reply (%s, %lu)", kWho,
- binlog_filename, (ulong)binlog_filepos);
-
- net_clear(net, 0);
- /* Send the reply. */
- reply_res = my_net_write(net, reply_buffer,
- name_len + REPLY_BINLOG_NAME_OFFSET);
- if (!reply_res)
- {
- reply_res = net_flush(net);
- if (reply_res)
- sql_print_error("Semi-sync slave net_flush() reply failed");
- }
- else
- {
- sql_print_error("Semi-sync slave send reply failed: %s (%d)",
- net->last_error, net->last_errno);
- }
-
- return function_exit(kWho, reply_res);
+ return 0;
}
-/***************************************************************************
- Semisync slave interface setup and deinit
-***************************************************************************/
+void ReplSemiSyncSlave::killConnection(MYSQL *mysql)
+{
+ if (!mysql)
+ return;
-C_MODE_START
+ char kill_buffer[30];
+ MYSQL *kill_mysql = NULL;
+ kill_mysql = mysql_init(kill_mysql);
+ mysql_options(kill_mysql, MYSQL_OPT_CONNECT_TIMEOUT, &kill_conn_timeout_);
+ mysql_options(kill_mysql, MYSQL_OPT_READ_TIMEOUT, &kill_conn_timeout_);
+ mysql_options(kill_mysql, MYSQL_OPT_WRITE_TIMEOUT, &kill_conn_timeout_);
-int repl_semi_reset_slave(Binlog_relay_IO_param *param)
-{
- // TODO: reset semi-sync slave status here
- return 0;
+ bool ret= (!mysql_real_connect(kill_mysql, mysql->host,
+ mysql->user, mysql->passwd,0, mysql->port, mysql->unix_socket, 0));
+ if (DBUG_EVALUATE_IF("semisync_slave_failed_kill", 1, 0) || ret)
+ {
+ sql_print_information("cannot connect to master to kill slave io_thread's "
+ "connection");
+ if (!ret)
+ mysql_close(kill_mysql);
+ return;
+ }
+ uint kill_buffer_length = my_snprintf(kill_buffer, 30, "KILL %lu",
+ mysql->thread_id);
+ mysql_real_query(kill_mysql, kill_buffer, kill_buffer_length);
+ mysql_close(kill_mysql);
}
-int repl_semi_slave_request_dump(Binlog_relay_IO_param *param,
- uint32 flags)
+int ReplSemiSyncSlave::requestTransmit(Master_info *mi)
{
- MYSQL *mysql= param->mysql;
+ MYSQL *mysql= mi->mysql;
MYSQL_RES *res= 0;
MYSQL_ROW row;
const char *query;
- if (!repl_semisync_slave.getSlaveEnabled())
+ if (!getSlaveEnabled())
return 0;
- /* Check if master server has semi-sync plugin installed */
query= "SHOW VARIABLES LIKE 'rpl_semi_sync_master_enabled'";
if (mysql_real_query(mysql, query, strlen(query)) ||
!(res= mysql_store_result(mysql)))
{
- sql_print_error("Execution failed on master: %s", query);
+ sql_print_error("Execution failed on master: %s, error :%s", query, mysql_error(mysql));
return 1;
}
row= mysql_fetch_row(res);
- if (!row)
+ if (DBUG_EVALUATE_IF("master_not_support_semisync", 1, 0)
+ || !row)
{
/* Master does not support semi-sync */
sql_print_warning("Master server does not support semi-sync, "
@@ -195,8 +186,8 @@ int repl_semi_slave_request_dump(Binlog_relay_IO_param *param,
mysql_free_result(res);
/*
- Tell master dump thread that we want to do semi-sync
- replication
+ Tell master dump thread that we want to do semi-sync
+ replication
*/
query= "SET @rpl_semi_sync_slave= 1";
if (mysql_real_query(mysql, query, strlen(query)))
@@ -206,83 +197,56 @@ int repl_semi_slave_request_dump(Binlog_relay_IO_param *param,
}
mysql_free_result(mysql_store_result(mysql));
rpl_semi_sync_slave_status= 1;
- return 0;
-}
-int repl_semi_slave_read_event(Binlog_relay_IO_param *param,
- const char *packet, unsigned long len,
- const char **event_buf, unsigned long *event_len)
-{
- if (rpl_semi_sync_slave_status)
- return repl_semisync_slave.slaveReadSyncHeader(packet, len,
- &semi_sync_need_reply,
- event_buf, event_len);
- *event_buf= packet;
- *event_len= len;
- return 0;
-}
-
-int repl_semi_slave_queue_event(Binlog_relay_IO_param *param,
- const char *event_buf,
- unsigned long event_len,
- uint32 flags)
-{
- if (rpl_semi_sync_slave_status && semi_sync_need_reply)
- {
- /*
- We deliberately ignore the error in slaveReply, such error
- should not cause the slave IO thread to stop, and the error
- messages are already reported.
- */
- (void) repl_semisync_slave.slaveReply(param->mysql,
- param->master_log_name,
- param->master_log_pos);
- }
return 0;
}
-int repl_semi_slave_io_start(Binlog_relay_IO_param *param)
-{
- return repl_semisync_slave.slaveStart(param);
-}
-
-int repl_semi_slave_io_end(Binlog_relay_IO_param *param)
+int ReplSemiSyncSlave::slaveReply(Master_info *mi)
{
- return repl_semisync_slave.slaveStop(param);
-}
-
-C_MODE_END
+ const char *kWho = "ReplSemiSyncSlave::slaveReply";
+ MYSQL* mysql= mi->mysql;
+ const char *binlog_filename= const_cast<char *>(mi->master_log_name);
+ my_off_t binlog_filepos= mi->master_log_pos;
-Binlog_relay_IO_observer relay_io_observer=
-{
- sizeof(Binlog_relay_IO_observer), // len
+ NET *net= &mysql->net;
+ uchar reply_buffer[REPLY_MAGIC_NUM_LEN
+ + REPLY_BINLOG_POS_LEN
+ + REPLY_BINLOG_NAME_LEN];
+ int reply_res = 0;
+ int name_len = strlen(binlog_filename);
- repl_semi_slave_io_start, // start
- repl_semi_slave_io_end, // stop
- repl_semi_slave_request_dump, // request_transmit
- repl_semi_slave_read_event, // after_read_event
- repl_semi_slave_queue_event, // after_queue_event
- repl_semi_reset_slave, // reset
-};
+ function_enter(kWho);
-static bool semi_sync_slave_inited= 0;
+ if (rpl_semi_sync_slave_status && semi_sync_need_reply)
+ {
+ /* Prepare the buffer of the reply. */
+ reply_buffer[REPLY_MAGIC_NUM_OFFSET] = kPacketMagicNum;
+ int8store(reply_buffer + REPLY_BINLOG_POS_OFFSET, binlog_filepos);
+ memcpy(reply_buffer + REPLY_BINLOG_NAME_OFFSET,
+ binlog_filename,
+ name_len + 1 /* including trailing '\0' */);
-int semi_sync_slave_init()
-{
- void *p= 0;
- if (repl_semisync_slave.initObject())
- return 1;
- if (register_binlog_relay_io_observer(&relay_io_observer, p))
- return 1;
- semi_sync_slave_inited= 1;
- return 0;
-}
+ if (trace_level_ & kTraceDetail)
+ sql_print_information("%s: reply (%s, %lu)", kWho,
+ binlog_filename, (ulong)binlog_filepos);
+
+ net_clear(net, 0);
+ /* Send the reply. */
+ reply_res = my_net_write(net, reply_buffer,
+ name_len + REPLY_BINLOG_NAME_OFFSET);
+ if (!reply_res)
+ {
+ reply_res = DBUG_EVALUATE_IF("semislave_failed_net_flush", 1, net_flush(net));
+ if (reply_res)
+ sql_print_error("Semi-sync slave net_flush() reply failed");
+ rpl_semi_sync_slave_send_ack++;
+ }
+ else
+ {
+ sql_print_error("Semi-sync slave send reply failed: %s (%d)",
+ net->last_error, net->last_errno);
+ }
+ }
-void semi_sync_slave_deinit()
-{
- void *p= 0;
- if (!semi_sync_slave_inited)
- return;
- unregister_binlog_relay_io_observer(&relay_io_observer, p);
- semi_sync_slave_inited= 0;
+ return function_exit(kWho, reply_res);
}
diff --git a/sql/semisync_slave.h b/sql/semisync_slave.h
index 6bc10b0d479..0df4445ee4a 100644
--- a/sql/semisync_slave.h
+++ b/sql/semisync_slave.h
@@ -19,6 +19,10 @@
#define SEMISYNC_SLAVE_H
#include "semisync.h"
+#include "my_global.h"
+#include "sql_priv.h"
+#include "rpl_mi.h"
+#include "mysql.h"
/**
The extension class for the slave of semi-synchronous replication
@@ -44,49 +48,54 @@ public:
return slave_enabled_;
}
void setSlaveEnabled(bool enabled) {
- run_hooks_enabled|= enabled;
slave_enabled_ = enabled;
}
+
+ bool isDelayMaster(){
+ return delay_master_;
+ }
+
+ void setDelayMaster(bool enabled) {
+ delay_master_ = enabled;
+ }
+
+ void setKillConnTimeout(unsigned int timeout) {
+ kill_conn_timeout_ = timeout;
+ }
/* A slave reads the semi-sync packet header and separate the metadata
* from the payload data.
- *
+ *
* Input:
* header - (IN) packet header pointer
* total_len - (IN) total packet length: metadata + payload
- * need_reply - (IN) whether the master is waiting for the reply
+ * semi_flags - (IN) store flags: SEMI_SYNC_SLAVE_DELAY_SYNC and SEMI_SYNC_NEED_ACK
* payload - (IN) payload: the replication event
* payload_len - (IN) payload length
*
* Return:
* 0: success; non-zero: error
*/
- int slaveReadSyncHeader(const char *header, unsigned long total_len, bool *need_reply,
+ int slaveReadSyncHeader(const char *header, unsigned long total_len, int *semi_flags,
const char **payload, unsigned long *payload_len);
/* A slave replies to the master indicating its replication process. It
* indicates that the slave has received all events before the specified
* binlog position.
- *
- * Input:
- * mysql - (IN) the mysql network connection
- * binlog_filename - (IN) the reply point's binlog file name
- * binlog_filepos - (IN) the reply point's binlog file offset
- *
- * Return:
- * 0: success; non-zero: error
*/
- int slaveReply(MYSQL *mysql, const char *binlog_filename,
- my_off_t binlog_filepos);
-
- int slaveStart(Binlog_relay_IO_param *param);
- int slaveStop(Binlog_relay_IO_param *param);
+ int slaveReply(Master_info* mi);
+ int slaveStart(Master_info *mi);
+ int slaveStop(Master_info *mi);
+ int requestTransmit(Master_info*);
+ void killConnection(MYSQL *mysql);
+ int resetSlave(Master_info *mi);
private:
/* True when initObject has been called */
bool init_done_;
bool slave_enabled_; /* semi-sycn is enabled on the slave */
- MYSQL *mysql_reply; /* connection to send reply */
+ bool delay_master_;
+ unsigned int kill_conn_timeout_;
};
@@ -96,7 +105,8 @@ extern my_bool rpl_semi_sync_slave_status;
extern ulong rpl_semi_sync_slave_trace_level;
extern ReplSemiSyncSlave repl_semisync_slave;
-int semi_sync_slave_init();
-void semi_sync_slave_deinit();
+extern char rpl_semi_sync_slave_delay_master;
+extern unsigned int rpl_semi_sync_slave_kill_conn_timeout;
+extern unsigned long long rpl_semi_sync_slave_send_ack;
#endif /* SEMISYNC_SLAVE_H */
diff --git a/sql/slave.cc b/sql/slave.cc
index a8334525345..b4a817d6ecd 100644
--- a/sql/slave.cc
+++ b/sql/slave.cc
@@ -61,6 +61,7 @@
#include "debug_sync.h"
#include "rpl_parallel.h"
#include "sql_show.h"
+#include "semisync_slave.h"
#define FLAGSTR(V,F) ((V)&(F)?#F" ":"")
@@ -3590,7 +3591,9 @@ static int request_dump(THD *thd, MYSQL* mysql, Master_info* mi,
before_request_transmit,
(thd, mi, binlog_flags)))
DBUG_RETURN(1);
-
+ if (repl_semisync_slave.requestTransmit(mi))
+ DBUG_RETURN(1);
+
// TODO if big log files: Change next to int8store()
int4store(buf, (ulong) mi->master_log_pos);
int2store(buf + 4, binlog_flags);
@@ -4615,7 +4618,9 @@ pthread_handler_t handle_slave_io(void *arg)
}
- if (RUN_HOOK(binlog_relay_io, thread_start, (thd, mi)))
+ if (RUN_HOOK(binlog_relay_io, thread_start, (thd, mi)) ||
+ (DBUG_EVALUATE_IF("failed_slave_start", 1, 0)
+ || repl_semisync_slave.slaveStart(mi)))
{
mi->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR, NULL,
ER_THD(thd, ER_SLAVE_FATAL_ERROR),
@@ -4805,9 +4810,13 @@ Stopping slave I/O thread due to out-of-memory error from master");
retry_count=0; // ok event, reset retry counter
THD_STAGE_INFO(thd, stage_queueing_master_event_to_the_relay_log);
event_buf= (const char*)mysql->net.read_pos + 1;
+ mi->semi_ack= 0;
if (RUN_HOOK(binlog_relay_io, after_read_event,
(thd, mi,(const char*)mysql->net.read_pos + 1,
- event_len, &event_buf, &event_len)))
+ event_len, &event_buf, &event_len)) ||
+ repl_semisync_slave.
+ slaveReadSyncHeader((const char*)mysql->net.read_pos + 1, event_len,
+ &(mi->semi_ack), &event_buf, &event_len))
{
mi->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR, NULL,
ER_THD(thd, ER_SLAVE_FATAL_ERROR),
@@ -4868,7 +4877,10 @@ Stopping slave I/O thread due to out-of-memory error from master");
}
if (RUN_HOOK(binlog_relay_io, after_queue_event,
- (thd, mi, event_buf, event_len, synced)))
+ (thd, mi, event_buf, event_len, synced)) ||
+ (rpl_semi_sync_slave_status &&
+ (mi->semi_ack & SEMI_SYNC_NEED_ACK) &&
+ repl_semisync_slave.slaveReply(mi)))
{
mi->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR, NULL,
ER_THD(thd, ER_SLAVE_FATAL_ERROR),
@@ -4877,7 +4889,16 @@ Stopping slave I/O thread due to out-of-memory error from master");
}
if (mi->using_gtid == Master_info::USE_GTID_NO &&
- flush_master_info(mi, TRUE, TRUE))
+ /*
+ If rpl_semi_sync_slave_delay_master is enabled, we will flush
+ master info only when ack is needed. This may lead to at least one
+ group transaction delay but affords better performance improvement.
+ */
+ (!repl_semisync_slave.getSlaveEnabled() ||
+ (!(mi->semi_ack & SEMI_SYNC_SLAVE_DELAY_SYNC) ||
+ (mi->semi_ack & (SEMI_SYNC_NEED_ACK)))) &&
+ (DBUG_EVALUATE_IF("failed_flush_master_info", 1, 0) ||
+ flush_master_info(mi, TRUE, TRUE)))
{
sql_print_error("Failed to flush master info file");
goto err;
@@ -4931,7 +4952,8 @@ err:
IO_RPL_LOG_NAME, mi->master_log_pos,
tmp.c_ptr_safe());
}
- RUN_HOOK(binlog_relay_io, thread_stop, (thd, mi));
+ (void) RUN_HOOK(binlog_relay_io, thread_stop, (thd, mi));
+ repl_semisync_slave.slaveStop(mi);
thd->reset_query();
thd->reset_db(NULL, 0);
if (mysql)
diff --git a/sql/sql_class.cc b/sql/sql_class.cc
index 35ed9be74f9..d0aa0818e99 100644
--- a/sql/sql_class.cc
+++ b/sql/sql_class.cc
@@ -706,7 +706,7 @@ extern "C" void thd_kill_timeout(THD* thd)
}
-THD::THD(my_thread_id id, bool is_wsrep_applier)
+THD::THD(my_thread_id id, bool is_wsrep_applier, bool skip_global_sys_var_lock)
:Statement(&main_lex, &main_mem_root, STMT_CONVENTIONAL_EXECUTION,
/* statement id */ 0),
rli_fake(0), rgi_fake(0), rgi_slave(NULL),
@@ -893,7 +893,7 @@ THD::THD(my_thread_id id, bool is_wsrep_applier)
/* Call to init() below requires fully initialized Open_tables_state. */
reset_open_tables_state(this);
- init();
+ init(skip_global_sys_var_lock);
#if defined(ENABLED_PROFILING)
profiling.set_thd(this);
#endif
@@ -1264,10 +1264,11 @@ const Type_handler *THD::type_handler_for_date() const
Init common variables that has to be reset on start and on change_user
*/
-void THD::init(void)
+void THD::init(bool skip_lock)
{
DBUG_ENTER("thd::init");
- mysql_mutex_lock(&LOCK_global_system_variables);
+ if (!skip_lock)
+ mysql_mutex_lock(&LOCK_global_system_variables);
plugin_thdvar_init(this);
/*
plugin_thd_var_init() sets variables= global_system_variables, which
@@ -1280,8 +1281,8 @@ void THD::init(void)
::strmake(default_master_connection_buff,
global_system_variables.default_master_connection.str,
variables.default_master_connection.length);
-
- mysql_mutex_unlock(&LOCK_global_system_variables);
+ if (!skip_lock)
+ mysql_mutex_unlock(&LOCK_global_system_variables);
user_time.val= start_time= start_time_sec_part= 0;
@@ -4193,7 +4194,8 @@ my_bool thd_net_is_killed()
void thd_increment_bytes_received(void *thd, ulong length)
{
- ((THD*) thd)->status_var.bytes_received+= length;
+ if (thd != NULL) // MDEV-13073 Ack collector having NULL
+ ((THD*) thd)->status_var.bytes_received+= length;
}
diff --git a/sql/sql_class.h b/sql/sql_class.h
index 4249bc6bb5b..66c490c6acf 100644
--- a/sql/sql_class.h
+++ b/sql/sql_class.h
@@ -1569,7 +1569,8 @@ enum enum_thread_type
SYSTEM_THREAD_EVENT_WORKER= 16,
SYSTEM_THREAD_BINLOG_BACKGROUND= 32,
SYSTEM_THREAD_SLAVE_BACKGROUND= 64,
- SYSTEM_THREAD_GENERIC= 128
+ SYSTEM_THREAD_GENERIC= 128,
+ SYSTEM_THREAD_SEMISYNC_MASTER_BACKGROUND= 256
};
inline char const *
@@ -1585,6 +1586,7 @@ show_system_thread(enum_thread_type thread)
RETURN_NAME_AS_STRING(SYSTEM_THREAD_EVENT_SCHEDULER);
RETURN_NAME_AS_STRING(SYSTEM_THREAD_EVENT_WORKER);
RETURN_NAME_AS_STRING(SYSTEM_THREAD_SLAVE_BACKGROUND);
+ RETURN_NAME_AS_STRING(SYSTEM_THREAD_SEMISYNC_MASTER_BACKGROUND);
default:
sprintf(buf, "<UNKNOWN SYSTEM THREAD: %d>", thread);
return buf;
@@ -2261,7 +2263,8 @@ public:
/* Needed by MariaDB semi sync replication */
Trans_binlog_info *semisync_info;
-
+ /* If this is a semisync slave connection. */
+ bool semi_sync_slave;
ulonglong client_capabilities; /* What the client supports */
ulong max_client_packet_length;
@@ -3147,11 +3150,20 @@ public:
/* Debug Sync facility. See debug_sync.cc. */
struct st_debug_sync_control *debug_sync_control;
#endif /* defined(ENABLED_DEBUG_SYNC) */
- THD(my_thread_id id, bool is_wsrep_applier= false);
+ /**
+ @param id thread identifier
+ @param is_wsrep_applier thread type
+ @param skip_lock instruct whether @c LOCK_global_system_variables
+ is already locked, to not acquire it then.
+ */
+ THD(my_thread_id id, bool is_wsrep_applier= false, bool skip_lock= false);
~THD();
-
- void init(void);
+ /**
+ @param skip_lock instruct whether @c LOCK_global_system_variables
+ is already locked, to not acquire it then.
+ */
+ void init(bool skip_lock= false);
/*
Initialize memory roots necessary for query processing and (!)
pre-allocate memory for it. We can't do that in THD constructor because
diff --git a/sql/sql_repl.cc b/sql/sql_repl.cc
index fcbf0ce1bd0..36fa6120584 100644
--- a/sql/sql_repl.cc
+++ b/sql/sql_repl.cc
@@ -30,7 +30,8 @@
#include <my_dir.h>
#include "rpl_handler.h"
#include "debug_sync.h"
-#include "log.h" // get_gtid_list_event
+#include "semisync_master.h"
+#include "semisync_slave.h"
enum enum_gtid_until_state {
GTID_UNTIL_NOT_DONE,
@@ -160,6 +161,7 @@ struct binlog_send_info {
bool clear_initial_log_pos;
bool should_stop;
+ size_t dirlen;
binlog_send_info(THD *thd_arg, String *packet_arg, ushort flags_arg,
char *lfn)
@@ -315,14 +317,30 @@ static int reset_transmit_packet(binlog_send_info *info, ushort flags,
if (RUN_HOOK(binlog_transmit, reserve_header, (info->thd, flags, packet)))
{
+ /* RUN_HOOK() must return zero when thd->semi_sync_slave */
+ DBUG_ASSERT(!info->thd->semi_sync_slave);
+
info->error= ER_UNKNOWN_ERROR;
*errmsg= "Failed to run hook 'reserve_header'";
ret= 1;
}
+ if (info->thd->semi_sync_slave)
+ {
+ repl_semisync_master.reserveSyncHeader(packet);
+ }
+
*ev_offset= packet->length();
return ret;
}
+inline bool is_semi_sync_slave()
+{
+ int null_value;
+ long long val= 0;
+ get_user_var_int("rpl_semi_sync_slave", &val, &null_value);
+ return val;
+}
+
static int send_file(THD *thd)
{
NET* net = &thd->net;
@@ -875,73 +893,6 @@ get_binlog_list(MEM_ROOT *memroot)
DBUG_RETURN(current_list);
}
-/*
- Find the Gtid_list_log_event at the start of a binlog.
-
- NULL for ok, non-NULL error message for error.
-
- If ok, then the event is returned in *out_gtid_list. This can be NULL if we
- get back to binlogs written by old server version without GTID support. If
- so, it means we have reached the point to start from, as no GTID events can
- exist in earlier binlogs.
-*/
-
-static const char *
-get_gtid_list_event(IO_CACHE *cache, Gtid_list_log_event **out_gtid_list)
-{
- Format_description_log_event init_fdle(BINLOG_VERSION);
- Format_description_log_event *fdle;
- Log_event *ev;
- const char *errormsg = NULL;
-
- *out_gtid_list= NULL;
-
- if (!(ev= Log_event::read_log_event(cache, &init_fdle,
- opt_master_verify_checksum)) ||
- ev->get_type_code() != FORMAT_DESCRIPTION_EVENT)
- {
- if (ev)
- delete ev;
- return "Could not read format description log event while looking for "
- "GTID position in binlog";
- }
-
- fdle= static_cast<Format_description_log_event *>(ev);
-
- for (;;)
- {
- Log_event_type typ;
-
- ev= Log_event::read_log_event(cache, fdle, opt_master_verify_checksum);
- if (!ev)
- {
- errormsg= "Could not read GTID list event while looking for GTID "
- "position in binlog";
- break;
- }
- typ= ev->get_type_code();
- if (typ == GTID_LIST_EVENT)
- break; /* Done, found it */
- if (typ == START_ENCRYPTION_EVENT)
- {
- if (fdle->start_decryption((Start_encryption_log_event*) ev))
- errormsg= "Could not set up decryption for binlog.";
- }
- delete ev;
- if (typ == ROTATE_EVENT || typ == STOP_EVENT ||
- typ == FORMAT_DESCRIPTION_EVENT || typ == START_ENCRYPTION_EVENT)
- continue; /* Continue looking */
-
- /* We did not find any Gtid_list_log_event, must be old binlog. */
- ev= NULL;
- break;
- }
-
- delete fdle;
- *out_gtid_list= static_cast<Gtid_list_log_event *>(ev);
- return errormsg;
-}
-
/*
Check if every GTID requested by the slave is contained in this (or a later)
@@ -1673,6 +1624,7 @@ send_event_to_slave(binlog_send_info *info, Log_event_type event_type,
enum enum_binlog_checksum_alg current_checksum_alg= info->current_checksum_alg;
slave_connection_state *gtid_state= &info->gtid_state;
slave_connection_state *until_gtid_state= info->until_gtid_state;
+ bool need_sync= false;
if (event_type == GTID_LIST_EVENT &&
info->using_gtid_state && until_gtid_state)
@@ -1984,7 +1936,10 @@ send_event_to_slave(binlog_send_info *info, Log_event_type event_type,
pos= my_b_tell(log);
if (RUN_HOOK(binlog_transmit, before_send_event,
- (info->thd, info->flags, packet, info->log_file_name, pos)))
+ (info->thd, info->flags, packet, info->log_file_name, pos)) ||
+ repl_semisync_master.updateSyncHeader(info->thd, (uchar *)packet->c_ptr(),
+ info->log_file_name + info->dirlen,
+ pos, &need_sync))
{
info->error= ER_UNKNOWN_ERROR;
return "run 'before_send_event' hook failed";
@@ -2012,6 +1967,8 @@ send_event_to_slave(binlog_send_info *info, Log_event_type event_type,
info->error= ER_UNKNOWN_ERROR;
return "Failed to run hook 'after_send_event'";
}
+ if (need_sync)
+ repl_semisync_master.flushNet(info->thd, packet->c_ptr());
return NULL; /* Success */
}
@@ -2748,7 +2705,7 @@ static int send_one_binlog_file(binlog_send_info *info,
/** end of file or error */
return (int)end_pos;
}
-
+ info->dirlen= dirname_length(info->log_file_name);
/**
* send events from current position up to end_pos
*/
@@ -2770,6 +2727,7 @@ void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos,
binlog_send_info infoobj(thd, packet, flags, linfo.log_file_name);
binlog_send_info *info= &infoobj;
+ bool has_transmit_started= false;
int old_max_allowed_packet= thd->variables.max_allowed_packet;
thd->variables.max_allowed_packet= MAX_MAX_ALLOWED_PACKET;
@@ -2792,6 +2750,11 @@ void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos,
info->error= ER_UNKNOWN_ERROR;
goto err;
}
+ has_transmit_started= true;
+
+ /* Check if the dump thread is created by a slave with semisync enabled. */
+ thd->semi_sync_slave = is_semi_sync_slave();
+ repl_semisync_master.dump_start(thd, log_ident, pos);
/*
heartbeat_period from @master_heartbeat_period user variable
@@ -2908,7 +2871,11 @@ void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos,
err:
THD_STAGE_INFO(thd, stage_waiting_to_finalize_termination);
- RUN_HOOK(binlog_transmit, transmit_stop, (thd, flags));
+ (void) RUN_HOOK(binlog_transmit, transmit_stop, (thd, flags));
+ if (has_transmit_started)
+ {
+ repl_semisync_master.dump_end(thd);
+ }
if (info->thd->killed == KILL_SLAVE_SAME_ID)
{
@@ -3374,7 +3341,9 @@ int reset_slave(THD *thd, Master_info* mi)
else if (global_system_variables.log_warnings > 1)
sql_print_information("Deleted Master_info file '%s'.", fname);
- RUN_HOOK(binlog_relay_io, after_reset_slave, (thd, mi));
+ (void) RUN_HOOK(binlog_relay_io, after_reset_slave, (thd, mi));
+ if (rpl_semi_sync_slave_enabled)
+ repl_semisync_slave.resetSlave(mi);
err:
mi->unlock_slave_threads();
if (error)
@@ -3876,11 +3845,14 @@ int reset_master(THD* thd, rpl_gtid *init_state, uint32 init_state_len,
return 1;
}
- if (mysql_bin_log.reset_logs(thd, 1, init_state, init_state_len,
- next_log_number))
- return 1;
- RUN_HOOK(binlog_transmit, after_reset_master, (thd, 0 /* flags */));
- return 0;
+ bool ret= 0;
+ /* Temporarily disable master semisync before reseting master. */
+ repl_semisync_master.beforeResetMaster();
+ ret= mysql_bin_log.reset_logs(thd, 1, init_state, init_state_len,
+ next_log_number);
+ (void) RUN_HOOK(binlog_transmit, after_reset_master, (thd, 0 /* flags */));
+ repl_semisync_master.afterResetMaster();
+ return ret;
}
diff --git a/sql/sys_vars.cc b/sql/sys_vars.cc
index e897b6c21ce..9eac6017c7c 100644
--- a/sql/sys_vars.cc
+++ b/sql/sys_vars.cc
@@ -3051,22 +3051,18 @@ static bool fix_rpl_semi_sync_master_enabled(sys_var *self, THD *thd,
{
if (repl_semisync_master.enableMaster() != 0)
rpl_semi_sync_master_enabled= false;
-#ifdef HAVE_ACC_RECEIVER
else if (ack_receiver.start())
{
repl_semisync_master.disableMaster();
rpl_semi_sync_master_enabled= false;
}
-#endif
}
else
{
if (repl_semisync_master.disableMaster() != 0)
rpl_semi_sync_master_enabled= true;
-#ifdef HAVE_ACC_RECEIVER
if (!rpl_semi_sync_master_enabled)
ack_receiver.stop();
-#endif
}
return false;
}
@@ -3082,27 +3078,21 @@ static bool fix_rpl_semi_sync_master_trace_level(sys_var *self, THD *thd,
enum_var_type type)
{
repl_semisync_master.setTraceLevel(rpl_semi_sync_master_trace_level);
-#ifdef HAVE_ACC_RECEIVER
ack_receiver.setTraceLevel(rpl_semi_sync_master_trace_level);
-#endif
return false;
}
static bool fix_rpl_semi_sync_master_wait_point(sys_var *self, THD *thd,
enum_var_type type)
{
-#ifdef HAVE_ACC_RECEIVER
repl_semisync_master.setWaitPoint(rpl_semi_sync_master_wait_point);
-#endif
return false;
}
static bool fix_rpl_semi_sync_master_wait_no_slave(sys_var *self, THD *thd,
enum_var_type type)
{
-#ifdef HAVE_ACC_RECEIVER
repl_semisync_master.checkAndSwitch();
-#endif
return false;
}
@@ -3168,7 +3158,6 @@ static bool fix_rpl_semi_sync_slave_trace_level(sys_var *self, THD *thd,
return false;
}
-#ifdef HAVE_ACC_RECEIVER
static bool fix_rpl_semi_sync_slave_delay_master(sys_var *self, THD *thd,
enum_var_type type)
{
@@ -3182,8 +3171,6 @@ static bool fix_rpl_semi_sync_slave_kill_conn_timeout(sys_var *self, THD *thd,
repl_semisync_slave.setKillConnTimeout(rpl_semi_sync_slave_kill_conn_timeout);
return false;
}
-#endif
-
static Sys_var_mybool Sys_semisync_slave_enabled(
"rpl_semi_sync_slave_enabled",
@@ -3202,7 +3189,6 @@ static Sys_var_ulong Sys_semisync_slave_trace_level(
NO_MUTEX_GUARD, NOT_IN_BINLOG, ON_CHECK(0),
ON_UPDATE(fix_rpl_semi_sync_slave_trace_level));
-#ifdef HAVE_ACC_RECEIVER
static Sys_var_mybool Sys_semisync_slave_delay_master(
"rpl_semi_sync_slave_delay_master",
"Only write master info file when ack is needed.",
@@ -3221,7 +3207,6 @@ static Sys_var_uint Sys_semisync_slave_kill_conn_timeout(
VALID_RANGE(0, UINT_MAX), DEFAULT(5), BLOCK_SIZE(1),
NO_MUTEX_GUARD, NOT_IN_BINLOG, ON_CHECK(0),
ON_UPDATE(fix_rpl_semi_sync_slave_kill_conn_timeout));
-#endif
#endif /* HAVE_REPLICATION */
static Sys_var_ulong Sys_slow_launch_time(
diff --git a/sql/transaction.cc b/sql/transaction.cc
index cbd875e3114..f6ccf5a1930 100644
--- a/sql/transaction.cc
+++ b/sql/transaction.cc
@@ -24,7 +24,7 @@
#include "rpl_handler.h"
#include "debug_sync.h" // DEBUG_SYNC
#include "sql_acl.h"
-
+#include "semisync_master.h"
#ifndef EMBEDDED_LIBRARY
/**
@@ -318,9 +318,19 @@ bool trans_commit(THD *thd)
transaction, so the hooks for rollback will be called.
*/
if (res)
+ {
(void) RUN_HOOK(transaction, after_rollback, (thd, FALSE));
+#ifdef HAVE_REPLICATION
+ repl_semisync_master.waitAfterRollback(thd, FALSE);
+#endif
+ }
else
+ {
(void) RUN_HOOK(transaction, after_commit, (thd, FALSE));
+#ifdef HAVE_REPLICATION
+ repl_semisync_master.waitAfterCommit(thd, FALSE);
+#endif
+ }
thd->variables.option_bits&= ~(OPTION_BEGIN | OPTION_KEEP_LOG);
thd->transaction.all.reset();
thd->lex->start_transaction_opt= 0;
@@ -414,6 +424,9 @@ bool trans_rollback(THD *thd)
DBUG_PRINT("info", ("clearing SERVER_STATUS_IN_TRANS"));
res= ha_rollback_trans(thd, TRUE);
(void) RUN_HOOK(transaction, after_rollback, (thd, FALSE));
+#ifdef HAVE_REPLICATION
+ repl_semisync_master.waitAfterRollback(thd, FALSE);
+#endif
thd->variables.option_bits&= ~(OPTION_BEGIN | OPTION_KEEP_LOG);
/* Reset the binlog transaction marker */
thd->variables.option_bits&= ~OPTION_GTID_BEGIN;
@@ -526,9 +539,19 @@ bool trans_commit_stmt(THD *thd)
transaction, so the hooks for rollback will be called.
*/
if (res)
+ {
(void) RUN_HOOK(transaction, after_rollback, (thd, FALSE));
+#ifdef HAVE_REPLICATION
+ repl_semisync_master.waitAfterRollback(thd, FALSE);
+#endif
+ }
else
+ {
(void) RUN_HOOK(transaction, after_commit, (thd, FALSE));
+#ifdef HAVE_REPLICATION
+ repl_semisync_master.waitAfterCommit(thd, FALSE);
+#endif
+ }
thd->transaction.stmt.reset();
@@ -568,6 +591,9 @@ bool trans_rollback_stmt(THD *thd)
}
(void) RUN_HOOK(transaction, after_rollback, (thd, FALSE));
+#ifdef HAVE_REPLICATION
+ repl_semisync_master.waitAfterRollback(thd, FALSE);
+#endif
thd->transaction.stmt.reset();