summaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
authorMonty <monty@mariadb.org>2017-10-25 11:07:44 +0300
committerMonty <monty@mariadb.org>2017-12-18 13:43:36 +0200
commit2e53b96a0aa9dcb18d8bbe12e5bc7e0aba208540 (patch)
tree112b522a5ae3a3132266052e38edcdac0a919e1a /sql
parent77030649fb1f492b6dd9351a7d4b36e1aeb29f4d (diff)
downloadmariadb-git-2e53b96a0aa9dcb18d8bbe12e5bc7e0aba208540.tar.gz
Moved semisync from a plugin to normal server
Part of MDEV-13073 AliSQL Optimize performance of semisync Did the following renames to match other similar variables key_ss_mutex_LOCK_binlog_ > key_LOCK_bing key_ss_cond_COND_binlog_send_ -> key_COND_binlog_send COND_binlog_send_ -> COND_binlog_send LOCK_binlog_ -> LOCK_binlog debian/mariadb-server-10.2.install does not install semisync libs.
Diffstat (limited to 'sql')
-rw-r--r--sql/CMakeLists.txt3
-rw-r--r--sql/mysqld.cc66
-rw-r--r--sql/replication.h14
-rw-r--r--sql/rpl_handler.cc8
-rw-r--r--sql/semisync.cc32
-rw-r--r--sql/semisync.h83
-rw-r--r--sql/semisync_master.cc1430
-rw-r--r--sql/semisync_master.h636
-rw-r--r--sql/semisync_slave.cc288
-rw-r--r--sql/semisync_slave.h101
-rw-r--r--sql/sys_vars.cc182
11 files changed, 2827 insertions, 16 deletions
diff --git a/sql/CMakeLists.txt b/sql/CMakeLists.txt
index 0f67032bcbe..6c63f3feca3 100644
--- a/sql/CMakeLists.txt
+++ b/sql/CMakeLists.txt
@@ -36,7 +36,7 @@ ELSE()
ENDIF()
INCLUDE_DIRECTORIES(
-${CMAKE_SOURCE_DIR}/include
+${CMAKE_SOURCE_DIR}/include
${CMAKE_SOURCE_DIR}/sql
${PCRE_INCLUDES}
${ZLIB_INCLUDE_DIR}
@@ -138,6 +138,7 @@ SET (SQL_SOURCE
my_apc.cc mf_iocache_encr.cc item_jsonfunc.cc
my_json_writer.cc
rpl_gtid.cc rpl_parallel.cc
+ semisync.cc semisync_master.cc semisync_slave.cc
sql_type.cc
item_windowfunc.cc sql_window.cc
sql_cte.cc
diff --git a/sql/mysqld.cc b/sql/mysqld.cc
index f3cb39959a7..fc783ae5559 100644
--- a/sql/mysqld.cc
+++ b/sql/mysqld.cc
@@ -97,8 +97,9 @@
#include "set_var.h"
#include "rpl_injector.h"
-
#include "rpl_handler.h"
+#include "semisync_master.h"
+#include "semisync_slave.h"
#ifdef HAVE_SYS_PRCTL_H
#include <sys/prctl.h>
@@ -934,6 +935,7 @@ PSI_mutex_key key_BINLOG_LOCK_index, key_BINLOG_LOCK_xid_list,
PSI_mutex_key key_RELAYLOG_LOCK_index;
PSI_mutex_key key_LOCK_slave_state, key_LOCK_binlog_state,
key_LOCK_rpl_thread, key_LOCK_rpl_thread_pool, key_LOCK_parallel_entry;
+PSI_mutex_key key_LOCK_binlog;
PSI_mutex_key key_LOCK_stats,
key_LOCK_global_user_client_stats, key_LOCK_global_table_stats,
@@ -1021,7 +1023,8 @@ static PSI_mutex_info all_server_mutexes[]=
{ &key_LOCK_binlog_state, "LOCK_binlog_state", 0},
{ &key_LOCK_rpl_thread, "LOCK_rpl_thread", 0},
{ &key_LOCK_rpl_thread_pool, "LOCK_rpl_thread_pool", 0},
- { &key_LOCK_parallel_entry, "LOCK_parallel_entry", 0}
+ { &key_LOCK_parallel_entry, "LOCK_parallel_entry", 0},
+ { &key_LOCK_binlog, "LOCK_binlog", 0}
};
PSI_rwlock_key key_rwlock_LOCK_grant, key_rwlock_LOCK_logger,
@@ -1062,7 +1065,7 @@ PSI_cond_key key_BINLOG_COND_xid_list, key_BINLOG_update_cond,
key_rpl_group_info_sleep_cond,
key_TABLE_SHARE_cond, key_user_level_lock_cond,
key_COND_thread_count, key_COND_thread_cache, key_COND_flush_thread_cache,
- key_COND_start_thread,
+ key_COND_start_thread, key_COND_binlog_send,
key_BINLOG_COND_queue_busy;
PSI_cond_key key_RELAYLOG_update_cond, key_COND_wakeup_ready,
key_COND_wait_commit;
@@ -1124,7 +1127,8 @@ static PSI_cond_info all_server_conds[]=
{ &key_COND_slave_background, "COND_slave_background", 0},
{ &key_COND_start_thread, "COND_start_thread", PSI_FLAG_GLOBAL},
{ &key_COND_wait_gtid, "COND_wait_gtid", 0},
- { &key_COND_gtid_ignore_duplicates, "COND_gtid_ignore_duplicates", 0}
+ { &key_COND_gtid_ignore_duplicates, "COND_gtid_ignore_duplicates", 0},
+ { &key_COND_binlog_send, "COND_binlog_send", 0}
};
PSI_thread_key key_thread_bootstrap, key_thread_delayed_insert,
@@ -2217,6 +2221,10 @@ void clean_up(bool print_message)
ha_end();
if (tc_log)
tc_log->close();
+#ifdef HAVE_REPLICATION
+ semi_sync_master_deinit();
+ semi_sync_slave_deinit();
+#endif
delegates_destroy();
xid_cache_free();
tdc_deinit();
@@ -5170,6 +5178,9 @@ static int init_server_components()
"this server. However this will be ignored as the "
"--log-bin option is not defined.");
}
+
+ semi_sync_master_init();
+ semi_sync_slave_init();
#endif
if (opt_bin_log)
@@ -8230,6 +8241,27 @@ static int show_ssl_get_cipher_list(THD *thd, SHOW_VAR *var, char *buff,
return 0;
}
+#define SHOW_FNAME(name) \
+ rpl_semi_sync_master_show_##name
+
+#define DEF_SHOW_FUNC(name, show_type) \
+ static int SHOW_FNAME(name)(MYSQL_THD thd, SHOW_VAR *var, char *buff) \
+ { \
+ repl_semisync_master.setExportStats(); \
+ var->type= show_type; \
+ var->value= (char *)&rpl_semi_sync_master_##name; \
+ return 0; \
+ }
+
+DEF_SHOW_FUNC(status, SHOW_BOOL)
+DEF_SHOW_FUNC(clients, SHOW_LONG)
+DEF_SHOW_FUNC(wait_sessions, SHOW_LONG)
+DEF_SHOW_FUNC(trx_wait_time, SHOW_LONGLONG)
+DEF_SHOW_FUNC(trx_wait_num, SHOW_LONGLONG)
+DEF_SHOW_FUNC(net_wait_time, SHOW_LONGLONG)
+DEF_SHOW_FUNC(net_wait_num, SHOW_LONGLONG)
+DEF_SHOW_FUNC(avg_net_wait_time, SHOW_LONG)
+DEF_SHOW_FUNC(avg_trx_wait_time, SHOW_LONG)
#ifdef HAVE_YASSL
@@ -8548,6 +8580,30 @@ SHOW_VAR status_vars[]= {
{"Rows_sent", (char*) offsetof(STATUS_VAR, rows_sent), SHOW_LONGLONG_STATUS},
{"Rows_read", (char*) offsetof(STATUS_VAR, rows_read), SHOW_LONGLONG_STATUS},
{"Rows_tmp_read", (char*) offsetof(STATUS_VAR, rows_tmp_read), SHOW_LONGLONG_STATUS},
+#ifdef HAVE_REPLICATION
+ {"Rpl_semi_sync_master_status", (char*) &SHOW_FNAME(status), SHOW_FUNC},
+ {"Rpl_semi_sync_master_clients", (char*) &SHOW_FNAME(clients), SHOW_FUNC},
+ {"Rpl_semi_sync_master_yes_tx", (char*) &rpl_semi_sync_master_yes_transactions, SHOW_LONG},
+ {"Rpl_semi_sync_master_no_tx", (char*) &rpl_semi_sync_master_no_transactions, SHOW_LONG},
+ {"Rpl_semi_sync_master_wait_sessions", (char*) &SHOW_FNAME(wait_sessions), SHOW_FUNC},
+ {"Rpl_semi_sync_master_no_times", (char*) &rpl_semi_sync_master_off_times, SHOW_LONG},
+ {"Rpl_semi_sync_master_timefunc_failures", (char*) &rpl_semi_sync_master_timefunc_fails, SHOW_LONG},
+ {"Rpl_semi_sync_master_wait_pos_backtraverse", (char*) &rpl_semi_sync_master_wait_pos_backtraverse, SHOW_LONG},
+ {"Rpl_semi_sync_master_tx_wait_time", (char*) &SHOW_FNAME(trx_wait_time), SHOW_FUNC},
+ {"Rpl_semi_sync_master_tx_waits", (char*) &SHOW_FNAME(trx_wait_num), SHOW_FUNC},
+ {"Rpl_semi_sync_master_tx_avg_wait_time", (char*) &SHOW_FNAME(avg_trx_wait_time), SHOW_FUNC},
+ {"Rpl_semi_sync_master_net_wait_time", (char*) &SHOW_FNAME(net_wait_time), SHOW_FUNC},
+ {"Rpl_semi_sync_master_net_waits", (char*) &SHOW_FNAME(net_wait_num), SHOW_FUNC},
+ {"Rpl_semi_sync_master_net_avg_wait_time", (char*) &SHOW_FNAME(avg_net_wait_time), SHOW_FUNC},
+#ifdef HAVE_ACC_RECEIVER
+ {"Rpl_semi_sync_master_request_ack", (char*) &rpl_semi_sync_master_request_ack, SHOW_LONGLONG},
+ {"Rpl_semi_sync_master_get_ack", (char*)&rpl_semi_sync_master_get_ack, SHOW_LONGLONG},
+#endif
+ {"Rpl_semi_sync_slave_status", (char*) &rpl_semi_sync_slave_status, SHOW_BOOL},
+#ifdef HAVE_ACC_RECEIVER
+ {"Rpl_semi_sync_slave_send_ack", (char*) &rpl_semi_sync_slave_send_ack, SHOW_LONGLONG},
+#endif
+#endif /* HAVE_REPLICATION */
#ifdef HAVE_QUERY_CACHE
{"Qcache_free_blocks", (char*) &query_cache.free_memory_blocks, SHOW_LONG_NOFLUSH},
{"Qcache_free_memory", (char*) &query_cache.free_memory, SHOW_LONG_NOFLUSH},
@@ -10285,6 +10341,8 @@ PSI_stage_info stage_waiting_for_insert= { 0, "Waiting for INSERT", 0};
PSI_stage_info stage_waiting_for_master_to_send_event= { 0, "Waiting for master to send event", 0};
PSI_stage_info stage_waiting_for_master_update= { 0, "Waiting for master update", 0};
PSI_stage_info stage_waiting_for_relay_log_space= { 0, "Waiting for the slave SQL thread to free enough relay log space", 0};
+PSI_stage_info stage_waiting_for_semi_sync_ack_from_slave=
+{ 0, "Waiting for semi-sync ACK from slave", 0};
PSI_stage_info stage_waiting_for_slave_mutex_on_exit= { 0, "Waiting for slave mutex on exit", 0};
PSI_stage_info stage_waiting_for_slave_thread_to_start= { 0, "Waiting for slave thread to start", 0};
PSI_stage_info stage_waiting_for_table_flush= { 0, "Waiting for table flush", 0};
diff --git a/sql/replication.h b/sql/replication.h
index 4731c2246ef..d8672310110 100644
--- a/sql/replication.h
+++ b/sql/replication.h
@@ -18,16 +18,14 @@
/***************************************************************************
NOTE: plugin locking.
- This API was created specifically for the semisync plugin and its locking
- logic is also matches semisync plugin usage pattern. In particular, a plugin
- is locked on Binlog_transmit_observer::transmit_start and is unlocked after
- Binlog_transmit_observer::transmit_stop. All other master observable events
- happen between these two and don't lock the plugin at all. This works well
- for the semisync_master plugin.
+
+ The plugin is locked on Binlog_transmit_observer::transmit_start and is
+ unlocked after Binlog_transmit_observer::transmit_stop. All other
+ master observable events happen between these two and don't lock the
+ plugin at all.
Also a plugin is locked on Binlog_relay_IO_observer::thread_start
- and unlocked after Binlog_relay_IO_observer::thread_stop. This works well for
- the semisync_slave plugin.
+ and unlocked after Binlog_relay_IO_observer::thread_stop.
***************************************************************************/
#include <mysql.h>
diff --git a/sql/rpl_handler.cc b/sql/rpl_handler.cc
index e3ff2a17a6a..27e411ca6de 100644
--- a/sql/rpl_handler.cc
+++ b/sql/rpl_handler.cc
@@ -149,13 +149,17 @@ void delegates_destroy()
{
if (transaction_delegate)
transaction_delegate->~Trans_delegate();
+ transaction_delegate= 0;
if (binlog_storage_delegate)
binlog_storage_delegate->~Binlog_storage_delegate();
+ binlog_storage_delegate= 0;
#ifdef HAVE_REPLICATION
if (binlog_transmit_delegate)
binlog_transmit_delegate->~Binlog_transmit_delegate();
+ binlog_transmit_delegate= 0;
if (binlog_relay_io_delegate)
binlog_relay_io_delegate->~Binlog_relay_IO_delegate();
+ binlog_relay_io_delegate= 0;
#endif /* HAVE_REPLICATION */
}
@@ -171,13 +175,11 @@ void delegates_destroy()
Observer_info *info= iter++; \
for (; info; info= iter++) \
{ \
- if (do_lock) plugin_lock(thd, plugin_int_to_ref(info->plugin_int)); \
if (((Observer *)info->observer)->f \
&& ((Observer *)info->observer)->f args) \
{ \
r= 1; \
- sql_print_error("Run function '" #f "' in plugin '%s' failed", \
- info->plugin_int->name.str); \
+ sql_print_error("Run function '" #f "' failed"); \
break; \
} \
} \
diff --git a/sql/semisync.cc b/sql/semisync.cc
new file mode 100644
index 00000000000..df37f03ec2f
--- /dev/null
+++ b/sql/semisync.cc
@@ -0,0 +1,32 @@
+/* Copyright (C) 2007 Google Inc.
+ Copyright (C) 2008 MySQL AB
+ Use is subject to license terms
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; version 2 of the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
+
+
+#include <my_global.h>
+#include "semisync.h"
+
+const unsigned char ReplSemiSyncBase::kPacketMagicNum = 0xef;
+const unsigned char ReplSemiSyncBase::kPacketFlagSync = 0x01;
+
+
+const unsigned long Trace::kTraceGeneral = 0x0001;
+const unsigned long Trace::kTraceDetail = 0x0010;
+const unsigned long Trace::kTraceNetWait = 0x0020;
+const unsigned long Trace::kTraceFunction = 0x0040;
+
+const unsigned char ReplSemiSyncBase::kSyncHeader[2] =
+ {ReplSemiSyncBase::kPacketMagicNum, 0};
diff --git a/sql/semisync.h b/sql/semisync.h
new file mode 100644
index 00000000000..3142f920f1e
--- /dev/null
+++ b/sql/semisync.h
@@ -0,0 +1,83 @@
+/* Copyright (C) 2007 Google Inc.
+ Copyright (C) 2008 MySQL AB
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; version 2 of the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
+
+
+#ifndef SEMISYNC_H
+#define SEMISYNC_H
+
+#include "mysqld.h"
+#include "log_event.h"
+#include "replication.h"
+
+/**
+ This class is used to trace function calls and other process
+ information
+*/
+class Trace {
+public:
+ static const unsigned long kTraceFunction;
+ static const unsigned long kTraceGeneral;
+ static const unsigned long kTraceDetail;
+ static const unsigned long kTraceNetWait;
+
+ unsigned long trace_level_; /* the level for tracing */
+
+ inline void function_enter(const char *func_name)
+ {
+ if (trace_level_ & kTraceFunction)
+ sql_print_information("---> %s enter", func_name);
+ }
+ inline int function_exit(const char *func_name, int exit_code)
+ {
+ if (trace_level_ & kTraceFunction)
+ sql_print_information("<--- %s exit (%d)", func_name, exit_code);
+ return exit_code;
+ }
+
+ Trace()
+ :trace_level_(0L)
+ {}
+ Trace(unsigned long trace_level)
+ :trace_level_(trace_level)
+ {}
+};
+
+/**
+ Base class for semi-sync master and slave classes
+*/
+class ReplSemiSyncBase
+ :public Trace {
+public:
+ static const unsigned char kSyncHeader[2]; /* three byte packet header */
+
+ /* Constants in network packet header. */
+ static const unsigned char kPacketMagicNum;
+ static const unsigned char kPacketFlagSync;
+};
+
+/* The layout of a semisync slave reply packet:
+ 1 byte for the magic num
+ 8 bytes for the binlog positon
+ n bytes for the binlog filename, terminated with a '\0'
+*/
+#define REPLY_MAGIC_NUM_LEN 1
+#define REPLY_BINLOG_POS_LEN 8
+#define REPLY_BINLOG_NAME_LEN (FN_REFLEN + 1)
+#define REPLY_MAGIC_NUM_OFFSET 0
+#define REPLY_BINLOG_POS_OFFSET (REPLY_MAGIC_NUM_OFFSET + REPLY_MAGIC_NUM_LEN)
+#define REPLY_BINLOG_NAME_OFFSET (REPLY_BINLOG_POS_OFFSET + REPLY_BINLOG_POS_LEN)
+
+#endif /* SEMISYNC_H */
diff --git a/sql/semisync_master.cc b/sql/semisync_master.cc
new file mode 100644
index 00000000000..21c52addce9
--- /dev/null
+++ b/sql/semisync_master.cc
@@ -0,0 +1,1430 @@
+/* Copyright (C) 2007 Google Inc.
+ Copyright (c) 2008, 2013, Oracle and/or its affiliates.
+ Copyright (c) 2011, 2016, MariaDB
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; version 2 of the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
+
+
+#include <my_global.h>
+#include "semisync_master.h"
+
+#define TIME_THOUSAND 1000
+#define TIME_MILLION 1000000
+#define TIME_BILLION 1000000000
+
+/* This indicates whether semi-synchronous replication is enabled. */
+my_bool rpl_semi_sync_master_enabled;
+my_bool rpl_semi_sync_master_wait_no_slave = 1;
+my_bool rpl_semi_sync_master_status = 0;
+ulong rpl_semi_sync_master_wait_point =
+ SEMI_SYNC_MASTER_WAIT_POINT_AFTER_STORAGE_COMMIT;
+ulong rpl_semi_sync_master_timeout;
+ulong rpl_semi_sync_master_trace_level;
+ulong rpl_semi_sync_master_yes_transactions = 0;
+ulong rpl_semi_sync_master_no_transactions = 0;
+ulong rpl_semi_sync_master_off_times = 0;
+ulong rpl_semi_sync_master_timefunc_fails = 0;
+ulong rpl_semi_sync_master_wait_timeouts = 0;
+ulong rpl_semi_sync_master_wait_sessions = 0;
+ulong rpl_semi_sync_master_wait_pos_backtraverse = 0;
+ulong rpl_semi_sync_master_avg_trx_wait_time = 0;
+ulonglong rpl_semi_sync_master_trx_wait_num = 0;
+ulong rpl_semi_sync_master_avg_net_wait_time = 0;
+ulonglong rpl_semi_sync_master_net_wait_num = 0;
+ulong rpl_semi_sync_master_clients = 0;
+ulonglong rpl_semi_sync_master_net_wait_time = 0;
+ulonglong rpl_semi_sync_master_trx_wait_time = 0;
+
+ReplSemiSyncMaster repl_semisync_master;
+
+static int getWaitTime(const struct timespec& start_ts);
+
+static ulonglong timespec_to_usec(const struct timespec *ts)
+{
+ return (ulonglong) ts->tv_sec * TIME_MILLION + ts->tv_nsec / TIME_THOUSAND;
+}
+
+/*******************************************************************************
+ *
+ * <ActiveTranx> class : manage all active transaction nodes
+ *
+ ******************************************************************************/
+
+ActiveTranx::ActiveTranx(mysql_mutex_t *lock,
+ ulong trace_level)
+ : Trace(trace_level), allocator_(max_connections),
+ num_entries_(max_connections << 1), /* Transaction hash table size
+ * is set to double the size
+ * of max_connections */
+ lock_(lock)
+{
+ /* No transactions are in the list initially. */
+ trx_front_ = NULL;
+ trx_rear_ = NULL;
+
+ /* Create the hash table to find a transaction's ending event. */
+ trx_htb_ = new TranxNode *[num_entries_];
+ for (int idx = 0; idx < num_entries_; ++idx)
+ trx_htb_[idx] = NULL;
+
+ sql_print_information("Semi-sync replication initialized for transactions.");
+}
+
+ActiveTranx::~ActiveTranx()
+{
+ delete [] trx_htb_;
+ trx_htb_ = NULL;
+ num_entries_ = 0;
+}
+
+unsigned int ActiveTranx::calc_hash(const unsigned char *key,
+ unsigned int length)
+{
+ unsigned int nr = 1, nr2 = 4;
+
+ /* The hash implementation comes from calc_hashnr() in mysys/hash.c. */
+ while (length--)
+ {
+ nr ^= (((nr & 63)+nr2)*((unsigned int) (unsigned char) *key++))+ (nr << 8);
+ nr2 += 3;
+ }
+ return((unsigned int) nr);
+}
+
+unsigned int ActiveTranx::get_hash_value(const char *log_file_name,
+ my_off_t log_file_pos)
+{
+ unsigned int hash1 = calc_hash((const unsigned char *)log_file_name,
+ strlen(log_file_name));
+ unsigned int hash2 = calc_hash((const unsigned char *)(&log_file_pos),
+ sizeof(log_file_pos));
+
+ return (hash1 + hash2) % num_entries_;
+}
+
+int ActiveTranx::compare(const char *log_file_name1, my_off_t log_file_pos1,
+ const char *log_file_name2, my_off_t log_file_pos2)
+{
+ int cmp = strcmp(log_file_name1, log_file_name2);
+
+ if (cmp != 0)
+ return cmp;
+
+ if (log_file_pos1 > log_file_pos2)
+ return 1;
+ else if (log_file_pos1 < log_file_pos2)
+ return -1;
+ return 0;
+}
+
+int ActiveTranx::insert_tranx_node(const char *log_file_name,
+ my_off_t log_file_pos)
+{
+ const char *kWho = "ActiveTranx:insert_tranx_node";
+ TranxNode *ins_node;
+ int result = 0;
+ unsigned int hash_val;
+
+ function_enter(kWho);
+
+ ins_node = allocator_.allocate_node();
+ if (!ins_node)
+ {
+ sql_print_error("%s: transaction node allocation failed for: (%s, %lu)",
+ kWho, log_file_name, (ulong)log_file_pos);
+ result = -1;
+ goto l_end;
+ }
+
+ /* insert the binlog position in the active transaction list. */
+ strncpy(ins_node->log_name_, log_file_name, FN_REFLEN-1);
+ ins_node->log_name_[FN_REFLEN-1] = 0; /* make sure it ends properly */
+ ins_node->log_pos_ = log_file_pos;
+
+ if (!trx_front_)
+ {
+ /* The list is empty. */
+ trx_front_ = trx_rear_ = ins_node;
+ }
+ else
+ {
+ int cmp = compare(ins_node, trx_rear_);
+ if (cmp > 0)
+ {
+ /* Compare with the tail first. If the transaction happens later in
+ * binlog, then make it the new tail.
+ */
+ trx_rear_->next_ = ins_node;
+ trx_rear_ = ins_node;
+ }
+ else
+ {
+ /* Otherwise, it is an error because the transaction should hold the
+ * mysql_bin_log.LOCK_log when appending events.
+ */
+ sql_print_error("%s: binlog write out-of-order, tail (%s, %lu), "
+ "new node (%s, %lu)", kWho,
+ trx_rear_->log_name_, (ulong)trx_rear_->log_pos_,
+ ins_node->log_name_, (ulong)ins_node->log_pos_);
+ result = -1;
+ goto l_end;
+ }
+ }
+
+ hash_val = get_hash_value(ins_node->log_name_, ins_node->log_pos_);
+ ins_node->hash_next_ = trx_htb_[hash_val];
+ trx_htb_[hash_val] = ins_node;
+
+ if (trace_level_ & kTraceDetail)
+ sql_print_information("%s: insert (%s, %lu) in entry(%u)", kWho,
+ ins_node->log_name_, (ulong)ins_node->log_pos_,
+ hash_val);
+
+ l_end:
+ return function_exit(kWho, result);
+}
+
+bool ActiveTranx::is_tranx_end_pos(const char *log_file_name,
+ my_off_t log_file_pos)
+{
+ const char *kWho = "ActiveTranx::is_tranx_end_pos";
+ function_enter(kWho);
+
+ unsigned int hash_val = get_hash_value(log_file_name, log_file_pos);
+ TranxNode *entry = trx_htb_[hash_val];
+
+ while (entry != NULL)
+ {
+ if (compare(entry, log_file_name, log_file_pos) == 0)
+ break;
+
+ entry = entry->hash_next_;
+ }
+
+ if (trace_level_ & kTraceDetail)
+ sql_print_information("%s: probe (%s, %lu) in entry(%u)", kWho,
+ log_file_name, (ulong)log_file_pos, hash_val);
+
+ function_exit(kWho, (entry != NULL));
+ return (entry != NULL);
+}
+
+int ActiveTranx::clear_active_tranx_nodes(const char *log_file_name,
+ my_off_t log_file_pos)
+{
+ const char *kWho = "ActiveTranx::::clear_active_tranx_nodes";
+ TranxNode *new_front;
+
+ function_enter(kWho);
+
+ if (log_file_name != NULL)
+ {
+ new_front = trx_front_;
+
+ while (new_front)
+ {
+ if (compare(new_front, log_file_name, log_file_pos) > 0)
+ break;
+ new_front = new_front->next_;
+ }
+ }
+ else
+ {
+ /* If log_file_name is NULL, clear everything. */
+ new_front = NULL;
+ }
+
+ if (new_front == NULL)
+ {
+ /* No active transaction nodes after the call. */
+
+ /* Clear the hash table. */
+ memset(trx_htb_, 0, num_entries_ * sizeof(TranxNode *));
+ allocator_.free_all_nodes();
+
+ /* Clear the active transaction list. */
+ if (trx_front_ != NULL)
+ {
+ trx_front_ = NULL;
+ trx_rear_ = NULL;
+ }
+
+ if (trace_level_ & kTraceDetail)
+ sql_print_information("%s: cleared all nodes", kWho);
+ }
+ else if (new_front != trx_front_)
+ {
+ TranxNode *curr_node, *next_node;
+
+ /* Delete all transaction nodes before the confirmation point. */
+ int n_frees = 0;
+ curr_node = trx_front_;
+ while (curr_node != new_front)
+ {
+ next_node = curr_node->next_;
+ n_frees++;
+
+ /* Remove the node from the hash table. */
+ unsigned int hash_val = get_hash_value(curr_node->log_name_, curr_node->log_pos_);
+ TranxNode **hash_ptr = &(trx_htb_[hash_val]);
+ while ((*hash_ptr) != NULL)
+ {
+ if ((*hash_ptr) == curr_node)
+ {
+ (*hash_ptr) = curr_node->hash_next_;
+ break;
+ }
+ hash_ptr = &((*hash_ptr)->hash_next_);
+ }
+
+ curr_node = next_node;
+ }
+
+ trx_front_ = new_front;
+ allocator_.free_nodes_before(trx_front_);
+
+ if (trace_level_ & kTraceDetail)
+ sql_print_information("%s: cleared %d nodes back until pos (%s, %lu)",
+ kWho, n_frees,
+ trx_front_->log_name_, (ulong)trx_front_->log_pos_);
+ }
+
+ return function_exit(kWho, 0);
+}
+
+
+/*******************************************************************************
+ *
+ * <ReplSemiSyncMaster> class: the basic code layer for sync-replication master.
+ * <ReplSemiSyncSlave> class: the basic code layer for sync-replication slave.
+ *
+ * The most important functions during semi-syn replication listed:
+ *
+ * Master:
+ * . reportReplyBinlog(): called by the binlog dump thread when it receives
+ * the slave's status information.
+ * . updateSyncHeader(): based on transaction waiting information, decide
+ * whether to request the slave to reply.
+ * . writeTranxInBinlog(): called by the transaction thread when it finishes
+ * writing all transaction events in binlog.
+ * . commitTrx(): transaction thread wait for the slave reply.
+ *
+ * Slave:
+ * . slaveReadSyncHeader(): read the semi-sync header from the master, get the
+ * sync status and get the payload for events.
+ * . slaveReply(): reply to the master about the replication progress.
+ *
+ ******************************************************************************/
+
+ReplSemiSyncMaster::ReplSemiSyncMaster()
+ : active_tranxs_(NULL),
+ init_done_(false),
+ reply_file_name_inited_(false),
+ reply_file_pos_(0L),
+ wait_file_name_inited_(false),
+ wait_file_pos_(0),
+ master_enabled_(false),
+ wait_timeout_(0L),
+ state_(0)
+{
+ strcpy(reply_file_name_, "");
+ strcpy(wait_file_name_, "");
+}
+
+int ReplSemiSyncMaster::initObject()
+{
+ int result;
+ const char *kWho = "ReplSemiSyncMaster::initObject";
+
+ if (init_done_)
+ {
+ fprintf(stderr, "%s called twice\n", kWho);
+ return 1;
+ }
+ init_done_ = true;
+
+ /* References to the parameter works after set_options(). */
+ setWaitTimeout(rpl_semi_sync_master_timeout);
+ setTraceLevel(rpl_semi_sync_master_trace_level);
+
+ /* Mutex initialization can only be done after MY_INIT(). */
+ mysql_mutex_init(key_LOCK_binlog,
+ &LOCK_binlog, MY_MUTEX_INIT_FAST);
+ mysql_cond_init(key_COND_binlog_send,
+ &COND_binlog_send, NULL);
+
+ if (rpl_semi_sync_master_enabled)
+ result = enableMaster();
+ else
+ result = disableMaster();
+
+ return result;
+}
+
+int ReplSemiSyncMaster::enableMaster()
+{
+ int result = 0;
+
+ /* Must have the lock when we do enable of disable. */
+ lock();
+
+ if (!getMasterEnabled())
+ {
+ active_tranxs_ = new ActiveTranx(&LOCK_binlog, trace_level_);
+ if (active_tranxs_ != NULL)
+ {
+ commit_file_name_inited_ = false;
+ reply_file_name_inited_ = false;
+ wait_file_name_inited_ = false;
+
+ set_master_enabled(true);
+ state_ = true;
+ run_hooks_enabled= 1;
+ sql_print_information("Semi-sync replication enabled on the master.");
+ }
+ else
+ {
+ sql_print_error("Cannot allocate memory to enable semi-sync on the master.");
+ result = -1;
+ }
+ }
+
+ unlock();
+
+ return result;
+}
+
+int ReplSemiSyncMaster::disableMaster()
+{
+ /* Must have the lock when we do enable of disable. */
+ lock();
+
+ if (getMasterEnabled())
+ {
+ /* Switch off the semi-sync first so that waiting transaction will be
+ * waken up.
+ */
+ switch_off();
+
+ assert(active_tranxs_ != NULL);
+ delete active_tranxs_;
+ active_tranxs_ = NULL;
+
+ reply_file_name_inited_ = false;
+ wait_file_name_inited_ = false;
+ commit_file_name_inited_ = false;
+
+ set_master_enabled(false);
+ sql_print_information("Semi-sync replication disabled on the master.");
+ }
+
+ unlock();
+
+ return 0;
+}
+
+void ReplSemiSyncMaster::cleanup()
+{
+ if (init_done_)
+ {
+ mysql_mutex_destroy(&LOCK_binlog);
+ mysql_cond_destroy(&COND_binlog_send);
+ init_done_= 0;
+ }
+
+ delete active_tranxs_;
+}
+
+void ReplSemiSyncMaster::lock()
+{
+ mysql_mutex_lock(&LOCK_binlog);
+}
+
+void ReplSemiSyncMaster::unlock()
+{
+ mysql_mutex_unlock(&LOCK_binlog);
+}
+
+void ReplSemiSyncMaster::cond_broadcast()
+{
+ mysql_cond_broadcast(&COND_binlog_send);
+}
+
+int ReplSemiSyncMaster::cond_timewait(struct timespec *wait_time)
+{
+ const char *kWho = "ReplSemiSyncMaster::cond_timewait()";
+ int wait_res;
+
+ function_enter(kWho);
+ wait_res= mysql_cond_timedwait(&COND_binlog_send,
+ &LOCK_binlog, wait_time);
+ return function_exit(kWho, wait_res);
+}
+
+void ReplSemiSyncMaster::add_slave()
+{
+ lock();
+ rpl_semi_sync_master_clients++;
+ unlock();
+}
+
+void ReplSemiSyncMaster::remove_slave()
+{
+ lock();
+ rpl_semi_sync_master_clients--;
+
+ /* Only switch off if semi-sync is enabled and is on */
+ if (getMasterEnabled() && is_on())
+ {
+ /* If user has chosen not to wait if no semi-sync slave available
+ and the last semi-sync slave exits, turn off semi-sync on master
+ immediately.
+ */
+ if (!rpl_semi_sync_master_wait_no_slave &&
+ rpl_semi_sync_master_clients == 0)
+ switch_off();
+ }
+ unlock();
+}
+
+bool ReplSemiSyncMaster::is_semi_sync_slave()
+{
+ int null_value;
+ long long val= 0;
+ get_user_var_int("rpl_semi_sync_slave", &val, &null_value);
+ return val;
+}
+
+int ReplSemiSyncMaster::reportReplyBinlog(uint32 server_id,
+ const char *log_file_name,
+ my_off_t log_file_pos)
+{
+ const char *kWho = "ReplSemiSyncMaster::reportReplyBinlog";
+ int cmp;
+ bool can_release_threads = false;
+ bool need_copy_send_pos = true;
+
+ if (!(getMasterEnabled()))
+ return 0;
+
+ function_enter(kWho);
+
+ lock();
+
+ /* This is the real check inside the mutex. */
+ if (!getMasterEnabled())
+ goto l_end;
+
+ if (!is_on())
+ /* We check to see whether we can switch semi-sync ON. */
+ try_switch_on(server_id, log_file_name, log_file_pos);
+
+ /* The position should increase monotonically, if there is only one
+ * thread sending the binlog to the slave.
+ * In reality, to improve the transaction availability, we allow multiple
+ * sync replication slaves. So, if any one of them get the transaction,
+ * the transaction session in the primary can move forward.
+ */
+ if (reply_file_name_inited_)
+ {
+ cmp = ActiveTranx::compare(log_file_name, log_file_pos,
+ reply_file_name_, reply_file_pos_);
+
+ /* If the requested position is behind the sending binlog position,
+ * would not adjust sending binlog position.
+ * We based on the assumption that there are multiple semi-sync slave,
+ * and at least one of them shou/ld be up to date.
+ * If all semi-sync slaves are behind, at least initially, the primary
+ * can find the situation after the waiting timeout. After that, some
+ * slaves should catch up quickly.
+ */
+ if (cmp < 0)
+ {
+ /* If the position is behind, do not copy it. */
+ need_copy_send_pos = false;
+ }
+ }
+
+ if (need_copy_send_pos)
+ {
+ strmake_buf(reply_file_name_, log_file_name);
+ reply_file_pos_ = log_file_pos;
+ reply_file_name_inited_ = true;
+
+ /* Remove all active transaction nodes before this point. */
+ assert(active_tranxs_ != NULL);
+ active_tranxs_->clear_active_tranx_nodes(log_file_name, log_file_pos);
+
+ if (trace_level_ & kTraceDetail)
+ sql_print_information("%s: Got reply at (%s, %lu)", kWho,
+ log_file_name, (ulong)log_file_pos);
+ }
+
+ if (rpl_semi_sync_master_wait_sessions > 0)
+ {
+ /* Let us check if some of the waiting threads doing a trx
+ * commit can now proceed.
+ */
+ cmp = ActiveTranx::compare(reply_file_name_, reply_file_pos_,
+ wait_file_name_, wait_file_pos_);
+ if (cmp >= 0)
+ {
+ /* Yes, at least one waiting thread can now proceed:
+ * let us release all waiting threads with a broadcast
+ */
+ can_release_threads = true;
+ wait_file_name_inited_ = false;
+ }
+ }
+
+ l_end:
+ unlock();
+
+ if (can_release_threads)
+ {
+ if (trace_level_ & kTraceDetail)
+ sql_print_information("%s: signal all waiting threads.", kWho);
+
+ cond_broadcast();
+ }
+
+ return function_exit(kWho, 0);
+}
+
+int ReplSemiSyncMaster::commitTrx(const char* trx_wait_binlog_name,
+ my_off_t trx_wait_binlog_pos)
+{
+ const char *kWho = "ReplSemiSyncMaster::commitTrx";
+
+ function_enter(kWho);
+
+ if (getMasterEnabled() && trx_wait_binlog_name)
+ {
+ struct timespec start_ts;
+ struct timespec abstime;
+ int wait_result;
+ PSI_stage_info old_stage;
+
+ set_timespec(start_ts, 0);
+
+ DEBUG_SYNC(current_thd, "rpl_semisync_master_commit_trx_before_lock");
+ /* Acquire the mutex. */
+ lock();
+
+ /* This must be called after acquired the lock */
+ THD_ENTER_COND(NULL, &COND_binlog_send, &LOCK_binlog,
+ & stage_waiting_for_semi_sync_ack_from_slave,
+ & old_stage);
+
+ /* This is the real check inside the mutex. */
+ if (!getMasterEnabled() || !is_on())
+ goto l_end;
+
+ if (trace_level_ & kTraceDetail)
+ {
+ sql_print_information("%s: wait pos (%s, %lu), repl(%d)\n", kWho,
+ trx_wait_binlog_name, (ulong)trx_wait_binlog_pos,
+ (int)is_on());
+ }
+
+ while (is_on() && !thd_killed(current_thd))
+ {
+ if (reply_file_name_inited_)
+ {
+ int cmp = ActiveTranx::compare(reply_file_name_, reply_file_pos_,
+ trx_wait_binlog_name, trx_wait_binlog_pos);
+ if (cmp >= 0)
+ {
+ /* We have already sent the relevant binlog to the slave: no need to
+ * wait here.
+ */
+ if (trace_level_ & kTraceDetail)
+ sql_print_information("%s: Binlog reply is ahead (%s, %lu),",
+ kWho, reply_file_name_, (ulong)reply_file_pos_);
+ break;
+ }
+ }
+
+ /* Let us update the info about the minimum binlog position of waiting
+ * threads.
+ */
+ if (wait_file_name_inited_)
+ {
+ int cmp = ActiveTranx::compare(trx_wait_binlog_name, trx_wait_binlog_pos,
+ wait_file_name_, wait_file_pos_);
+ if (cmp <= 0)
+ {
+ /* This thd has a lower position, let's update the minimum info. */
+ strmake_buf(wait_file_name_, trx_wait_binlog_name);
+ wait_file_pos_ = trx_wait_binlog_pos;
+
+ rpl_semi_sync_master_wait_pos_backtraverse++;
+ if (trace_level_ & kTraceDetail)
+ sql_print_information("%s: move back wait position (%s, %lu),",
+ kWho, wait_file_name_, (ulong)wait_file_pos_);
+ }
+ }
+ else
+ {
+ strmake_buf(wait_file_name_, trx_wait_binlog_name);
+ wait_file_pos_ = trx_wait_binlog_pos;
+ wait_file_name_inited_ = true;
+
+ if (trace_level_ & kTraceDetail)
+ sql_print_information("%s: init wait position (%s, %lu),",
+ kWho, wait_file_name_, (ulong)wait_file_pos_);
+ }
+
+ /* Calcuate the waiting period. */
+ long diff_secs = (long) (wait_timeout_ / TIME_THOUSAND);
+ long diff_nsecs = (long) ((wait_timeout_ % TIME_THOUSAND) * TIME_MILLION);
+ long nsecs = start_ts.tv_nsec + diff_nsecs;
+ abstime.tv_sec = start_ts.tv_sec + diff_secs + nsecs/TIME_BILLION;
+ abstime.tv_nsec = nsecs % TIME_BILLION;
+
+ /* In semi-synchronous replication, we wait until the binlog-dump
+ * thread has received the reply on the relevant binlog segment from the
+ * replication slave.
+ *
+ * Let us suspend this thread to wait on the condition;
+ * when replication has progressed far enough, we will release
+ * these waiting threads.
+ */
+ rpl_semi_sync_master_wait_sessions++;
+
+ if (trace_level_ & kTraceDetail)
+ sql_print_information("%s: wait %lu ms for binlog sent (%s, %lu)",
+ kWho, wait_timeout_,
+ wait_file_name_, (ulong)wait_file_pos_);
+
+ wait_result = cond_timewait(&abstime);
+ rpl_semi_sync_master_wait_sessions--;
+
+ if (wait_result != 0)
+ {
+ /* This is a real wait timeout. */
+ sql_print_warning("Timeout waiting for reply of binlog (file: %s, pos: %lu), "
+ "semi-sync up to file %s, position %lu.",
+ trx_wait_binlog_name, (ulong)trx_wait_binlog_pos,
+ reply_file_name_, (ulong)reply_file_pos_);
+ rpl_semi_sync_master_wait_timeouts++;
+
+ /* switch semi-sync off */
+ switch_off();
+ }
+ else
+ {
+ int wait_time;
+
+ wait_time = getWaitTime(start_ts);
+ if (wait_time < 0)
+ {
+ if (trace_level_ & kTraceGeneral)
+ {
+ sql_print_error("Replication semi-sync getWaitTime fail at "
+ "wait position (%s, %lu)",
+ trx_wait_binlog_name, (ulong)trx_wait_binlog_pos);
+ }
+ rpl_semi_sync_master_timefunc_fails++;
+ }
+ else
+ {
+ rpl_semi_sync_master_trx_wait_num++;
+ rpl_semi_sync_master_trx_wait_time += wait_time;
+ }
+ }
+ }
+
+ /*
+ At this point, the binlog file and position of this transaction
+ must have been removed from ActiveTranx.
+ active_tranxs_ may be NULL if someone disabled semi sync during
+ cond_timewait()
+ */
+ assert(thd_killed(current_thd) || !active_tranxs_ ||
+ !active_tranxs_->is_tranx_end_pos(trx_wait_binlog_name,
+ trx_wait_binlog_pos));
+
+ l_end:
+ /* Update the status counter. */
+ if (is_on())
+ rpl_semi_sync_master_yes_transactions++;
+ else
+ rpl_semi_sync_master_no_transactions++;
+
+ /* The lock held will be released by thd_exit_cond, so no need to
+ call unlock() here */
+ THD_EXIT_COND(NULL, & old_stage);
+ }
+
+ return function_exit(kWho, 0);
+}
+
+/* Indicate that semi-sync replication is OFF now.
+ *
+ * What should we do when it is disabled? The problem is that we want
+ * the semi-sync replication enabled again when the slave catches up
+ * later. But, it is not that easy to detect that the slave has caught
+ * up. This is caused by the fact that MySQL's replication protocol is
+ * asynchronous, meaning that if the master does not use the semi-sync
+ * protocol, the slave would not send anything to the master.
+ * Still, if the master is sending (N+1)-th event, we assume that it is
+ * an indicator that the slave has received N-th event and earlier ones.
+ *
+ * If semi-sync is disabled, all transactions still update the wait
+ * position with the last position in binlog. But no transactions will
+ * wait for confirmations and the active transaction list would not be
+ * maintained. In binlog dump thread, updateSyncHeader() checks whether
+ * the current sending event catches up with last wait position. If it
+ * does match, semi-sync will be switched on again.
+ */
+int ReplSemiSyncMaster::switch_off()
+{
+ const char *kWho = "ReplSemiSyncMaster::switch_off";
+ int result;
+
+ function_enter(kWho);
+ state_ = false;
+
+ /* Clear the active transaction list. */
+ assert(active_tranxs_ != NULL);
+ result = active_tranxs_->clear_active_tranx_nodes(NULL, 0);
+
+ rpl_semi_sync_master_off_times++;
+ wait_file_name_inited_ = false;
+ reply_file_name_inited_ = false;
+ sql_print_information("Semi-sync replication switched OFF.");
+ cond_broadcast(); /* wake up all waiting threads */
+
+ return function_exit(kWho, result);
+}
+
+int ReplSemiSyncMaster::try_switch_on(int server_id,
+ const char *log_file_name,
+ my_off_t log_file_pos)
+{
+ const char *kWho = "ReplSemiSyncMaster::try_switch_on";
+ bool semi_sync_on = false;
+
+ function_enter(kWho);
+
+ /* If the current sending event's position is larger than or equal to the
+ * 'largest' commit transaction binlog position, the slave is already
+ * catching up now and we can switch semi-sync on here.
+ * If commit_file_name_inited_ indicates there are no recent transactions,
+ * we can enable semi-sync immediately.
+ */
+ if (commit_file_name_inited_)
+ {
+ int cmp = ActiveTranx::compare(log_file_name, log_file_pos,
+ commit_file_name_, commit_file_pos_);
+ semi_sync_on = (cmp >= 0);
+ }
+ else
+ {
+ semi_sync_on = true;
+ }
+
+ if (semi_sync_on)
+ {
+ /* Switch semi-sync replication on. */
+ state_ = true;
+
+ sql_print_information("Semi-sync replication switched ON with slave (server_id: %d) "
+ "at (%s, %lu)",
+ server_id, log_file_name,
+ (ulong)log_file_pos);
+ }
+
+ return function_exit(kWho, 0);
+}
+
+int ReplSemiSyncMaster::reserveSyncHeader(unsigned char *header,
+ ulong size)
+{
+ const char *kWho = "ReplSemiSyncMaster::reserveSyncHeader";
+ function_enter(kWho);
+
+ int hlen=0;
+ if (!is_semi_sync_slave())
+ {
+ hlen= 0;
+ }
+ else
+ {
+ /* No enough space for the extra header, disable semi-sync master */
+ if (sizeof(kSyncHeader) > size)
+ {
+ sql_print_warning("No enough space in the packet "
+ "for semi-sync extra header, "
+ "semi-sync replication disabled");
+ disableMaster();
+ return 0;
+ }
+
+ /* Set the magic number and the sync status. By default, no sync
+ * is required.
+ */
+ memcpy(header, kSyncHeader, sizeof(kSyncHeader));
+ hlen= sizeof(kSyncHeader);
+ }
+ return function_exit(kWho, hlen);
+}
+
+int ReplSemiSyncMaster::updateSyncHeader(unsigned char *packet,
+ const char *log_file_name,
+ my_off_t log_file_pos,
+ uint32 server_id)
+{
+ const char *kWho = "ReplSemiSyncMaster::updateSyncHeader";
+ int cmp = 0;
+ bool sync = false;
+
+ /* If the semi-sync master is not enabled, or the slave is not a semi-sync
+ * target, do not request replies from the slave.
+ */
+ if (!getMasterEnabled() || !is_semi_sync_slave())
+ return 0;
+
+ function_enter(kWho);
+
+ lock();
+
+ /* This is the real check inside the mutex. */
+ if (!getMasterEnabled())
+ goto l_end; // sync= false at this point in time
+
+ if (is_on())
+ {
+ /* semi-sync is ON */
+ /* sync= false; No sync unless a transaction is involved. */
+
+ if (reply_file_name_inited_)
+ {
+ cmp = ActiveTranx::compare(log_file_name, log_file_pos,
+ reply_file_name_, reply_file_pos_);
+ if (cmp <= 0)
+ {
+ /* If we have already got the reply for the event, then we do
+ * not need to sync the transaction again.
+ */
+ goto l_end;
+ }
+ }
+
+ if (wait_file_name_inited_)
+ {
+ cmp = ActiveTranx::compare(log_file_name, log_file_pos,
+ wait_file_name_, wait_file_pos_);
+ }
+ else
+ {
+ cmp = 1;
+ }
+
+ /* If we are already waiting for some transaction replies which
+ * are later in binlog, do not wait for this one event.
+ */
+ if (cmp >= 0)
+ {
+ /*
+ * We only wait if the event is a transaction's ending event.
+ */
+ assert(active_tranxs_ != NULL);
+ sync = active_tranxs_->is_tranx_end_pos(log_file_name,
+ log_file_pos);
+ }
+ }
+ else
+ {
+ if (commit_file_name_inited_)
+ {
+ int cmp = ActiveTranx::compare(log_file_name, log_file_pos,
+ commit_file_name_, commit_file_pos_);
+ sync = (cmp >= 0);
+ }
+ else
+ {
+ sync = true;
+ }
+ }
+
+ if (trace_level_ & kTraceDetail)
+ sql_print_information("%s: server(%d), (%s, %lu) sync(%d), repl(%d)",
+ kWho, server_id, log_file_name,
+ (ulong)log_file_pos, sync, (int)is_on());
+
+ l_end:
+ unlock();
+
+ /* We do not need to clear sync flag because we set it to 0 when we
+ * reserve the packet header.
+ */
+ if (sync)
+ {
+ (packet)[2] = kPacketFlagSync;
+ }
+
+ return function_exit(kWho, 0);
+}
+
+int ReplSemiSyncMaster::writeTranxInBinlog(const char* log_file_name,
+ my_off_t log_file_pos)
+{
+ const char *kWho = "ReplSemiSyncMaster::writeTranxInBinlog";
+ int result = 0;
+
+ function_enter(kWho);
+
+ lock();
+
+ /* This is the real check inside the mutex. */
+ if (!getMasterEnabled())
+ goto l_end;
+
+ /* Update the 'largest' transaction commit position seen so far even
+ * though semi-sync is switched off.
+ * It is much better that we update commit_file_* here, instead of
+ * inside commitTrx(). This is mostly because updateSyncHeader()
+ * will watch for commit_file_* to decide whether to switch semi-sync
+ * on. The detailed reason is explained in function updateSyncHeader().
+ */
+ if (commit_file_name_inited_)
+ {
+ int cmp = ActiveTranx::compare(log_file_name, log_file_pos,
+ commit_file_name_, commit_file_pos_);
+ if (cmp > 0)
+ {
+ /* This is a larger position, let's update the maximum info. */
+ strncpy(commit_file_name_, log_file_name, FN_REFLEN-1);
+ commit_file_name_[FN_REFLEN-1] = 0; /* make sure it ends properly */
+ commit_file_pos_ = log_file_pos;
+ }
+ }
+ else
+ {
+ strncpy(commit_file_name_, log_file_name, FN_REFLEN-1);
+ commit_file_name_[FN_REFLEN-1] = 0; /* make sure it ends properly */
+ commit_file_pos_ = log_file_pos;
+ commit_file_name_inited_ = true;
+ }
+
+ if (is_on())
+ {
+ assert(active_tranxs_ != NULL);
+ if(active_tranxs_->insert_tranx_node(log_file_name, log_file_pos))
+ {
+ /*
+ if insert tranx_node failed, print a warning message
+ and turn off semi-sync
+ */
+ sql_print_warning("Semi-sync failed to insert tranx_node for binlog file: %s, position: %lu",
+ log_file_name, (ulong)log_file_pos);
+ switch_off();
+ }
+ }
+
+ l_end:
+ unlock();
+
+ return function_exit(kWho, result);
+}
+
+int ReplSemiSyncMaster::readSlaveReply(NET *net, uint32 server_id,
+ const char *event_buf)
+{
+ const char *kWho = "ReplSemiSyncMaster::readSlaveReply";
+ const unsigned char *packet;
+ char log_file_name[FN_REFLEN];
+ my_off_t log_file_pos;
+ ulong log_file_len = 0;
+ ulong packet_len;
+ int result = -1;
+ struct timespec start_ts;
+ ulong trc_level = trace_level_;
+ LINT_INIT_STRUCT(start_ts);
+
+ function_enter(kWho);
+
+ assert((unsigned char)event_buf[1] == kPacketMagicNum);
+ if ((unsigned char)event_buf[2] != kPacketFlagSync)
+ {
+ /* current event does not require reply */
+ result = 0;
+ goto l_end;
+ }
+
+ if (trc_level & kTraceNetWait)
+ set_timespec(start_ts, 0);
+
+ /* We flush to make sure that the current event is sent to the network,
+ * instead of being buffered in the TCP/IP stack.
+ */
+ if (net_flush(net))
+ {
+ sql_print_error("Semi-sync master failed on net_flush() "
+ "before waiting for slave reply");
+ goto l_end;
+ }
+
+ net_clear(net, 0);
+ if (trc_level & kTraceDetail)
+ sql_print_information("%s: Wait for replica's reply", kWho);
+
+ /* Wait for the network here. Though binlog dump thread can indefinitely wait
+ * here, transactions would not wait indefintely.
+ * Transactions wait on binlog replies detected by binlog dump threads. If
+ * binlog dump threads wait too long, transactions will timeout and continue.
+ */
+ packet_len = my_net_read(net);
+
+ if (trc_level & kTraceNetWait)
+ {
+ int wait_time = getWaitTime(start_ts);
+ if (wait_time < 0)
+ {
+ sql_print_error("Semi-sync master wait for reply "
+ "fail to get wait time.");
+ rpl_semi_sync_master_timefunc_fails++;
+ }
+ else
+ {
+ rpl_semi_sync_master_net_wait_num++;
+ rpl_semi_sync_master_net_wait_time += wait_time;
+ }
+ }
+
+ if (packet_len == packet_error || packet_len < REPLY_BINLOG_NAME_OFFSET)
+ {
+ if (packet_len == packet_error)
+ sql_print_error("Read semi-sync reply network error: %s (errno: %d)",
+ net->last_error, net->last_errno);
+ else
+ sql_print_error("Read semi-sync reply length error: %s (errno: %d)",
+ net->last_error, net->last_errno);
+ goto l_end;
+ }
+
+ packet = net->read_pos;
+ if (packet[REPLY_MAGIC_NUM_OFFSET] != ReplSemiSyncMaster::kPacketMagicNum)
+ {
+ sql_print_error("Read semi-sync reply magic number error");
+ goto l_end;
+ }
+
+ log_file_pos = uint8korr(packet + REPLY_BINLOG_POS_OFFSET);
+ log_file_len = packet_len - REPLY_BINLOG_NAME_OFFSET;
+ if (log_file_len >= FN_REFLEN)
+ {
+ sql_print_error("Read semi-sync reply binlog file length too large");
+ goto l_end;
+ }
+ strncpy(log_file_name, (const char*)packet + REPLY_BINLOG_NAME_OFFSET, log_file_len);
+ log_file_name[log_file_len] = 0;
+
+ if (trc_level & kTraceDetail)
+ sql_print_information("%s: Got reply (%s, %lu)",
+ kWho, log_file_name, (ulong)log_file_pos);
+
+ result = reportReplyBinlog(server_id, log_file_name, log_file_pos);
+
+ l_end:
+ return function_exit(kWho, result);
+}
+
+
+int ReplSemiSyncMaster::resetMaster()
+{
+ const char *kWho = "ReplSemiSyncMaster::resetMaster";
+ int result = 0;
+
+ function_enter(kWho);
+
+
+ lock();
+
+ state_ = getMasterEnabled()? 1 : 0;
+
+ wait_file_name_inited_ = false;
+ reply_file_name_inited_ = false;
+ commit_file_name_inited_ = false;
+
+ rpl_semi_sync_master_yes_transactions = 0;
+ rpl_semi_sync_master_no_transactions = 0;
+ rpl_semi_sync_master_off_times = 0;
+ rpl_semi_sync_master_timefunc_fails = 0;
+ rpl_semi_sync_master_wait_sessions = 0;
+ rpl_semi_sync_master_wait_pos_backtraverse = 0;
+ rpl_semi_sync_master_trx_wait_num = 0;
+ rpl_semi_sync_master_trx_wait_time = 0;
+ rpl_semi_sync_master_net_wait_num = 0;
+ rpl_semi_sync_master_net_wait_time = 0;
+
+ unlock();
+
+ return function_exit(kWho, result);
+}
+
+void ReplSemiSyncMaster::setExportStats()
+{
+ lock();
+
+ rpl_semi_sync_master_status = state_;
+ rpl_semi_sync_master_avg_trx_wait_time=
+ ((rpl_semi_sync_master_trx_wait_num) ?
+ (ulong)((double)rpl_semi_sync_master_trx_wait_time /
+ ((double)rpl_semi_sync_master_trx_wait_num)) : 0);
+ rpl_semi_sync_master_avg_net_wait_time=
+ ((rpl_semi_sync_master_net_wait_num) ?
+ (ulong)((double)rpl_semi_sync_master_net_wait_time /
+ ((double)rpl_semi_sync_master_net_wait_num)) : 0);
+
+ unlock();
+}
+
+/* Get the waiting time given the wait's staring time.
+ *
+ * Return:
+ * >= 0: the waiting time in microsecons(us)
+ * < 0: error in get time or time back traverse
+ */
+static int getWaitTime(const struct timespec& start_ts)
+{
+ ulonglong start_usecs, end_usecs;
+ struct timespec end_ts;
+
+ /* Starting time in microseconds(us). */
+ start_usecs = timespec_to_usec(&start_ts);
+
+ /* Get the wait time interval. */
+ set_timespec(end_ts, 0);
+
+ /* Ending time in microseconds(us). */
+ end_usecs = timespec_to_usec(&end_ts);
+
+ if (end_usecs < start_usecs)
+ return -1;
+
+ return (int)(end_usecs - start_usecs);
+}
+
+/***************************************************************************
+ Semisync master interface setup and deinit
+***************************************************************************/
+
+C_MODE_START
+
+int repl_semi_report_binlog_update(Binlog_storage_param *param,
+ const char *log_file,
+ my_off_t log_pos, uint32 flags)
+{
+ int error= 0;
+
+ if (repl_semisync_master.getMasterEnabled())
+ {
+ /*
+ Let us store the binlog file name and the position, so that
+ we know how long to wait for the binlog to the replicated to
+ the slave in synchronous replication.
+ */
+ error= repl_semisync_master.writeTranxInBinlog(log_file,
+ log_pos);
+ }
+
+ return error;
+}
+
+int repl_semi_request_commit(Trans_param *param)
+{
+ return 0;
+}
+
+int repl_semi_report_binlog_sync(Binlog_storage_param *param,
+ const char *log_file,
+ my_off_t log_pos, uint32 flags)
+{
+ int error= 0;
+ if (rpl_semi_sync_master_wait_point ==
+ SEMI_SYNC_MASTER_WAIT_POINT_AFTER_BINLOG_SYNC)
+ {
+ error= repl_semisync_master.commitTrx(log_file, log_pos);
+ }
+
+ return error;
+}
+
+int repl_semi_report_commit(Trans_param *param)
+{
+ if (rpl_semi_sync_master_wait_point !=
+ SEMI_SYNC_MASTER_WAIT_POINT_AFTER_STORAGE_COMMIT)
+ {
+ return 0;
+ }
+
+ bool is_real_trans= param->flags & TRANS_IS_REAL_TRANS;
+
+ if (is_real_trans && param->log_pos)
+ {
+ const char *binlog_name= param->log_file;
+ return repl_semisync_master.commitTrx(binlog_name, param->log_pos);
+ }
+ return 0;
+}
+
+int repl_semi_report_rollback(Trans_param *param)
+{
+ return repl_semi_report_commit(param);
+}
+
+int repl_semi_binlog_dump_start(Binlog_transmit_param *param,
+ const char *log_file,
+ my_off_t log_pos)
+{
+ bool semi_sync_slave= repl_semisync_master.is_semi_sync_slave();
+
+ if (semi_sync_slave)
+ {
+ /* One more semi-sync slave */
+ repl_semisync_master.add_slave();
+
+ /*
+ Let's assume this semi-sync slave has already received all
+ binlog events before the filename and position it requests.
+ */
+ repl_semisync_master.reportReplyBinlog(param->server_id, log_file, log_pos);
+ }
+ sql_print_information("Start %s binlog_dump to slave (server_id: %d), pos(%s, %lu)",
+ semi_sync_slave ? "semi-sync" : "asynchronous",
+ param->server_id, log_file, (ulong)log_pos);
+
+ return 0;
+}
+
+int repl_semi_binlog_dump_end(Binlog_transmit_param *param)
+{
+ bool semi_sync_slave= repl_semisync_master.is_semi_sync_slave();
+
+ sql_print_information("Stop %s binlog_dump to slave (server_id: %d)",
+ semi_sync_slave ? "semi-sync" : "asynchronous",
+ param->server_id);
+ if (semi_sync_slave)
+ {
+ /* One less semi-sync slave */
+ repl_semisync_master.remove_slave();
+ }
+ return 0;
+}
+
+int repl_semi_reserve_header(Binlog_transmit_param *param,
+ unsigned char *header,
+ ulong size, ulong *len)
+{
+ *len += repl_semisync_master.reserveSyncHeader(header, size);
+ return 0;
+}
+
+int repl_semi_before_send_event(Binlog_transmit_param *param,
+ unsigned char *packet, ulong len,
+ const char *log_file, my_off_t log_pos)
+{
+ return repl_semisync_master.updateSyncHeader(packet,
+ log_file,
+ log_pos,
+ param->server_id);
+}
+
+int repl_semi_after_send_event(Binlog_transmit_param *param,
+ const char *event_buf, ulong len)
+{
+ if (repl_semisync_master.is_semi_sync_slave())
+ {
+ THD *thd= current_thd;
+ /*
+ Possible errors in reading slave reply are ignored deliberately
+ because we do not want dump thread to quit on this. Error
+ messages are already reported.
+ */
+ (void) repl_semisync_master.readSlaveReply(&thd->net,
+ param->server_id, event_buf);
+ thd->clear_error();
+ }
+ return 0;
+}
+
+int repl_semi_reset_master(Binlog_transmit_param *param)
+{
+ if (repl_semisync_master.resetMaster())
+ return 1;
+ return 0;
+}
+
+C_MODE_END
+
+Trans_observer trans_observer=
+{
+ sizeof(Trans_observer), // len
+
+ repl_semi_report_commit, // after_commit
+ repl_semi_report_rollback, // after_rollback
+};
+
+Binlog_storage_observer storage_observer=
+{
+ sizeof(Binlog_storage_observer), // len
+
+ repl_semi_report_binlog_update, // report_update
+ repl_semi_report_binlog_sync, // after_sync
+};
+
+Binlog_transmit_observer transmit_observer=
+{
+ sizeof(Binlog_transmit_observer), // len
+
+ repl_semi_binlog_dump_start, // start
+ repl_semi_binlog_dump_end, // stop
+ repl_semi_reserve_header, // reserve_header
+ repl_semi_before_send_event, // before_send_event
+ repl_semi_after_send_event, // after_send_event
+ repl_semi_reset_master, // reset
+};
+
+static bool semi_sync_master_inited= 0;
+
+int semi_sync_master_init()
+{
+ void *p= 0;
+ if (repl_semisync_master.initObject())
+ return 1;
+ if (register_trans_observer(&trans_observer, p))
+ return 1;
+ if (register_binlog_storage_observer(&storage_observer, p))
+ return 1;
+ if (register_binlog_transmit_observer(&transmit_observer, p))
+ return 1;
+ semi_sync_master_inited= 1;
+ return 0;
+}
+
+void semi_sync_master_deinit()
+{
+ void *p= 0;
+ if (!semi_sync_master_inited)
+ return;
+
+ unregister_trans_observer(&trans_observer, p);
+ unregister_binlog_storage_observer(&storage_observer, p);
+ unregister_binlog_transmit_observer(&transmit_observer, p);
+ repl_semisync_master.cleanup();
+ semi_sync_master_inited= 0;
+}
diff --git a/sql/semisync_master.h b/sql/semisync_master.h
new file mode 100644
index 00000000000..ff1e3dd48b4
--- /dev/null
+++ b/sql/semisync_master.h
@@ -0,0 +1,636 @@
+/* Copyright (C) 2007 Google Inc.
+ Copyright (c) 2008 MySQL AB, 2009 Sun Microsystems, Inc.
+ Use is subject to license terms.
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; version 2 of the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
+
+
+#ifndef SEMISYNC_MASTER_H
+#define SEMISYNC_MASTER_H
+
+#include "semisync.h"
+
+#ifdef HAVE_PSI_INTERFACE
+extern PSI_mutex_key key_LOCK_binlog;
+extern PSI_cond_key key_COND_binlog_send;
+#endif
+
+extern PSI_stage_info stage_waiting_for_semi_sync_ack_from_slave;
+
+struct TranxNode {
+ char log_name_[FN_REFLEN];
+ my_off_t log_pos_;
+ struct TranxNode *next_; /* the next node in the sorted list */
+ struct TranxNode *hash_next_; /* the next node during hash collision */
+};
+
+/**
+ @class TranxNodeAllocator
+
+ This class provides memory allocating and freeing methods for
+ TranxNode. The main target is performance.
+
+ @section ALLOCATE How to allocate a node
+ The pointer of the first node after 'last_node' in current_block is
+ returned. current_block will move to the next free Block when all nodes of
+ it are in use. A new Block is allocated and is put into the rear of the
+ Block link table if no Block is free.
+
+ The list starts up empty (ie, there is no allocated Block).
+
+ After some nodes are freed, there probably are some free nodes before
+ the sequence of the allocated nodes, but we do not reuse it. It is better
+ to keep the allocated nodes are in the sequence, for it is more efficient
+ for allocating and freeing TranxNode.
+
+ @section FREENODE How to free nodes
+ There are two methods for freeing nodes. They are free_all_nodes and
+ free_nodes_before.
+
+ 'A Block is free' means all of its nodes are free.
+ @subsection free_nodes_before
+ As all allocated nodes are in the sequence, 'Before one node' means all
+ nodes before given node in the same Block and all Blocks before the Block
+ which containing the given node. As such, all Blocks before the given one
+ ('node') are free Block and moved into the rear of the Block link table.
+ The Block containing the given 'node', however, is not. For at least the
+ given 'node' is still in use. This will waste at most one Block, but it is
+ more efficient.
+ */
+#define BLOCK_TRANX_NODES 16
+class TranxNodeAllocator
+{
+public:
+ /**
+ @param reserved_nodes
+ The number of reserved TranxNodes. It is used to set 'reserved_blocks'
+ which can contain at least 'reserved_nodes' number of TranxNodes. When
+ freeing memory, we will reserve at least reserved_blocks of Blocks not
+ freed.
+ */
+ TranxNodeAllocator(uint reserved_nodes) :
+ reserved_blocks(reserved_nodes/BLOCK_TRANX_NODES +
+ (reserved_nodes%BLOCK_TRANX_NODES > 1 ? 2 : 1)),
+ first_block(NULL), last_block(NULL),
+ current_block(NULL), last_node(-1), block_num(0) {}
+
+ ~TranxNodeAllocator()
+ {
+ Block *block= first_block;
+ while (block != NULL)
+ {
+ Block *next= block->next;
+ free_block(block);
+ block= next;
+ }
+ }
+
+ /**
+ The pointer of the first node after 'last_node' in current_block is
+ returned. current_block will move to the next free Block when all nodes of
+ it are in use. A new Block is allocated and is put into the rear of the
+ Block link table if no Block is free.
+
+ @return Return a TranxNode *, or NULL if an error occurred.
+ */
+ TranxNode *allocate_node()
+ {
+ TranxNode *trx_node;
+ Block *block= current_block;
+
+ if (last_node == BLOCK_TRANX_NODES-1)
+ {
+ current_block= current_block->next;
+ last_node= -1;
+ }
+
+ if (current_block == NULL && allocate_block())
+ {
+ current_block= block;
+ if (current_block)
+ last_node= BLOCK_TRANX_NODES-1;
+ return NULL;
+ }
+
+ trx_node= &(current_block->nodes[++last_node]);
+ trx_node->log_name_[0] = '\0';
+ trx_node->log_pos_= 0;
+ trx_node->next_= 0;
+ trx_node->hash_next_= 0;
+ return trx_node;
+ }
+
+ /**
+ All nodes are freed.
+
+ @return Return 0, or 1 if an error occurred.
+ */
+ int free_all_nodes()
+ {
+ current_block= first_block;
+ last_node= -1;
+ free_blocks();
+ return 0;
+ }
+
+ /**
+ All Blocks before the given 'node' are free Block and moved into the rear
+ of the Block link table.
+
+ @param node All nodes before 'node' will be freed
+
+ @return Return 0, or 1 if an error occurred.
+ */
+ int free_nodes_before(TranxNode* node)
+ {
+ Block *block;
+ Block *prev_block= NULL;
+
+ block= first_block;
+ while (block != current_block->next)
+ {
+ /* Find the Block containing the given node */
+ if (&(block->nodes[0]) <= node && &(block->nodes[BLOCK_TRANX_NODES]) >= node)
+ {
+ /* All Blocks before the given node are put into the rear */
+ if (first_block != block)
+ {
+ last_block->next= first_block;
+ first_block= block;
+ last_block= prev_block;
+ last_block->next= NULL;
+ free_blocks();
+ }
+ return 0;
+ }
+ prev_block= block;
+ block= block->next;
+ }
+
+ /* Node does not find should never happen */
+ DBUG_ASSERT(0);
+ return 1;
+ }
+
+private:
+ uint reserved_blocks;
+
+ /**
+ A sequence memory which contains BLOCK_TRANX_NODES TranxNodes.
+
+ BLOCK_TRANX_NODES The number of TranxNodes which are in a Block.
+
+ next Every Block has a 'next' pointer which points to the next Block.
+ These linking Blocks constitute a Block link table.
+ */
+ struct Block {
+ Block *next;
+ TranxNode nodes[BLOCK_TRANX_NODES];
+ };
+
+ /**
+ The 'first_block' is the head of the Block link table;
+ */
+ Block *first_block;
+ /**
+ The 'last_block' is the rear of the Block link table;
+ */
+ Block *last_block;
+
+ /**
+ current_block always points the Block in the Block link table in
+ which the last allocated node is. The Blocks before it are all in use
+ and the Blocks after it are all free.
+ */
+ Block *current_block;
+
+ /**
+ It always points to the last node which has been allocated in the
+ current_block.
+ */
+ int last_node;
+
+ /**
+ How many Blocks are in the Block link table.
+ */
+ uint block_num;
+
+ /**
+ Allocate a block and then assign it to current_block.
+ */
+ int allocate_block()
+ {
+ Block *block= (Block *)my_malloc(sizeof(Block), MYF(0));
+ if (block)
+ {
+ block->next= NULL;
+
+ if (first_block == NULL)
+ first_block= block;
+ else
+ last_block->next= block;
+
+ /* New Block is always put into the rear */
+ last_block= block;
+ /* New Block is always the current_block */
+ current_block= block;
+ ++block_num;
+ return 0;
+ }
+ return 1;
+ }
+
+ /**
+ Free a given Block.
+ @param block The Block will be freed.
+ */
+ void free_block(Block *block)
+ {
+ my_free(block);
+ --block_num;
+ }
+
+
+ /**
+ If there are some free Blocks and the total number of the Blocks in the
+ Block link table is larger than the 'reserved_blocks', Some free Blocks
+ will be freed until the total number of the Blocks is equal to the
+ 'reserved_blocks' or there is only one free Block behind the
+ 'current_block'.
+ */
+ void free_blocks()
+ {
+ if (current_block == NULL || current_block->next == NULL)
+ return;
+
+ /* One free Block is always kept behind the current block */
+ Block *block= current_block->next->next;
+ while (block_num > reserved_blocks && block != NULL)
+ {
+ Block *next= block->next;
+ free_block(block);
+ block= next;
+ }
+ current_block->next->next= block;
+ if (block == NULL)
+ last_block= current_block->next;
+ }
+};
+
+/**
+ This class manages memory for active transaction list.
+
+ We record each active transaction with a TranxNode, each session
+ can have only one open transaction. Because of EVENT, the total
+ active transaction nodes can exceed the maximum allowed
+ connections.
+*/
+class ActiveTranx
+ :public Trace {
+private:
+
+ TranxNodeAllocator allocator_;
+ /* These two record the active transaction list in sort order. */
+ TranxNode *trx_front_, *trx_rear_;
+
+ TranxNode **trx_htb_; /* A hash table on active transactions. */
+
+ int num_entries_; /* maximum hash table entries */
+ mysql_mutex_t *lock_; /* mutex lock */
+
+ inline void assert_lock_owner();
+
+ inline unsigned int calc_hash(const unsigned char *key,unsigned int length);
+ unsigned int get_hash_value(const char *log_file_name, my_off_t log_file_pos);
+
+ int compare(const char *log_file_name1, my_off_t log_file_pos1,
+ const TranxNode *node2) {
+ return compare(log_file_name1, log_file_pos1,
+ node2->log_name_, node2->log_pos_);
+ }
+ int compare(const TranxNode *node1,
+ const char *log_file_name2, my_off_t log_file_pos2) {
+ return compare(node1->log_name_, node1->log_pos_,
+ log_file_name2, log_file_pos2);
+ }
+ int compare(const TranxNode *node1, const TranxNode *node2) {
+ return compare(node1->log_name_, node1->log_pos_,
+ node2->log_name_, node2->log_pos_);
+ }
+
+public:
+ ActiveTranx(mysql_mutex_t *lock, unsigned long trace_level);
+ ~ActiveTranx();
+
+ /* Insert an active transaction node with the specified position.
+ *
+ * Return:
+ * 0: success; non-zero: error
+ */
+ int insert_tranx_node(const char *log_file_name, my_off_t log_file_pos);
+
+ /* Clear the active transaction nodes until(inclusive) the specified
+ * position.
+ * If log_file_name is NULL, everything will be cleared: the sorted
+ * list and the hash table will be reset to empty.
+ *
+ * Return:
+ * 0: success; non-zero: error
+ */
+ int clear_active_tranx_nodes(const char *log_file_name,
+ my_off_t log_file_pos);
+
+ /* Given a position, check to see whether the position is an active
+ * transaction's ending position by probing the hash table.
+ */
+ bool is_tranx_end_pos(const char *log_file_name, my_off_t log_file_pos);
+
+ /* Given two binlog positions, compare which one is bigger based on
+ * (file_name, file_position).
+ */
+ static int compare(const char *log_file_name1, my_off_t log_file_pos1,
+ const char *log_file_name2, my_off_t log_file_pos2);
+
+};
+
+/**
+ The extension class for the master of semi-synchronous replication
+*/
+class ReplSemiSyncMaster
+ :public ReplSemiSyncBase {
+ private:
+ ActiveTranx *active_tranxs_; /* active transaction list: the list will
+ be cleared when semi-sync switches off. */
+
+ /* True when initObject has been called */
+ bool init_done_;
+
+ /* This cond variable is signaled when enough binlog has been sent to slave,
+ * so that a waiting trx can return the 'ok' to the client for a commit.
+ */
+ mysql_cond_t COND_binlog_send;
+
+ /* Mutex that protects the following state variables and the active
+ * transaction list.
+ * Under no cirumstances we can acquire mysql_bin_log.LOCK_log if we are
+ * already holding LOCK_binlog_ because it can cause deadlocks.
+ */
+ mysql_mutex_t LOCK_binlog;
+
+ /* This is set to true when reply_file_name_ contains meaningful data. */
+ bool reply_file_name_inited_;
+
+ /* The binlog name up to which we have received replies from any slaves. */
+ char reply_file_name_[FN_REFLEN];
+
+ /* The position in that file up to which we have the reply from any slaves. */
+ my_off_t reply_file_pos_;
+
+ /* This is set to true when we know the 'smallest' wait position. */
+ bool wait_file_name_inited_;
+
+ /* NULL, or the 'smallest' filename that a transaction is waiting for
+ * slave replies.
+ */
+ char wait_file_name_[FN_REFLEN];
+
+ /* The smallest position in that file that a trx is waiting for: the trx
+ * can proceed and send an 'ok' to the client when the master has got the
+ * reply from the slave indicating that it already got the binlog events.
+ */
+ my_off_t wait_file_pos_;
+
+ /* This is set to true when we know the 'largest' transaction commit
+ * position in the binlog file.
+ * We always maintain the position no matter whether semi-sync is switched
+ * on switched off. When a transaction wait timeout occurs, semi-sync will
+ * switch off. Binlog-dump thread can use the three fields to detect when
+ * slaves catch up on replication so that semi-sync can switch on again.
+ */
+ bool commit_file_name_inited_;
+
+ /* The 'largest' binlog filename that a commit transaction is seeing. */
+ char commit_file_name_[FN_REFLEN];
+
+ /* The 'largest' position in that file that a commit transaction is seeing. */
+ my_off_t commit_file_pos_;
+
+ /* All global variables which can be set by parameters. */
+ volatile bool master_enabled_; /* semi-sync is enabled on the master */
+ unsigned long wait_timeout_; /* timeout period(ms) during tranx wait */
+
+ bool state_; /* whether semi-sync is switched */
+
+ void lock();
+ void unlock();
+ void cond_broadcast();
+ int cond_timewait(struct timespec *wait_time);
+
+ /* Is semi-sync replication on? */
+ bool is_on() {
+ return (state_);
+ }
+
+ void set_master_enabled(bool enabled) {
+ master_enabled_ = enabled;
+ }
+
+ /* Switch semi-sync off because of timeout in transaction waiting. */
+ int switch_off();
+
+ /* Switch semi-sync on when slaves catch up. */
+ int try_switch_on(int server_id,
+ const char *log_file_name, my_off_t log_file_pos);
+
+ public:
+ ReplSemiSyncMaster();
+ ~ReplSemiSyncMaster() {}
+
+ void cleanup();
+
+ bool getMasterEnabled() {
+ return master_enabled_;
+ }
+ void setTraceLevel(unsigned long trace_level) {
+ trace_level_ = trace_level;
+ if (active_tranxs_)
+ active_tranxs_->trace_level_ = trace_level;
+ }
+
+ /* Set the transaction wait timeout period, in milliseconds. */
+ void setWaitTimeout(unsigned long wait_timeout) {
+ wait_timeout_ = wait_timeout;
+ }
+
+ /* Initialize this class after MySQL parameters are initialized. this
+ * function should be called once at bootstrap time.
+ */
+ int initObject();
+
+ /* Enable the object to enable semi-sync replication inside the master. */
+ int enableMaster();
+
+ /* Enable the object to enable semi-sync replication inside the master. */
+ int disableMaster();
+
+ /* Add a semi-sync replication slave */
+ void add_slave();
+
+ /* Remove a semi-sync replication slave */
+ void remove_slave();
+
+ /* Is the slave servered by the thread requested semi-sync */
+ bool is_semi_sync_slave();
+
+ /* In semi-sync replication, reports up to which binlog position we have
+ * received replies from the slave indicating that it already get the events.
+ *
+ * Input:
+ * server_id - (IN) master server id number
+ * log_file_name - (IN) binlog file name
+ * end_offset - (IN) the offset in the binlog file up to which we have
+ * the replies from the slave
+ *
+ * Return:
+ * 0: success; non-zero: error
+ */
+ int reportReplyBinlog(uint32 server_id,
+ const char* log_file_name,
+ my_off_t end_offset);
+
+ /* Commit a transaction in the final step. This function is called from
+ * InnoDB before returning from the low commit. If semi-sync is switch on,
+ * the function will wait to see whether binlog-dump thread get the reply for
+ * the events of the transaction. Remember that this is not a direct wait,
+ * instead, it waits to see whether the binlog-dump thread has reached the
+ * point. If the wait times out, semi-sync status will be switched off and
+ * all other transaction would not wait either.
+ *
+ * Input: (the transaction events' ending binlog position)
+ * trx_wait_binlog_name - (IN) ending position's file name
+ * trx_wait_binlog_pos - (IN) ending position's file offset
+ *
+ * Return:
+ * 0: success; non-zero: error
+ */
+ int commitTrx(const char* trx_wait_binlog_name,
+ my_off_t trx_wait_binlog_pos);
+
+ /* Reserve space in the replication event packet header:
+ * . slave semi-sync off: 1 byte - (0)
+ * . slave semi-sync on: 3 byte - (0, 0xef, 0/1}
+ *
+ * Input:
+ * header - (IN) the header buffer
+ * size - (IN) size of the header buffer
+ *
+ * Return:
+ * size of the bytes reserved for header
+ */
+ int reserveSyncHeader(unsigned char *header, unsigned long size);
+
+ /* Update the sync bit in the packet header to indicate to the slave whether
+ * the master will wait for the reply of the event. If semi-sync is switched
+ * off and we detect that the slave is catching up, we switch semi-sync on.
+ *
+ * Input:
+ * packet - (IN) the packet containing the replication event
+ * log_file_name - (IN) the event ending position's file name
+ * log_file_pos - (IN) the event ending position's file offset
+ * server_id - (IN) master server id number
+ *
+ * Return:
+ * 0: success; non-zero: error
+ */
+ int updateSyncHeader(unsigned char *packet,
+ const char *log_file_name,
+ my_off_t log_file_pos,
+ uint32 server_id);
+
+ /* Called when a transaction finished writing binlog events.
+ * . update the 'largest' transactions' binlog event position
+ * . insert the ending position in the active transaction list if
+ * semi-sync is on
+ *
+ * Input: (the transaction events' ending binlog position)
+ * log_file_name - (IN) transaction ending position's file name
+ * log_file_pos - (IN) transaction ending position's file offset
+ *
+ * Return:
+ * 0: success; non-zero: error
+ */
+ int writeTranxInBinlog(const char* log_file_name, my_off_t log_file_pos);
+
+ /* Read the slave's reply so that we know how much progress the slave makes
+ * on receive replication events.
+ *
+ * Input:
+ * net - (IN) the connection to master
+ * server_id - (IN) master server id number
+ * event_buf - (IN) pointer to the event packet
+ *
+ * Return:
+ * 0: success; non-zero: error
+ */
+ int readSlaveReply(NET *net, uint32 server_id, const char *event_buf);
+
+ /* Export internal statistics for semi-sync replication. */
+ void setExportStats();
+
+ /* 'reset master' command is issued from the user and semi-sync need to
+ * go off for that.
+ */
+ int resetMaster();
+};
+
+enum rpl_semi_sync_master_wait_point_t {
+ SEMI_SYNC_MASTER_WAIT_POINT_AFTER_BINLOG_SYNC,
+ SEMI_SYNC_MASTER_WAIT_POINT_AFTER_STORAGE_COMMIT,
+};
+
+/* System and status variables for the master component */
+extern my_bool rpl_semi_sync_master_enabled;
+extern my_bool rpl_semi_sync_master_status;
+extern ulong rpl_semi_sync_master_wait_point;
+extern ulong rpl_semi_sync_master_clients;
+extern ulong rpl_semi_sync_master_timeout;
+extern ulong rpl_semi_sync_master_trace_level;
+extern ulong rpl_semi_sync_master_yes_transactions;
+extern ulong rpl_semi_sync_master_no_transactions;
+extern ulong rpl_semi_sync_master_off_times;
+extern ulong rpl_semi_sync_master_wait_timeouts;
+extern ulong rpl_semi_sync_master_timefunc_fails;
+extern ulong rpl_semi_sync_master_num_timeouts;
+extern ulong rpl_semi_sync_master_wait_sessions;
+extern ulong rpl_semi_sync_master_wait_pos_backtraverse;
+extern ulong rpl_semi_sync_master_avg_trx_wait_time;
+extern ulong rpl_semi_sync_master_avg_net_wait_time;
+extern ulonglong rpl_semi_sync_master_net_wait_num;
+extern ulonglong rpl_semi_sync_master_trx_wait_num;
+extern ulonglong rpl_semi_sync_master_net_wait_time;
+extern ulonglong rpl_semi_sync_master_trx_wait_time;
+
+/*
+ This indicates whether we should keep waiting if no semi-sync slave
+ is available.
+ 0 : stop waiting if detected no avaialable semi-sync slave.
+ 1 (default) : keep waiting until timeout even no available semi-sync slave.
+*/
+extern char rpl_semi_sync_master_wait_no_slave;
+extern ReplSemiSyncMaster repl_semisync_master;
+
+int semi_sync_master_init();
+void semi_sync_master_deinit();
+
+#endif /* SEMISYNC_MASTER_H */
diff --git a/sql/semisync_slave.cc b/sql/semisync_slave.cc
new file mode 100644
index 00000000000..63bf9dca0e8
--- /dev/null
+++ b/sql/semisync_slave.cc
@@ -0,0 +1,288 @@
+/* Copyright (c) 2008 MySQL AB, 2009 Sun Microsystems, Inc.
+ Use is subject to license terms.
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; version 2 of the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
+
+
+#include <my_global.h>
+#include "semisync_slave.h"
+
+my_bool rpl_semi_sync_slave_enabled;
+my_bool rpl_semi_sync_slave_status= 0;
+ulong rpl_semi_sync_slave_trace_level;
+ReplSemiSyncSlave repl_semisync_slave;
+
+/*
+ indicate whether or not the slave should send a reply to the master.
+
+ This is set to true in repl_semi_slave_read_event if the current
+ event read is the last event of a transaction. And the value is
+ checked in repl_semi_slave_queue_event.
+*/
+bool semi_sync_need_reply= false;
+
+
+int ReplSemiSyncSlave::initObject()
+{
+ int result= 0;
+ const char *kWho = "ReplSemiSyncSlave::initObject";
+
+ if (init_done_)
+ {
+ fprintf(stderr, "%s called twice\n", kWho);
+ return 1;
+ }
+ init_done_ = true;
+
+ /* References to the parameter works after set_options(). */
+ setSlaveEnabled(rpl_semi_sync_slave_enabled);
+ setTraceLevel(rpl_semi_sync_slave_trace_level);
+
+ return result;
+}
+
+int ReplSemiSyncSlave::slaveReadSyncHeader(const char *header,
+ unsigned long total_len,
+ bool *need_reply,
+ const char **payload,
+ unsigned long *payload_len)
+{
+ const char *kWho = "ReplSemiSyncSlave::slaveReadSyncHeader";
+ int read_res = 0;
+ function_enter(kWho);
+
+ if ((unsigned char)(header[0]) == kPacketMagicNum)
+ {
+ *need_reply = (header[1] & kPacketFlagSync);
+ *payload_len = total_len - 2;
+ *payload = header + 2;
+
+ if (trace_level_ & kTraceDetail)
+ sql_print_information("%s: reply - %d", kWho, *need_reply);
+ }
+ else
+ {
+ sql_print_error("Missing magic number for semi-sync packet, packet "
+ "len: %lu", total_len);
+ read_res = -1;
+ }
+
+ return function_exit(kWho, read_res);
+}
+
+int ReplSemiSyncSlave::slaveStart(Binlog_relay_IO_param *param)
+{
+ bool semi_sync= getSlaveEnabled();
+
+ sql_print_information("Slave I/O thread: Start %s replication to\
+ master '%s@%s:%d' in log '%s' at position %lu",
+ semi_sync ? "semi-sync" : "asynchronous",
+ param->user, param->host, param->port,
+ param->master_log_name[0] ? param->master_log_name : "FIRST",
+ (unsigned long)param->master_log_pos);
+
+ if (semi_sync && !rpl_semi_sync_slave_status)
+ rpl_semi_sync_slave_status= 1;
+ return 0;
+}
+
+int ReplSemiSyncSlave::slaveStop(Binlog_relay_IO_param *param)
+{
+ if (rpl_semi_sync_slave_status)
+ rpl_semi_sync_slave_status= 0;
+ if (mysql_reply)
+ mysql_close(mysql_reply);
+ mysql_reply= 0;
+ return 0;
+}
+
+int ReplSemiSyncSlave::slaveReply(MYSQL *mysql,
+ const char *binlog_filename,
+ my_off_t binlog_filepos)
+{
+ const char *kWho = "ReplSemiSyncSlave::slaveReply";
+ NET *net= &mysql->net;
+ uchar reply_buffer[REPLY_MAGIC_NUM_LEN
+ + REPLY_BINLOG_POS_LEN
+ + REPLY_BINLOG_NAME_LEN];
+ int reply_res, name_len = strlen(binlog_filename);
+
+ function_enter(kWho);
+
+ /* Prepare the buffer of the reply. */
+ reply_buffer[REPLY_MAGIC_NUM_OFFSET] = kPacketMagicNum;
+ int8store(reply_buffer + REPLY_BINLOG_POS_OFFSET, binlog_filepos);
+ memcpy(reply_buffer + REPLY_BINLOG_NAME_OFFSET,
+ binlog_filename,
+ name_len + 1 /* including trailing '\0' */);
+
+ if (trace_level_ & kTraceDetail)
+ sql_print_information("%s: reply (%s, %lu)", kWho,
+ binlog_filename, (ulong)binlog_filepos);
+
+ net_clear(net, 0);
+ /* Send the reply. */
+ reply_res = my_net_write(net, reply_buffer,
+ name_len + REPLY_BINLOG_NAME_OFFSET);
+ if (!reply_res)
+ {
+ reply_res = net_flush(net);
+ if (reply_res)
+ sql_print_error("Semi-sync slave net_flush() reply failed");
+ }
+ else
+ {
+ sql_print_error("Semi-sync slave send reply failed: %s (%d)",
+ net->last_error, net->last_errno);
+ }
+
+ return function_exit(kWho, reply_res);
+}
+
+/***************************************************************************
+ Semisync slave interface setup and deinit
+***************************************************************************/
+
+C_MODE_START
+
+int repl_semi_reset_slave(Binlog_relay_IO_param *param)
+{
+ // TODO: reset semi-sync slave status here
+ return 0;
+}
+
+int repl_semi_slave_request_dump(Binlog_relay_IO_param *param,
+ uint32 flags)
+{
+ MYSQL *mysql= param->mysql;
+ MYSQL_RES *res= 0;
+ MYSQL_ROW row;
+ const char *query;
+
+ if (!repl_semisync_slave.getSlaveEnabled())
+ return 0;
+
+ /* Check if master server has semi-sync plugin installed */
+ query= "SHOW VARIABLES LIKE 'rpl_semi_sync_master_enabled'";
+ if (mysql_real_query(mysql, query, strlen(query)) ||
+ !(res= mysql_store_result(mysql)))
+ {
+ sql_print_error("Execution failed on master: %s", query);
+ return 1;
+ }
+
+ row= mysql_fetch_row(res);
+ if (!row)
+ {
+ /* Master does not support semi-sync */
+ sql_print_warning("Master server does not support semi-sync, "
+ "fallback to asynchronous replication");
+ rpl_semi_sync_slave_status= 0;
+ mysql_free_result(res);
+ return 0;
+ }
+ mysql_free_result(res);
+
+ /*
+ Tell master dump thread that we want to do semi-sync
+ replication
+ */
+ query= "SET @rpl_semi_sync_slave= 1";
+ if (mysql_real_query(mysql, query, strlen(query)))
+ {
+ sql_print_error("Set 'rpl_semi_sync_slave=1' on master failed");
+ return 1;
+ }
+ mysql_free_result(mysql_store_result(mysql));
+ rpl_semi_sync_slave_status= 1;
+ return 0;
+}
+
+int repl_semi_slave_read_event(Binlog_relay_IO_param *param,
+ const char *packet, unsigned long len,
+ const char **event_buf, unsigned long *event_len)
+{
+ if (rpl_semi_sync_slave_status)
+ return repl_semisync_slave.slaveReadSyncHeader(packet, len,
+ &semi_sync_need_reply,
+ event_buf, event_len);
+ *event_buf= packet;
+ *event_len= len;
+ return 0;
+}
+
+int repl_semi_slave_queue_event(Binlog_relay_IO_param *param,
+ const char *event_buf,
+ unsigned long event_len,
+ uint32 flags)
+{
+ if (rpl_semi_sync_slave_status && semi_sync_need_reply)
+ {
+ /*
+ We deliberately ignore the error in slaveReply, such error
+ should not cause the slave IO thread to stop, and the error
+ messages are already reported.
+ */
+ (void) repl_semisync_slave.slaveReply(param->mysql,
+ param->master_log_name,
+ param->master_log_pos);
+ }
+ return 0;
+}
+
+int repl_semi_slave_io_start(Binlog_relay_IO_param *param)
+{
+ return repl_semisync_slave.slaveStart(param);
+}
+
+int repl_semi_slave_io_end(Binlog_relay_IO_param *param)
+{
+ return repl_semisync_slave.slaveStop(param);
+}
+
+C_MODE_END
+
+Binlog_relay_IO_observer relay_io_observer=
+{
+ sizeof(Binlog_relay_IO_observer), // len
+
+ repl_semi_slave_io_start, // start
+ repl_semi_slave_io_end, // stop
+ repl_semi_slave_request_dump, // request_transmit
+ repl_semi_slave_read_event, // after_read_event
+ repl_semi_slave_queue_event, // after_queue_event
+ repl_semi_reset_slave, // reset
+};
+
+static bool semi_sync_slave_inited= 0;
+
+int semi_sync_slave_init()
+{
+ void *p= 0;
+ if (repl_semisync_slave.initObject())
+ return 1;
+ if (register_binlog_relay_io_observer(&relay_io_observer, p))
+ return 1;
+ semi_sync_slave_inited= 1;
+ return 0;
+}
+
+void semi_sync_slave_deinit()
+{
+ void *p= 0;
+ if (!semi_sync_slave_inited)
+ return;
+ unregister_binlog_relay_io_observer(&relay_io_observer, p);
+ semi_sync_slave_inited= 0;
+}
diff --git a/sql/semisync_slave.h b/sql/semisync_slave.h
new file mode 100644
index 00000000000..9cca8bbbdb4
--- /dev/null
+++ b/sql/semisync_slave.h
@@ -0,0 +1,101 @@
+/* Copyright (c) 2006 MySQL AB, 2009 Sun Microsystems, Inc.
+ Use is subject to license terms.
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; version 2 of the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
+
+
+#ifndef SEMISYNC_SLAVE_H
+#define SEMISYNC_SLAVE_H
+
+#include "semisync.h"
+
+/**
+ The extension class for the slave of semi-synchronous replication
+*/
+class ReplSemiSyncSlave
+ :public ReplSemiSyncBase {
+public:
+ ReplSemiSyncSlave()
+ :slave_enabled_(false)
+ {}
+ ~ReplSemiSyncSlave() {}
+
+ void setTraceLevel(unsigned long trace_level) {
+ trace_level_ = trace_level;
+ }
+
+ /* Initialize this class after MySQL parameters are initialized. this
+ * function should be called once at bootstrap time.
+ */
+ int initObject();
+
+ bool getSlaveEnabled() {
+ return slave_enabled_;
+ }
+ void setSlaveEnabled(bool enabled) {
+ slave_enabled_ = enabled;
+ }
+
+ /* A slave reads the semi-sync packet header and separate the metadata
+ * from the payload data.
+ *
+ * Input:
+ * header - (IN) packet header pointer
+ * total_len - (IN) total packet length: metadata + payload
+ * need_reply - (IN) whether the master is waiting for the reply
+ * payload - (IN) payload: the replication event
+ * payload_len - (IN) payload length
+ *
+ * Return:
+ * 0: success; non-zero: error
+ */
+ int slaveReadSyncHeader(const char *header, unsigned long total_len, bool *need_reply,
+ const char **payload, unsigned long *payload_len);
+
+ /* A slave replies to the master indicating its replication process. It
+ * indicates that the slave has received all events before the specified
+ * binlog position.
+ *
+ * Input:
+ * mysql - (IN) the mysql network connection
+ * binlog_filename - (IN) the reply point's binlog file name
+ * binlog_filepos - (IN) the reply point's binlog file offset
+ *
+ * Return:
+ * 0: success; non-zero: error
+ */
+ int slaveReply(MYSQL *mysql, const char *binlog_filename,
+ my_off_t binlog_filepos);
+
+ int slaveStart(Binlog_relay_IO_param *param);
+ int slaveStop(Binlog_relay_IO_param *param);
+
+private:
+ /* True when initObject has been called */
+ bool init_done_;
+ bool slave_enabled_; /* semi-sycn is enabled on the slave */
+ MYSQL *mysql_reply; /* connection to send reply */
+};
+
+
+/* System and status variables for the slave component */
+extern my_bool rpl_semi_sync_slave_enabled;
+extern my_bool rpl_semi_sync_slave_status;
+extern ulong rpl_semi_sync_slave_trace_level;
+extern ReplSemiSyncSlave repl_semisync_slave;
+
+int semi_sync_slave_init();
+void semi_sync_slave_deinit();
+
+#endif /* SEMISYNC_SLAVE_H */
diff --git a/sql/sys_vars.cc b/sql/sys_vars.cc
index 82f8359fefc..e897b6c21ce 100644
--- a/sql/sys_vars.cc
+++ b/sql/sys_vars.cc
@@ -61,6 +61,8 @@
#include "sql_repl.h"
#include "opt_range.h"
#include "rpl_parallel.h"
+#include "semisync_master.h"
+#include "semisync_slave.h"
#include <ssl_compat.h>
/*
@@ -3039,8 +3041,188 @@ static Sys_var_replicate_events_marked_for_skip Replicate_events_marked_for_skip
"the slave).",
GLOBAL_VAR(opt_replicate_events_marked_for_skip), CMD_LINE(REQUIRED_ARG),
replicate_events_marked_for_skip_names, DEFAULT(RPL_SKIP_REPLICATE));
+
+/* new options for semisync */
+
+static bool fix_rpl_semi_sync_master_enabled(sys_var *self, THD *thd,
+ enum_var_type type)
+{
+ if (rpl_semi_sync_master_enabled)
+ {
+ if (repl_semisync_master.enableMaster() != 0)
+ rpl_semi_sync_master_enabled= false;
+#ifdef HAVE_ACC_RECEIVER
+ else if (ack_receiver.start())
+ {
+ repl_semisync_master.disableMaster();
+ rpl_semi_sync_master_enabled= false;
+ }
+#endif
+ }
+ else
+ {
+ if (repl_semisync_master.disableMaster() != 0)
+ rpl_semi_sync_master_enabled= true;
+#ifdef HAVE_ACC_RECEIVER
+ if (!rpl_semi_sync_master_enabled)
+ ack_receiver.stop();
+#endif
+ }
+ return false;
+}
+
+static bool fix_rpl_semi_sync_master_timeout(sys_var *self, THD *thd,
+ enum_var_type type)
+{
+ repl_semisync_master.setWaitTimeout(rpl_semi_sync_master_timeout);
+ return false;
+}
+
+static bool fix_rpl_semi_sync_master_trace_level(sys_var *self, THD *thd,
+ enum_var_type type)
+{
+ repl_semisync_master.setTraceLevel(rpl_semi_sync_master_trace_level);
+#ifdef HAVE_ACC_RECEIVER
+ ack_receiver.setTraceLevel(rpl_semi_sync_master_trace_level);
+#endif
+ return false;
+}
+
+static bool fix_rpl_semi_sync_master_wait_point(sys_var *self, THD *thd,
+ enum_var_type type)
+{
+#ifdef HAVE_ACC_RECEIVER
+ repl_semisync_master.setWaitPoint(rpl_semi_sync_master_wait_point);
#endif
+ return false;
+}
+
+static bool fix_rpl_semi_sync_master_wait_no_slave(sys_var *self, THD *thd,
+ enum_var_type type)
+{
+#ifdef HAVE_ACC_RECEIVER
+ repl_semisync_master.checkAndSwitch();
+#endif
+ return false;
+}
+
+static Sys_var_mybool Sys_semisync_master_enabled(
+ "rpl_semi_sync_master_enabled",
+ "Enable semi-synchronous replication master (disabled by default).",
+ GLOBAL_VAR(rpl_semi_sync_master_enabled),
+ CMD_LINE(OPT_ARG), DEFAULT(FALSE),
+ NO_MUTEX_GUARD, NOT_IN_BINLOG, ON_CHECK(0),
+ ON_UPDATE(fix_rpl_semi_sync_master_enabled));
+
+static Sys_var_ulong Sys_semisync_master_timeout(
+ "rpl_semi_sync_master_timeout",
+ "The timeout value (in ms) for semi-synchronous replication in the "
+ "master",
+ GLOBAL_VAR(rpl_semi_sync_master_timeout),
+ CMD_LINE(REQUIRED_ARG),
+ VALID_RANGE(0,~0L),DEFAULT(10000),BLOCK_SIZE(1),
+ NO_MUTEX_GUARD, NOT_IN_BINLOG, ON_CHECK(0),
+ ON_UPDATE(fix_rpl_semi_sync_master_timeout));
+static Sys_var_mybool Sys_semisync_master_wait_no_slave(
+ "rpl_semi_sync_master_wait_no_slave",
+ "Wait until timeout when no semi-synchronous replication slave "
+ "available (enabled by default).",
+ GLOBAL_VAR(rpl_semi_sync_master_wait_no_slave),
+ CMD_LINE(OPT_ARG), DEFAULT(TRUE),
+ NO_MUTEX_GUARD, NOT_IN_BINLOG, ON_CHECK(0),
+ ON_UPDATE(fix_rpl_semi_sync_master_wait_no_slave));
+
+static Sys_var_ulong Sys_semisync_master_trace_level(
+ "rpl_semi_sync_master_trace_level",
+ "The tracing level for semi-sync replication.",
+ GLOBAL_VAR(rpl_semi_sync_master_trace_level),
+ CMD_LINE(REQUIRED_ARG),
+ VALID_RANGE(0,~0L),DEFAULT(32),BLOCK_SIZE(1),
+ NO_MUTEX_GUARD, NOT_IN_BINLOG, ON_CHECK(0),
+ ON_UPDATE(fix_rpl_semi_sync_master_trace_level));
+
+static const char *repl_semisync_wait_point[]=
+{"AFTER_SYNC", "AFTER_COMMIT", NullS};
+
+static Sys_var_enum Sys_semisync_master_wait_point(
+ "rpl_semi_sync_master_wait_point",
+ "Should transaction wait for semi-sync ack after having synced binlog, "
+ "or after having committed in storage engine.",
+ GLOBAL_VAR(rpl_semi_sync_master_wait_point), CMD_LINE(REQUIRED_ARG),
+ repl_semisync_wait_point, DEFAULT(1),
+ NO_MUTEX_GUARD, NOT_IN_BINLOG,ON_CHECK(0),
+ ON_UPDATE(fix_rpl_semi_sync_master_wait_point));
+
+static bool fix_rpl_semi_sync_slave_enabled(sys_var *self, THD *thd,
+ enum_var_type type)
+{
+ repl_semisync_slave.setSlaveEnabled(rpl_semi_sync_slave_enabled != 0);
+ return false;
+}
+
+static bool fix_rpl_semi_sync_slave_trace_level(sys_var *self, THD *thd,
+ enum_var_type type)
+{
+ repl_semisync_slave.setTraceLevel(rpl_semi_sync_slave_trace_level);
+ return false;
+}
+
+#ifdef HAVE_ACC_RECEIVER
+static bool fix_rpl_semi_sync_slave_delay_master(sys_var *self, THD *thd,
+ enum_var_type type)
+{
+ repl_semisync_slave.setDelayMaster(rpl_semi_sync_slave_delay_master);
+ return false;
+}
+
+static bool fix_rpl_semi_sync_slave_kill_conn_timeout(sys_var *self, THD *thd,
+ enum_var_type type)
+{
+ repl_semisync_slave.setKillConnTimeout(rpl_semi_sync_slave_kill_conn_timeout);
+ return false;
+}
+#endif
+
+
+static Sys_var_mybool Sys_semisync_slave_enabled(
+ "rpl_semi_sync_slave_enabled",
+ "Enable semi-synchronous replication slave (disabled by default).",
+ GLOBAL_VAR(rpl_semi_sync_slave_enabled),
+ CMD_LINE(OPT_ARG), DEFAULT(FALSE),
+ NO_MUTEX_GUARD, NOT_IN_BINLOG, ON_CHECK(0),
+ ON_UPDATE(fix_rpl_semi_sync_slave_enabled));
+
+static Sys_var_ulong Sys_semisync_slave_trace_level(
+ "rpl_semi_sync_slave_trace_level",
+ "The tracing level for semi-sync replication.",
+ GLOBAL_VAR(rpl_semi_sync_slave_trace_level),
+ CMD_LINE(REQUIRED_ARG),
+ VALID_RANGE(0,~0L),DEFAULT(32),BLOCK_SIZE(1),
+ NO_MUTEX_GUARD, NOT_IN_BINLOG, ON_CHECK(0),
+ ON_UPDATE(fix_rpl_semi_sync_slave_trace_level));
+
+#ifdef HAVE_ACC_RECEIVER
+static Sys_var_mybool Sys_semisync_slave_delay_master(
+ "rpl_semi_sync_slave_delay_master",
+ "Only write master info file when ack is needed.",
+ GLOBAL_VAR(rpl_semi_sync_slave_delay_master),
+ CMD_LINE(OPT_ARG), DEFAULT(FALSE),
+ NO_MUTEX_GUARD, NOT_IN_BINLOG, ON_CHECK(0),
+ ON_UPDATE(fix_rpl_semi_sync_slave_delay_master));
+
+static Sys_var_uint Sys_semisync_slave_kill_conn_timeout(
+ "rpl_semi_sync_slave_kill_conn_timeout",
+ "Timeout for the mysql connection used to kill the slave io_thread's "
+ "connection on master. This timeout comes into play when stop slave "
+ "is executed.",
+ GLOBAL_VAR(rpl_semi_sync_slave_kill_conn_timeout),
+ CMD_LINE(OPT_ARG),
+ VALID_RANGE(0, UINT_MAX), DEFAULT(5), BLOCK_SIZE(1),
+ NO_MUTEX_GUARD, NOT_IN_BINLOG, ON_CHECK(0),
+ ON_UPDATE(fix_rpl_semi_sync_slave_kill_conn_timeout));
+#endif
+#endif /* HAVE_REPLICATION */
static Sys_var_ulong Sys_slow_launch_time(
"slow_launch_time",