diff options
author | Monty <monty@mariadb.org> | 2017-10-25 11:07:44 +0300 |
---|---|---|
committer | Monty <monty@mariadb.org> | 2017-12-18 13:43:36 +0200 |
commit | 2e53b96a0aa9dcb18d8bbe12e5bc7e0aba208540 (patch) | |
tree | 112b522a5ae3a3132266052e38edcdac0a919e1a /sql | |
parent | 77030649fb1f492b6dd9351a7d4b36e1aeb29f4d (diff) | |
download | mariadb-git-2e53b96a0aa9dcb18d8bbe12e5bc7e0aba208540.tar.gz |
Moved semisync from a plugin to normal server
Part of MDEV-13073 AliSQL Optimize performance of semisync
Did the following renames to match other similar variables
key_ss_mutex_LOCK_binlog_ > key_LOCK_bing
key_ss_cond_COND_binlog_send_ -> key_COND_binlog_send
COND_binlog_send_ -> COND_binlog_send
LOCK_binlog_ -> LOCK_binlog
debian/mariadb-server-10.2.install does not install semisync libs.
Diffstat (limited to 'sql')
-rw-r--r-- | sql/CMakeLists.txt | 3 | ||||
-rw-r--r-- | sql/mysqld.cc | 66 | ||||
-rw-r--r-- | sql/replication.h | 14 | ||||
-rw-r--r-- | sql/rpl_handler.cc | 8 | ||||
-rw-r--r-- | sql/semisync.cc | 32 | ||||
-rw-r--r-- | sql/semisync.h | 83 | ||||
-rw-r--r-- | sql/semisync_master.cc | 1430 | ||||
-rw-r--r-- | sql/semisync_master.h | 636 | ||||
-rw-r--r-- | sql/semisync_slave.cc | 288 | ||||
-rw-r--r-- | sql/semisync_slave.h | 101 | ||||
-rw-r--r-- | sql/sys_vars.cc | 182 |
11 files changed, 2827 insertions, 16 deletions
diff --git a/sql/CMakeLists.txt b/sql/CMakeLists.txt index 0f67032bcbe..6c63f3feca3 100644 --- a/sql/CMakeLists.txt +++ b/sql/CMakeLists.txt @@ -36,7 +36,7 @@ ELSE() ENDIF() INCLUDE_DIRECTORIES( -${CMAKE_SOURCE_DIR}/include +${CMAKE_SOURCE_DIR}/include ${CMAKE_SOURCE_DIR}/sql ${PCRE_INCLUDES} ${ZLIB_INCLUDE_DIR} @@ -138,6 +138,7 @@ SET (SQL_SOURCE my_apc.cc mf_iocache_encr.cc item_jsonfunc.cc my_json_writer.cc rpl_gtid.cc rpl_parallel.cc + semisync.cc semisync_master.cc semisync_slave.cc sql_type.cc item_windowfunc.cc sql_window.cc sql_cte.cc diff --git a/sql/mysqld.cc b/sql/mysqld.cc index f3cb39959a7..fc783ae5559 100644 --- a/sql/mysqld.cc +++ b/sql/mysqld.cc @@ -97,8 +97,9 @@ #include "set_var.h" #include "rpl_injector.h" - #include "rpl_handler.h" +#include "semisync_master.h" +#include "semisync_slave.h" #ifdef HAVE_SYS_PRCTL_H #include <sys/prctl.h> @@ -934,6 +935,7 @@ PSI_mutex_key key_BINLOG_LOCK_index, key_BINLOG_LOCK_xid_list, PSI_mutex_key key_RELAYLOG_LOCK_index; PSI_mutex_key key_LOCK_slave_state, key_LOCK_binlog_state, key_LOCK_rpl_thread, key_LOCK_rpl_thread_pool, key_LOCK_parallel_entry; +PSI_mutex_key key_LOCK_binlog; PSI_mutex_key key_LOCK_stats, key_LOCK_global_user_client_stats, key_LOCK_global_table_stats, @@ -1021,7 +1023,8 @@ static PSI_mutex_info all_server_mutexes[]= { &key_LOCK_binlog_state, "LOCK_binlog_state", 0}, { &key_LOCK_rpl_thread, "LOCK_rpl_thread", 0}, { &key_LOCK_rpl_thread_pool, "LOCK_rpl_thread_pool", 0}, - { &key_LOCK_parallel_entry, "LOCK_parallel_entry", 0} + { &key_LOCK_parallel_entry, "LOCK_parallel_entry", 0}, + { &key_LOCK_binlog, "LOCK_binlog", 0} }; PSI_rwlock_key key_rwlock_LOCK_grant, key_rwlock_LOCK_logger, @@ -1062,7 +1065,7 @@ PSI_cond_key key_BINLOG_COND_xid_list, key_BINLOG_update_cond, key_rpl_group_info_sleep_cond, key_TABLE_SHARE_cond, key_user_level_lock_cond, key_COND_thread_count, key_COND_thread_cache, key_COND_flush_thread_cache, - key_COND_start_thread, + key_COND_start_thread, key_COND_binlog_send, key_BINLOG_COND_queue_busy; PSI_cond_key key_RELAYLOG_update_cond, key_COND_wakeup_ready, key_COND_wait_commit; @@ -1124,7 +1127,8 @@ static PSI_cond_info all_server_conds[]= { &key_COND_slave_background, "COND_slave_background", 0}, { &key_COND_start_thread, "COND_start_thread", PSI_FLAG_GLOBAL}, { &key_COND_wait_gtid, "COND_wait_gtid", 0}, - { &key_COND_gtid_ignore_duplicates, "COND_gtid_ignore_duplicates", 0} + { &key_COND_gtid_ignore_duplicates, "COND_gtid_ignore_duplicates", 0}, + { &key_COND_binlog_send, "COND_binlog_send", 0} }; PSI_thread_key key_thread_bootstrap, key_thread_delayed_insert, @@ -2217,6 +2221,10 @@ void clean_up(bool print_message) ha_end(); if (tc_log) tc_log->close(); +#ifdef HAVE_REPLICATION + semi_sync_master_deinit(); + semi_sync_slave_deinit(); +#endif delegates_destroy(); xid_cache_free(); tdc_deinit(); @@ -5170,6 +5178,9 @@ static int init_server_components() "this server. However this will be ignored as the " "--log-bin option is not defined."); } + + semi_sync_master_init(); + semi_sync_slave_init(); #endif if (opt_bin_log) @@ -8230,6 +8241,27 @@ static int show_ssl_get_cipher_list(THD *thd, SHOW_VAR *var, char *buff, return 0; } +#define SHOW_FNAME(name) \ + rpl_semi_sync_master_show_##name + +#define DEF_SHOW_FUNC(name, show_type) \ + static int SHOW_FNAME(name)(MYSQL_THD thd, SHOW_VAR *var, char *buff) \ + { \ + repl_semisync_master.setExportStats(); \ + var->type= show_type; \ + var->value= (char *)&rpl_semi_sync_master_##name; \ + return 0; \ + } + +DEF_SHOW_FUNC(status, SHOW_BOOL) +DEF_SHOW_FUNC(clients, SHOW_LONG) +DEF_SHOW_FUNC(wait_sessions, SHOW_LONG) +DEF_SHOW_FUNC(trx_wait_time, SHOW_LONGLONG) +DEF_SHOW_FUNC(trx_wait_num, SHOW_LONGLONG) +DEF_SHOW_FUNC(net_wait_time, SHOW_LONGLONG) +DEF_SHOW_FUNC(net_wait_num, SHOW_LONGLONG) +DEF_SHOW_FUNC(avg_net_wait_time, SHOW_LONG) +DEF_SHOW_FUNC(avg_trx_wait_time, SHOW_LONG) #ifdef HAVE_YASSL @@ -8548,6 +8580,30 @@ SHOW_VAR status_vars[]= { {"Rows_sent", (char*) offsetof(STATUS_VAR, rows_sent), SHOW_LONGLONG_STATUS}, {"Rows_read", (char*) offsetof(STATUS_VAR, rows_read), SHOW_LONGLONG_STATUS}, {"Rows_tmp_read", (char*) offsetof(STATUS_VAR, rows_tmp_read), SHOW_LONGLONG_STATUS}, +#ifdef HAVE_REPLICATION + {"Rpl_semi_sync_master_status", (char*) &SHOW_FNAME(status), SHOW_FUNC}, + {"Rpl_semi_sync_master_clients", (char*) &SHOW_FNAME(clients), SHOW_FUNC}, + {"Rpl_semi_sync_master_yes_tx", (char*) &rpl_semi_sync_master_yes_transactions, SHOW_LONG}, + {"Rpl_semi_sync_master_no_tx", (char*) &rpl_semi_sync_master_no_transactions, SHOW_LONG}, + {"Rpl_semi_sync_master_wait_sessions", (char*) &SHOW_FNAME(wait_sessions), SHOW_FUNC}, + {"Rpl_semi_sync_master_no_times", (char*) &rpl_semi_sync_master_off_times, SHOW_LONG}, + {"Rpl_semi_sync_master_timefunc_failures", (char*) &rpl_semi_sync_master_timefunc_fails, SHOW_LONG}, + {"Rpl_semi_sync_master_wait_pos_backtraverse", (char*) &rpl_semi_sync_master_wait_pos_backtraverse, SHOW_LONG}, + {"Rpl_semi_sync_master_tx_wait_time", (char*) &SHOW_FNAME(trx_wait_time), SHOW_FUNC}, + {"Rpl_semi_sync_master_tx_waits", (char*) &SHOW_FNAME(trx_wait_num), SHOW_FUNC}, + {"Rpl_semi_sync_master_tx_avg_wait_time", (char*) &SHOW_FNAME(avg_trx_wait_time), SHOW_FUNC}, + {"Rpl_semi_sync_master_net_wait_time", (char*) &SHOW_FNAME(net_wait_time), SHOW_FUNC}, + {"Rpl_semi_sync_master_net_waits", (char*) &SHOW_FNAME(net_wait_num), SHOW_FUNC}, + {"Rpl_semi_sync_master_net_avg_wait_time", (char*) &SHOW_FNAME(avg_net_wait_time), SHOW_FUNC}, +#ifdef HAVE_ACC_RECEIVER + {"Rpl_semi_sync_master_request_ack", (char*) &rpl_semi_sync_master_request_ack, SHOW_LONGLONG}, + {"Rpl_semi_sync_master_get_ack", (char*)&rpl_semi_sync_master_get_ack, SHOW_LONGLONG}, +#endif + {"Rpl_semi_sync_slave_status", (char*) &rpl_semi_sync_slave_status, SHOW_BOOL}, +#ifdef HAVE_ACC_RECEIVER + {"Rpl_semi_sync_slave_send_ack", (char*) &rpl_semi_sync_slave_send_ack, SHOW_LONGLONG}, +#endif +#endif /* HAVE_REPLICATION */ #ifdef HAVE_QUERY_CACHE {"Qcache_free_blocks", (char*) &query_cache.free_memory_blocks, SHOW_LONG_NOFLUSH}, {"Qcache_free_memory", (char*) &query_cache.free_memory, SHOW_LONG_NOFLUSH}, @@ -10285,6 +10341,8 @@ PSI_stage_info stage_waiting_for_insert= { 0, "Waiting for INSERT", 0}; PSI_stage_info stage_waiting_for_master_to_send_event= { 0, "Waiting for master to send event", 0}; PSI_stage_info stage_waiting_for_master_update= { 0, "Waiting for master update", 0}; PSI_stage_info stage_waiting_for_relay_log_space= { 0, "Waiting for the slave SQL thread to free enough relay log space", 0}; +PSI_stage_info stage_waiting_for_semi_sync_ack_from_slave= +{ 0, "Waiting for semi-sync ACK from slave", 0}; PSI_stage_info stage_waiting_for_slave_mutex_on_exit= { 0, "Waiting for slave mutex on exit", 0}; PSI_stage_info stage_waiting_for_slave_thread_to_start= { 0, "Waiting for slave thread to start", 0}; PSI_stage_info stage_waiting_for_table_flush= { 0, "Waiting for table flush", 0}; diff --git a/sql/replication.h b/sql/replication.h index 4731c2246ef..d8672310110 100644 --- a/sql/replication.h +++ b/sql/replication.h @@ -18,16 +18,14 @@ /*************************************************************************** NOTE: plugin locking. - This API was created specifically for the semisync plugin and its locking - logic is also matches semisync plugin usage pattern. In particular, a plugin - is locked on Binlog_transmit_observer::transmit_start and is unlocked after - Binlog_transmit_observer::transmit_stop. All other master observable events - happen between these two and don't lock the plugin at all. This works well - for the semisync_master plugin. + + The plugin is locked on Binlog_transmit_observer::transmit_start and is + unlocked after Binlog_transmit_observer::transmit_stop. All other + master observable events happen between these two and don't lock the + plugin at all. Also a plugin is locked on Binlog_relay_IO_observer::thread_start - and unlocked after Binlog_relay_IO_observer::thread_stop. This works well for - the semisync_slave plugin. + and unlocked after Binlog_relay_IO_observer::thread_stop. ***************************************************************************/ #include <mysql.h> diff --git a/sql/rpl_handler.cc b/sql/rpl_handler.cc index e3ff2a17a6a..27e411ca6de 100644 --- a/sql/rpl_handler.cc +++ b/sql/rpl_handler.cc @@ -149,13 +149,17 @@ void delegates_destroy() { if (transaction_delegate) transaction_delegate->~Trans_delegate(); + transaction_delegate= 0; if (binlog_storage_delegate) binlog_storage_delegate->~Binlog_storage_delegate(); + binlog_storage_delegate= 0; #ifdef HAVE_REPLICATION if (binlog_transmit_delegate) binlog_transmit_delegate->~Binlog_transmit_delegate(); + binlog_transmit_delegate= 0; if (binlog_relay_io_delegate) binlog_relay_io_delegate->~Binlog_relay_IO_delegate(); + binlog_relay_io_delegate= 0; #endif /* HAVE_REPLICATION */ } @@ -171,13 +175,11 @@ void delegates_destroy() Observer_info *info= iter++; \ for (; info; info= iter++) \ { \ - if (do_lock) plugin_lock(thd, plugin_int_to_ref(info->plugin_int)); \ if (((Observer *)info->observer)->f \ && ((Observer *)info->observer)->f args) \ { \ r= 1; \ - sql_print_error("Run function '" #f "' in plugin '%s' failed", \ - info->plugin_int->name.str); \ + sql_print_error("Run function '" #f "' failed"); \ break; \ } \ } \ diff --git a/sql/semisync.cc b/sql/semisync.cc new file mode 100644 index 00000000000..df37f03ec2f --- /dev/null +++ b/sql/semisync.cc @@ -0,0 +1,32 @@ +/* Copyright (C) 2007 Google Inc. + Copyright (C) 2008 MySQL AB + Use is subject to license terms + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; version 2 of the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ + + +#include <my_global.h> +#include "semisync.h" + +const unsigned char ReplSemiSyncBase::kPacketMagicNum = 0xef; +const unsigned char ReplSemiSyncBase::kPacketFlagSync = 0x01; + + +const unsigned long Trace::kTraceGeneral = 0x0001; +const unsigned long Trace::kTraceDetail = 0x0010; +const unsigned long Trace::kTraceNetWait = 0x0020; +const unsigned long Trace::kTraceFunction = 0x0040; + +const unsigned char ReplSemiSyncBase::kSyncHeader[2] = + {ReplSemiSyncBase::kPacketMagicNum, 0}; diff --git a/sql/semisync.h b/sql/semisync.h new file mode 100644 index 00000000000..3142f920f1e --- /dev/null +++ b/sql/semisync.h @@ -0,0 +1,83 @@ +/* Copyright (C) 2007 Google Inc. + Copyright (C) 2008 MySQL AB + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; version 2 of the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ + + +#ifndef SEMISYNC_H +#define SEMISYNC_H + +#include "mysqld.h" +#include "log_event.h" +#include "replication.h" + +/** + This class is used to trace function calls and other process + information +*/ +class Trace { +public: + static const unsigned long kTraceFunction; + static const unsigned long kTraceGeneral; + static const unsigned long kTraceDetail; + static const unsigned long kTraceNetWait; + + unsigned long trace_level_; /* the level for tracing */ + + inline void function_enter(const char *func_name) + { + if (trace_level_ & kTraceFunction) + sql_print_information("---> %s enter", func_name); + } + inline int function_exit(const char *func_name, int exit_code) + { + if (trace_level_ & kTraceFunction) + sql_print_information("<--- %s exit (%d)", func_name, exit_code); + return exit_code; + } + + Trace() + :trace_level_(0L) + {} + Trace(unsigned long trace_level) + :trace_level_(trace_level) + {} +}; + +/** + Base class for semi-sync master and slave classes +*/ +class ReplSemiSyncBase + :public Trace { +public: + static const unsigned char kSyncHeader[2]; /* three byte packet header */ + + /* Constants in network packet header. */ + static const unsigned char kPacketMagicNum; + static const unsigned char kPacketFlagSync; +}; + +/* The layout of a semisync slave reply packet: + 1 byte for the magic num + 8 bytes for the binlog positon + n bytes for the binlog filename, terminated with a '\0' +*/ +#define REPLY_MAGIC_NUM_LEN 1 +#define REPLY_BINLOG_POS_LEN 8 +#define REPLY_BINLOG_NAME_LEN (FN_REFLEN + 1) +#define REPLY_MAGIC_NUM_OFFSET 0 +#define REPLY_BINLOG_POS_OFFSET (REPLY_MAGIC_NUM_OFFSET + REPLY_MAGIC_NUM_LEN) +#define REPLY_BINLOG_NAME_OFFSET (REPLY_BINLOG_POS_OFFSET + REPLY_BINLOG_POS_LEN) + +#endif /* SEMISYNC_H */ diff --git a/sql/semisync_master.cc b/sql/semisync_master.cc new file mode 100644 index 00000000000..21c52addce9 --- /dev/null +++ b/sql/semisync_master.cc @@ -0,0 +1,1430 @@ +/* Copyright (C) 2007 Google Inc. + Copyright (c) 2008, 2013, Oracle and/or its affiliates. + Copyright (c) 2011, 2016, MariaDB + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; version 2 of the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ + + +#include <my_global.h> +#include "semisync_master.h" + +#define TIME_THOUSAND 1000 +#define TIME_MILLION 1000000 +#define TIME_BILLION 1000000000 + +/* This indicates whether semi-synchronous replication is enabled. */ +my_bool rpl_semi_sync_master_enabled; +my_bool rpl_semi_sync_master_wait_no_slave = 1; +my_bool rpl_semi_sync_master_status = 0; +ulong rpl_semi_sync_master_wait_point = + SEMI_SYNC_MASTER_WAIT_POINT_AFTER_STORAGE_COMMIT; +ulong rpl_semi_sync_master_timeout; +ulong rpl_semi_sync_master_trace_level; +ulong rpl_semi_sync_master_yes_transactions = 0; +ulong rpl_semi_sync_master_no_transactions = 0; +ulong rpl_semi_sync_master_off_times = 0; +ulong rpl_semi_sync_master_timefunc_fails = 0; +ulong rpl_semi_sync_master_wait_timeouts = 0; +ulong rpl_semi_sync_master_wait_sessions = 0; +ulong rpl_semi_sync_master_wait_pos_backtraverse = 0; +ulong rpl_semi_sync_master_avg_trx_wait_time = 0; +ulonglong rpl_semi_sync_master_trx_wait_num = 0; +ulong rpl_semi_sync_master_avg_net_wait_time = 0; +ulonglong rpl_semi_sync_master_net_wait_num = 0; +ulong rpl_semi_sync_master_clients = 0; +ulonglong rpl_semi_sync_master_net_wait_time = 0; +ulonglong rpl_semi_sync_master_trx_wait_time = 0; + +ReplSemiSyncMaster repl_semisync_master; + +static int getWaitTime(const struct timespec& start_ts); + +static ulonglong timespec_to_usec(const struct timespec *ts) +{ + return (ulonglong) ts->tv_sec * TIME_MILLION + ts->tv_nsec / TIME_THOUSAND; +} + +/******************************************************************************* + * + * <ActiveTranx> class : manage all active transaction nodes + * + ******************************************************************************/ + +ActiveTranx::ActiveTranx(mysql_mutex_t *lock, + ulong trace_level) + : Trace(trace_level), allocator_(max_connections), + num_entries_(max_connections << 1), /* Transaction hash table size + * is set to double the size + * of max_connections */ + lock_(lock) +{ + /* No transactions are in the list initially. */ + trx_front_ = NULL; + trx_rear_ = NULL; + + /* Create the hash table to find a transaction's ending event. */ + trx_htb_ = new TranxNode *[num_entries_]; + for (int idx = 0; idx < num_entries_; ++idx) + trx_htb_[idx] = NULL; + + sql_print_information("Semi-sync replication initialized for transactions."); +} + +ActiveTranx::~ActiveTranx() +{ + delete [] trx_htb_; + trx_htb_ = NULL; + num_entries_ = 0; +} + +unsigned int ActiveTranx::calc_hash(const unsigned char *key, + unsigned int length) +{ + unsigned int nr = 1, nr2 = 4; + + /* The hash implementation comes from calc_hashnr() in mysys/hash.c. */ + while (length--) + { + nr ^= (((nr & 63)+nr2)*((unsigned int) (unsigned char) *key++))+ (nr << 8); + nr2 += 3; + } + return((unsigned int) nr); +} + +unsigned int ActiveTranx::get_hash_value(const char *log_file_name, + my_off_t log_file_pos) +{ + unsigned int hash1 = calc_hash((const unsigned char *)log_file_name, + strlen(log_file_name)); + unsigned int hash2 = calc_hash((const unsigned char *)(&log_file_pos), + sizeof(log_file_pos)); + + return (hash1 + hash2) % num_entries_; +} + +int ActiveTranx::compare(const char *log_file_name1, my_off_t log_file_pos1, + const char *log_file_name2, my_off_t log_file_pos2) +{ + int cmp = strcmp(log_file_name1, log_file_name2); + + if (cmp != 0) + return cmp; + + if (log_file_pos1 > log_file_pos2) + return 1; + else if (log_file_pos1 < log_file_pos2) + return -1; + return 0; +} + +int ActiveTranx::insert_tranx_node(const char *log_file_name, + my_off_t log_file_pos) +{ + const char *kWho = "ActiveTranx:insert_tranx_node"; + TranxNode *ins_node; + int result = 0; + unsigned int hash_val; + + function_enter(kWho); + + ins_node = allocator_.allocate_node(); + if (!ins_node) + { + sql_print_error("%s: transaction node allocation failed for: (%s, %lu)", + kWho, log_file_name, (ulong)log_file_pos); + result = -1; + goto l_end; + } + + /* insert the binlog position in the active transaction list. */ + strncpy(ins_node->log_name_, log_file_name, FN_REFLEN-1); + ins_node->log_name_[FN_REFLEN-1] = 0; /* make sure it ends properly */ + ins_node->log_pos_ = log_file_pos; + + if (!trx_front_) + { + /* The list is empty. */ + trx_front_ = trx_rear_ = ins_node; + } + else + { + int cmp = compare(ins_node, trx_rear_); + if (cmp > 0) + { + /* Compare with the tail first. If the transaction happens later in + * binlog, then make it the new tail. + */ + trx_rear_->next_ = ins_node; + trx_rear_ = ins_node; + } + else + { + /* Otherwise, it is an error because the transaction should hold the + * mysql_bin_log.LOCK_log when appending events. + */ + sql_print_error("%s: binlog write out-of-order, tail (%s, %lu), " + "new node (%s, %lu)", kWho, + trx_rear_->log_name_, (ulong)trx_rear_->log_pos_, + ins_node->log_name_, (ulong)ins_node->log_pos_); + result = -1; + goto l_end; + } + } + + hash_val = get_hash_value(ins_node->log_name_, ins_node->log_pos_); + ins_node->hash_next_ = trx_htb_[hash_val]; + trx_htb_[hash_val] = ins_node; + + if (trace_level_ & kTraceDetail) + sql_print_information("%s: insert (%s, %lu) in entry(%u)", kWho, + ins_node->log_name_, (ulong)ins_node->log_pos_, + hash_val); + + l_end: + return function_exit(kWho, result); +} + +bool ActiveTranx::is_tranx_end_pos(const char *log_file_name, + my_off_t log_file_pos) +{ + const char *kWho = "ActiveTranx::is_tranx_end_pos"; + function_enter(kWho); + + unsigned int hash_val = get_hash_value(log_file_name, log_file_pos); + TranxNode *entry = trx_htb_[hash_val]; + + while (entry != NULL) + { + if (compare(entry, log_file_name, log_file_pos) == 0) + break; + + entry = entry->hash_next_; + } + + if (trace_level_ & kTraceDetail) + sql_print_information("%s: probe (%s, %lu) in entry(%u)", kWho, + log_file_name, (ulong)log_file_pos, hash_val); + + function_exit(kWho, (entry != NULL)); + return (entry != NULL); +} + +int ActiveTranx::clear_active_tranx_nodes(const char *log_file_name, + my_off_t log_file_pos) +{ + const char *kWho = "ActiveTranx::::clear_active_tranx_nodes"; + TranxNode *new_front; + + function_enter(kWho); + + if (log_file_name != NULL) + { + new_front = trx_front_; + + while (new_front) + { + if (compare(new_front, log_file_name, log_file_pos) > 0) + break; + new_front = new_front->next_; + } + } + else + { + /* If log_file_name is NULL, clear everything. */ + new_front = NULL; + } + + if (new_front == NULL) + { + /* No active transaction nodes after the call. */ + + /* Clear the hash table. */ + memset(trx_htb_, 0, num_entries_ * sizeof(TranxNode *)); + allocator_.free_all_nodes(); + + /* Clear the active transaction list. */ + if (trx_front_ != NULL) + { + trx_front_ = NULL; + trx_rear_ = NULL; + } + + if (trace_level_ & kTraceDetail) + sql_print_information("%s: cleared all nodes", kWho); + } + else if (new_front != trx_front_) + { + TranxNode *curr_node, *next_node; + + /* Delete all transaction nodes before the confirmation point. */ + int n_frees = 0; + curr_node = trx_front_; + while (curr_node != new_front) + { + next_node = curr_node->next_; + n_frees++; + + /* Remove the node from the hash table. */ + unsigned int hash_val = get_hash_value(curr_node->log_name_, curr_node->log_pos_); + TranxNode **hash_ptr = &(trx_htb_[hash_val]); + while ((*hash_ptr) != NULL) + { + if ((*hash_ptr) == curr_node) + { + (*hash_ptr) = curr_node->hash_next_; + break; + } + hash_ptr = &((*hash_ptr)->hash_next_); + } + + curr_node = next_node; + } + + trx_front_ = new_front; + allocator_.free_nodes_before(trx_front_); + + if (trace_level_ & kTraceDetail) + sql_print_information("%s: cleared %d nodes back until pos (%s, %lu)", + kWho, n_frees, + trx_front_->log_name_, (ulong)trx_front_->log_pos_); + } + + return function_exit(kWho, 0); +} + + +/******************************************************************************* + * + * <ReplSemiSyncMaster> class: the basic code layer for sync-replication master. + * <ReplSemiSyncSlave> class: the basic code layer for sync-replication slave. + * + * The most important functions during semi-syn replication listed: + * + * Master: + * . reportReplyBinlog(): called by the binlog dump thread when it receives + * the slave's status information. + * . updateSyncHeader(): based on transaction waiting information, decide + * whether to request the slave to reply. + * . writeTranxInBinlog(): called by the transaction thread when it finishes + * writing all transaction events in binlog. + * . commitTrx(): transaction thread wait for the slave reply. + * + * Slave: + * . slaveReadSyncHeader(): read the semi-sync header from the master, get the + * sync status and get the payload for events. + * . slaveReply(): reply to the master about the replication progress. + * + ******************************************************************************/ + +ReplSemiSyncMaster::ReplSemiSyncMaster() + : active_tranxs_(NULL), + init_done_(false), + reply_file_name_inited_(false), + reply_file_pos_(0L), + wait_file_name_inited_(false), + wait_file_pos_(0), + master_enabled_(false), + wait_timeout_(0L), + state_(0) +{ + strcpy(reply_file_name_, ""); + strcpy(wait_file_name_, ""); +} + +int ReplSemiSyncMaster::initObject() +{ + int result; + const char *kWho = "ReplSemiSyncMaster::initObject"; + + if (init_done_) + { + fprintf(stderr, "%s called twice\n", kWho); + return 1; + } + init_done_ = true; + + /* References to the parameter works after set_options(). */ + setWaitTimeout(rpl_semi_sync_master_timeout); + setTraceLevel(rpl_semi_sync_master_trace_level); + + /* Mutex initialization can only be done after MY_INIT(). */ + mysql_mutex_init(key_LOCK_binlog, + &LOCK_binlog, MY_MUTEX_INIT_FAST); + mysql_cond_init(key_COND_binlog_send, + &COND_binlog_send, NULL); + + if (rpl_semi_sync_master_enabled) + result = enableMaster(); + else + result = disableMaster(); + + return result; +} + +int ReplSemiSyncMaster::enableMaster() +{ + int result = 0; + + /* Must have the lock when we do enable of disable. */ + lock(); + + if (!getMasterEnabled()) + { + active_tranxs_ = new ActiveTranx(&LOCK_binlog, trace_level_); + if (active_tranxs_ != NULL) + { + commit_file_name_inited_ = false; + reply_file_name_inited_ = false; + wait_file_name_inited_ = false; + + set_master_enabled(true); + state_ = true; + run_hooks_enabled= 1; + sql_print_information("Semi-sync replication enabled on the master."); + } + else + { + sql_print_error("Cannot allocate memory to enable semi-sync on the master."); + result = -1; + } + } + + unlock(); + + return result; +} + +int ReplSemiSyncMaster::disableMaster() +{ + /* Must have the lock when we do enable of disable. */ + lock(); + + if (getMasterEnabled()) + { + /* Switch off the semi-sync first so that waiting transaction will be + * waken up. + */ + switch_off(); + + assert(active_tranxs_ != NULL); + delete active_tranxs_; + active_tranxs_ = NULL; + + reply_file_name_inited_ = false; + wait_file_name_inited_ = false; + commit_file_name_inited_ = false; + + set_master_enabled(false); + sql_print_information("Semi-sync replication disabled on the master."); + } + + unlock(); + + return 0; +} + +void ReplSemiSyncMaster::cleanup() +{ + if (init_done_) + { + mysql_mutex_destroy(&LOCK_binlog); + mysql_cond_destroy(&COND_binlog_send); + init_done_= 0; + } + + delete active_tranxs_; +} + +void ReplSemiSyncMaster::lock() +{ + mysql_mutex_lock(&LOCK_binlog); +} + +void ReplSemiSyncMaster::unlock() +{ + mysql_mutex_unlock(&LOCK_binlog); +} + +void ReplSemiSyncMaster::cond_broadcast() +{ + mysql_cond_broadcast(&COND_binlog_send); +} + +int ReplSemiSyncMaster::cond_timewait(struct timespec *wait_time) +{ + const char *kWho = "ReplSemiSyncMaster::cond_timewait()"; + int wait_res; + + function_enter(kWho); + wait_res= mysql_cond_timedwait(&COND_binlog_send, + &LOCK_binlog, wait_time); + return function_exit(kWho, wait_res); +} + +void ReplSemiSyncMaster::add_slave() +{ + lock(); + rpl_semi_sync_master_clients++; + unlock(); +} + +void ReplSemiSyncMaster::remove_slave() +{ + lock(); + rpl_semi_sync_master_clients--; + + /* Only switch off if semi-sync is enabled and is on */ + if (getMasterEnabled() && is_on()) + { + /* If user has chosen not to wait if no semi-sync slave available + and the last semi-sync slave exits, turn off semi-sync on master + immediately. + */ + if (!rpl_semi_sync_master_wait_no_slave && + rpl_semi_sync_master_clients == 0) + switch_off(); + } + unlock(); +} + +bool ReplSemiSyncMaster::is_semi_sync_slave() +{ + int null_value; + long long val= 0; + get_user_var_int("rpl_semi_sync_slave", &val, &null_value); + return val; +} + +int ReplSemiSyncMaster::reportReplyBinlog(uint32 server_id, + const char *log_file_name, + my_off_t log_file_pos) +{ + const char *kWho = "ReplSemiSyncMaster::reportReplyBinlog"; + int cmp; + bool can_release_threads = false; + bool need_copy_send_pos = true; + + if (!(getMasterEnabled())) + return 0; + + function_enter(kWho); + + lock(); + + /* This is the real check inside the mutex. */ + if (!getMasterEnabled()) + goto l_end; + + if (!is_on()) + /* We check to see whether we can switch semi-sync ON. */ + try_switch_on(server_id, log_file_name, log_file_pos); + + /* The position should increase monotonically, if there is only one + * thread sending the binlog to the slave. + * In reality, to improve the transaction availability, we allow multiple + * sync replication slaves. So, if any one of them get the transaction, + * the transaction session in the primary can move forward. + */ + if (reply_file_name_inited_) + { + cmp = ActiveTranx::compare(log_file_name, log_file_pos, + reply_file_name_, reply_file_pos_); + + /* If the requested position is behind the sending binlog position, + * would not adjust sending binlog position. + * We based on the assumption that there are multiple semi-sync slave, + * and at least one of them shou/ld be up to date. + * If all semi-sync slaves are behind, at least initially, the primary + * can find the situation after the waiting timeout. After that, some + * slaves should catch up quickly. + */ + if (cmp < 0) + { + /* If the position is behind, do not copy it. */ + need_copy_send_pos = false; + } + } + + if (need_copy_send_pos) + { + strmake_buf(reply_file_name_, log_file_name); + reply_file_pos_ = log_file_pos; + reply_file_name_inited_ = true; + + /* Remove all active transaction nodes before this point. */ + assert(active_tranxs_ != NULL); + active_tranxs_->clear_active_tranx_nodes(log_file_name, log_file_pos); + + if (trace_level_ & kTraceDetail) + sql_print_information("%s: Got reply at (%s, %lu)", kWho, + log_file_name, (ulong)log_file_pos); + } + + if (rpl_semi_sync_master_wait_sessions > 0) + { + /* Let us check if some of the waiting threads doing a trx + * commit can now proceed. + */ + cmp = ActiveTranx::compare(reply_file_name_, reply_file_pos_, + wait_file_name_, wait_file_pos_); + if (cmp >= 0) + { + /* Yes, at least one waiting thread can now proceed: + * let us release all waiting threads with a broadcast + */ + can_release_threads = true; + wait_file_name_inited_ = false; + } + } + + l_end: + unlock(); + + if (can_release_threads) + { + if (trace_level_ & kTraceDetail) + sql_print_information("%s: signal all waiting threads.", kWho); + + cond_broadcast(); + } + + return function_exit(kWho, 0); +} + +int ReplSemiSyncMaster::commitTrx(const char* trx_wait_binlog_name, + my_off_t trx_wait_binlog_pos) +{ + const char *kWho = "ReplSemiSyncMaster::commitTrx"; + + function_enter(kWho); + + if (getMasterEnabled() && trx_wait_binlog_name) + { + struct timespec start_ts; + struct timespec abstime; + int wait_result; + PSI_stage_info old_stage; + + set_timespec(start_ts, 0); + + DEBUG_SYNC(current_thd, "rpl_semisync_master_commit_trx_before_lock"); + /* Acquire the mutex. */ + lock(); + + /* This must be called after acquired the lock */ + THD_ENTER_COND(NULL, &COND_binlog_send, &LOCK_binlog, + & stage_waiting_for_semi_sync_ack_from_slave, + & old_stage); + + /* This is the real check inside the mutex. */ + if (!getMasterEnabled() || !is_on()) + goto l_end; + + if (trace_level_ & kTraceDetail) + { + sql_print_information("%s: wait pos (%s, %lu), repl(%d)\n", kWho, + trx_wait_binlog_name, (ulong)trx_wait_binlog_pos, + (int)is_on()); + } + + while (is_on() && !thd_killed(current_thd)) + { + if (reply_file_name_inited_) + { + int cmp = ActiveTranx::compare(reply_file_name_, reply_file_pos_, + trx_wait_binlog_name, trx_wait_binlog_pos); + if (cmp >= 0) + { + /* We have already sent the relevant binlog to the slave: no need to + * wait here. + */ + if (trace_level_ & kTraceDetail) + sql_print_information("%s: Binlog reply is ahead (%s, %lu),", + kWho, reply_file_name_, (ulong)reply_file_pos_); + break; + } + } + + /* Let us update the info about the minimum binlog position of waiting + * threads. + */ + if (wait_file_name_inited_) + { + int cmp = ActiveTranx::compare(trx_wait_binlog_name, trx_wait_binlog_pos, + wait_file_name_, wait_file_pos_); + if (cmp <= 0) + { + /* This thd has a lower position, let's update the minimum info. */ + strmake_buf(wait_file_name_, trx_wait_binlog_name); + wait_file_pos_ = trx_wait_binlog_pos; + + rpl_semi_sync_master_wait_pos_backtraverse++; + if (trace_level_ & kTraceDetail) + sql_print_information("%s: move back wait position (%s, %lu),", + kWho, wait_file_name_, (ulong)wait_file_pos_); + } + } + else + { + strmake_buf(wait_file_name_, trx_wait_binlog_name); + wait_file_pos_ = trx_wait_binlog_pos; + wait_file_name_inited_ = true; + + if (trace_level_ & kTraceDetail) + sql_print_information("%s: init wait position (%s, %lu),", + kWho, wait_file_name_, (ulong)wait_file_pos_); + } + + /* Calcuate the waiting period. */ + long diff_secs = (long) (wait_timeout_ / TIME_THOUSAND); + long diff_nsecs = (long) ((wait_timeout_ % TIME_THOUSAND) * TIME_MILLION); + long nsecs = start_ts.tv_nsec + diff_nsecs; + abstime.tv_sec = start_ts.tv_sec + diff_secs + nsecs/TIME_BILLION; + abstime.tv_nsec = nsecs % TIME_BILLION; + + /* In semi-synchronous replication, we wait until the binlog-dump + * thread has received the reply on the relevant binlog segment from the + * replication slave. + * + * Let us suspend this thread to wait on the condition; + * when replication has progressed far enough, we will release + * these waiting threads. + */ + rpl_semi_sync_master_wait_sessions++; + + if (trace_level_ & kTraceDetail) + sql_print_information("%s: wait %lu ms for binlog sent (%s, %lu)", + kWho, wait_timeout_, + wait_file_name_, (ulong)wait_file_pos_); + + wait_result = cond_timewait(&abstime); + rpl_semi_sync_master_wait_sessions--; + + if (wait_result != 0) + { + /* This is a real wait timeout. */ + sql_print_warning("Timeout waiting for reply of binlog (file: %s, pos: %lu), " + "semi-sync up to file %s, position %lu.", + trx_wait_binlog_name, (ulong)trx_wait_binlog_pos, + reply_file_name_, (ulong)reply_file_pos_); + rpl_semi_sync_master_wait_timeouts++; + + /* switch semi-sync off */ + switch_off(); + } + else + { + int wait_time; + + wait_time = getWaitTime(start_ts); + if (wait_time < 0) + { + if (trace_level_ & kTraceGeneral) + { + sql_print_error("Replication semi-sync getWaitTime fail at " + "wait position (%s, %lu)", + trx_wait_binlog_name, (ulong)trx_wait_binlog_pos); + } + rpl_semi_sync_master_timefunc_fails++; + } + else + { + rpl_semi_sync_master_trx_wait_num++; + rpl_semi_sync_master_trx_wait_time += wait_time; + } + } + } + + /* + At this point, the binlog file and position of this transaction + must have been removed from ActiveTranx. + active_tranxs_ may be NULL if someone disabled semi sync during + cond_timewait() + */ + assert(thd_killed(current_thd) || !active_tranxs_ || + !active_tranxs_->is_tranx_end_pos(trx_wait_binlog_name, + trx_wait_binlog_pos)); + + l_end: + /* Update the status counter. */ + if (is_on()) + rpl_semi_sync_master_yes_transactions++; + else + rpl_semi_sync_master_no_transactions++; + + /* The lock held will be released by thd_exit_cond, so no need to + call unlock() here */ + THD_EXIT_COND(NULL, & old_stage); + } + + return function_exit(kWho, 0); +} + +/* Indicate that semi-sync replication is OFF now. + * + * What should we do when it is disabled? The problem is that we want + * the semi-sync replication enabled again when the slave catches up + * later. But, it is not that easy to detect that the slave has caught + * up. This is caused by the fact that MySQL's replication protocol is + * asynchronous, meaning that if the master does not use the semi-sync + * protocol, the slave would not send anything to the master. + * Still, if the master is sending (N+1)-th event, we assume that it is + * an indicator that the slave has received N-th event and earlier ones. + * + * If semi-sync is disabled, all transactions still update the wait + * position with the last position in binlog. But no transactions will + * wait for confirmations and the active transaction list would not be + * maintained. In binlog dump thread, updateSyncHeader() checks whether + * the current sending event catches up with last wait position. If it + * does match, semi-sync will be switched on again. + */ +int ReplSemiSyncMaster::switch_off() +{ + const char *kWho = "ReplSemiSyncMaster::switch_off"; + int result; + + function_enter(kWho); + state_ = false; + + /* Clear the active transaction list. */ + assert(active_tranxs_ != NULL); + result = active_tranxs_->clear_active_tranx_nodes(NULL, 0); + + rpl_semi_sync_master_off_times++; + wait_file_name_inited_ = false; + reply_file_name_inited_ = false; + sql_print_information("Semi-sync replication switched OFF."); + cond_broadcast(); /* wake up all waiting threads */ + + return function_exit(kWho, result); +} + +int ReplSemiSyncMaster::try_switch_on(int server_id, + const char *log_file_name, + my_off_t log_file_pos) +{ + const char *kWho = "ReplSemiSyncMaster::try_switch_on"; + bool semi_sync_on = false; + + function_enter(kWho); + + /* If the current sending event's position is larger than or equal to the + * 'largest' commit transaction binlog position, the slave is already + * catching up now and we can switch semi-sync on here. + * If commit_file_name_inited_ indicates there are no recent transactions, + * we can enable semi-sync immediately. + */ + if (commit_file_name_inited_) + { + int cmp = ActiveTranx::compare(log_file_name, log_file_pos, + commit_file_name_, commit_file_pos_); + semi_sync_on = (cmp >= 0); + } + else + { + semi_sync_on = true; + } + + if (semi_sync_on) + { + /* Switch semi-sync replication on. */ + state_ = true; + + sql_print_information("Semi-sync replication switched ON with slave (server_id: %d) " + "at (%s, %lu)", + server_id, log_file_name, + (ulong)log_file_pos); + } + + return function_exit(kWho, 0); +} + +int ReplSemiSyncMaster::reserveSyncHeader(unsigned char *header, + ulong size) +{ + const char *kWho = "ReplSemiSyncMaster::reserveSyncHeader"; + function_enter(kWho); + + int hlen=0; + if (!is_semi_sync_slave()) + { + hlen= 0; + } + else + { + /* No enough space for the extra header, disable semi-sync master */ + if (sizeof(kSyncHeader) > size) + { + sql_print_warning("No enough space in the packet " + "for semi-sync extra header, " + "semi-sync replication disabled"); + disableMaster(); + return 0; + } + + /* Set the magic number and the sync status. By default, no sync + * is required. + */ + memcpy(header, kSyncHeader, sizeof(kSyncHeader)); + hlen= sizeof(kSyncHeader); + } + return function_exit(kWho, hlen); +} + +int ReplSemiSyncMaster::updateSyncHeader(unsigned char *packet, + const char *log_file_name, + my_off_t log_file_pos, + uint32 server_id) +{ + const char *kWho = "ReplSemiSyncMaster::updateSyncHeader"; + int cmp = 0; + bool sync = false; + + /* If the semi-sync master is not enabled, or the slave is not a semi-sync + * target, do not request replies from the slave. + */ + if (!getMasterEnabled() || !is_semi_sync_slave()) + return 0; + + function_enter(kWho); + + lock(); + + /* This is the real check inside the mutex. */ + if (!getMasterEnabled()) + goto l_end; // sync= false at this point in time + + if (is_on()) + { + /* semi-sync is ON */ + /* sync= false; No sync unless a transaction is involved. */ + + if (reply_file_name_inited_) + { + cmp = ActiveTranx::compare(log_file_name, log_file_pos, + reply_file_name_, reply_file_pos_); + if (cmp <= 0) + { + /* If we have already got the reply for the event, then we do + * not need to sync the transaction again. + */ + goto l_end; + } + } + + if (wait_file_name_inited_) + { + cmp = ActiveTranx::compare(log_file_name, log_file_pos, + wait_file_name_, wait_file_pos_); + } + else + { + cmp = 1; + } + + /* If we are already waiting for some transaction replies which + * are later in binlog, do not wait for this one event. + */ + if (cmp >= 0) + { + /* + * We only wait if the event is a transaction's ending event. + */ + assert(active_tranxs_ != NULL); + sync = active_tranxs_->is_tranx_end_pos(log_file_name, + log_file_pos); + } + } + else + { + if (commit_file_name_inited_) + { + int cmp = ActiveTranx::compare(log_file_name, log_file_pos, + commit_file_name_, commit_file_pos_); + sync = (cmp >= 0); + } + else + { + sync = true; + } + } + + if (trace_level_ & kTraceDetail) + sql_print_information("%s: server(%d), (%s, %lu) sync(%d), repl(%d)", + kWho, server_id, log_file_name, + (ulong)log_file_pos, sync, (int)is_on()); + + l_end: + unlock(); + + /* We do not need to clear sync flag because we set it to 0 when we + * reserve the packet header. + */ + if (sync) + { + (packet)[2] = kPacketFlagSync; + } + + return function_exit(kWho, 0); +} + +int ReplSemiSyncMaster::writeTranxInBinlog(const char* log_file_name, + my_off_t log_file_pos) +{ + const char *kWho = "ReplSemiSyncMaster::writeTranxInBinlog"; + int result = 0; + + function_enter(kWho); + + lock(); + + /* This is the real check inside the mutex. */ + if (!getMasterEnabled()) + goto l_end; + + /* Update the 'largest' transaction commit position seen so far even + * though semi-sync is switched off. + * It is much better that we update commit_file_* here, instead of + * inside commitTrx(). This is mostly because updateSyncHeader() + * will watch for commit_file_* to decide whether to switch semi-sync + * on. The detailed reason is explained in function updateSyncHeader(). + */ + if (commit_file_name_inited_) + { + int cmp = ActiveTranx::compare(log_file_name, log_file_pos, + commit_file_name_, commit_file_pos_); + if (cmp > 0) + { + /* This is a larger position, let's update the maximum info. */ + strncpy(commit_file_name_, log_file_name, FN_REFLEN-1); + commit_file_name_[FN_REFLEN-1] = 0; /* make sure it ends properly */ + commit_file_pos_ = log_file_pos; + } + } + else + { + strncpy(commit_file_name_, log_file_name, FN_REFLEN-1); + commit_file_name_[FN_REFLEN-1] = 0; /* make sure it ends properly */ + commit_file_pos_ = log_file_pos; + commit_file_name_inited_ = true; + } + + if (is_on()) + { + assert(active_tranxs_ != NULL); + if(active_tranxs_->insert_tranx_node(log_file_name, log_file_pos)) + { + /* + if insert tranx_node failed, print a warning message + and turn off semi-sync + */ + sql_print_warning("Semi-sync failed to insert tranx_node for binlog file: %s, position: %lu", + log_file_name, (ulong)log_file_pos); + switch_off(); + } + } + + l_end: + unlock(); + + return function_exit(kWho, result); +} + +int ReplSemiSyncMaster::readSlaveReply(NET *net, uint32 server_id, + const char *event_buf) +{ + const char *kWho = "ReplSemiSyncMaster::readSlaveReply"; + const unsigned char *packet; + char log_file_name[FN_REFLEN]; + my_off_t log_file_pos; + ulong log_file_len = 0; + ulong packet_len; + int result = -1; + struct timespec start_ts; + ulong trc_level = trace_level_; + LINT_INIT_STRUCT(start_ts); + + function_enter(kWho); + + assert((unsigned char)event_buf[1] == kPacketMagicNum); + if ((unsigned char)event_buf[2] != kPacketFlagSync) + { + /* current event does not require reply */ + result = 0; + goto l_end; + } + + if (trc_level & kTraceNetWait) + set_timespec(start_ts, 0); + + /* We flush to make sure that the current event is sent to the network, + * instead of being buffered in the TCP/IP stack. + */ + if (net_flush(net)) + { + sql_print_error("Semi-sync master failed on net_flush() " + "before waiting for slave reply"); + goto l_end; + } + + net_clear(net, 0); + if (trc_level & kTraceDetail) + sql_print_information("%s: Wait for replica's reply", kWho); + + /* Wait for the network here. Though binlog dump thread can indefinitely wait + * here, transactions would not wait indefintely. + * Transactions wait on binlog replies detected by binlog dump threads. If + * binlog dump threads wait too long, transactions will timeout and continue. + */ + packet_len = my_net_read(net); + + if (trc_level & kTraceNetWait) + { + int wait_time = getWaitTime(start_ts); + if (wait_time < 0) + { + sql_print_error("Semi-sync master wait for reply " + "fail to get wait time."); + rpl_semi_sync_master_timefunc_fails++; + } + else + { + rpl_semi_sync_master_net_wait_num++; + rpl_semi_sync_master_net_wait_time += wait_time; + } + } + + if (packet_len == packet_error || packet_len < REPLY_BINLOG_NAME_OFFSET) + { + if (packet_len == packet_error) + sql_print_error("Read semi-sync reply network error: %s (errno: %d)", + net->last_error, net->last_errno); + else + sql_print_error("Read semi-sync reply length error: %s (errno: %d)", + net->last_error, net->last_errno); + goto l_end; + } + + packet = net->read_pos; + if (packet[REPLY_MAGIC_NUM_OFFSET] != ReplSemiSyncMaster::kPacketMagicNum) + { + sql_print_error("Read semi-sync reply magic number error"); + goto l_end; + } + + log_file_pos = uint8korr(packet + REPLY_BINLOG_POS_OFFSET); + log_file_len = packet_len - REPLY_BINLOG_NAME_OFFSET; + if (log_file_len >= FN_REFLEN) + { + sql_print_error("Read semi-sync reply binlog file length too large"); + goto l_end; + } + strncpy(log_file_name, (const char*)packet + REPLY_BINLOG_NAME_OFFSET, log_file_len); + log_file_name[log_file_len] = 0; + + if (trc_level & kTraceDetail) + sql_print_information("%s: Got reply (%s, %lu)", + kWho, log_file_name, (ulong)log_file_pos); + + result = reportReplyBinlog(server_id, log_file_name, log_file_pos); + + l_end: + return function_exit(kWho, result); +} + + +int ReplSemiSyncMaster::resetMaster() +{ + const char *kWho = "ReplSemiSyncMaster::resetMaster"; + int result = 0; + + function_enter(kWho); + + + lock(); + + state_ = getMasterEnabled()? 1 : 0; + + wait_file_name_inited_ = false; + reply_file_name_inited_ = false; + commit_file_name_inited_ = false; + + rpl_semi_sync_master_yes_transactions = 0; + rpl_semi_sync_master_no_transactions = 0; + rpl_semi_sync_master_off_times = 0; + rpl_semi_sync_master_timefunc_fails = 0; + rpl_semi_sync_master_wait_sessions = 0; + rpl_semi_sync_master_wait_pos_backtraverse = 0; + rpl_semi_sync_master_trx_wait_num = 0; + rpl_semi_sync_master_trx_wait_time = 0; + rpl_semi_sync_master_net_wait_num = 0; + rpl_semi_sync_master_net_wait_time = 0; + + unlock(); + + return function_exit(kWho, result); +} + +void ReplSemiSyncMaster::setExportStats() +{ + lock(); + + rpl_semi_sync_master_status = state_; + rpl_semi_sync_master_avg_trx_wait_time= + ((rpl_semi_sync_master_trx_wait_num) ? + (ulong)((double)rpl_semi_sync_master_trx_wait_time / + ((double)rpl_semi_sync_master_trx_wait_num)) : 0); + rpl_semi_sync_master_avg_net_wait_time= + ((rpl_semi_sync_master_net_wait_num) ? + (ulong)((double)rpl_semi_sync_master_net_wait_time / + ((double)rpl_semi_sync_master_net_wait_num)) : 0); + + unlock(); +} + +/* Get the waiting time given the wait's staring time. + * + * Return: + * >= 0: the waiting time in microsecons(us) + * < 0: error in get time or time back traverse + */ +static int getWaitTime(const struct timespec& start_ts) +{ + ulonglong start_usecs, end_usecs; + struct timespec end_ts; + + /* Starting time in microseconds(us). */ + start_usecs = timespec_to_usec(&start_ts); + + /* Get the wait time interval. */ + set_timespec(end_ts, 0); + + /* Ending time in microseconds(us). */ + end_usecs = timespec_to_usec(&end_ts); + + if (end_usecs < start_usecs) + return -1; + + return (int)(end_usecs - start_usecs); +} + +/*************************************************************************** + Semisync master interface setup and deinit +***************************************************************************/ + +C_MODE_START + +int repl_semi_report_binlog_update(Binlog_storage_param *param, + const char *log_file, + my_off_t log_pos, uint32 flags) +{ + int error= 0; + + if (repl_semisync_master.getMasterEnabled()) + { + /* + Let us store the binlog file name and the position, so that + we know how long to wait for the binlog to the replicated to + the slave in synchronous replication. + */ + error= repl_semisync_master.writeTranxInBinlog(log_file, + log_pos); + } + + return error; +} + +int repl_semi_request_commit(Trans_param *param) +{ + return 0; +} + +int repl_semi_report_binlog_sync(Binlog_storage_param *param, + const char *log_file, + my_off_t log_pos, uint32 flags) +{ + int error= 0; + if (rpl_semi_sync_master_wait_point == + SEMI_SYNC_MASTER_WAIT_POINT_AFTER_BINLOG_SYNC) + { + error= repl_semisync_master.commitTrx(log_file, log_pos); + } + + return error; +} + +int repl_semi_report_commit(Trans_param *param) +{ + if (rpl_semi_sync_master_wait_point != + SEMI_SYNC_MASTER_WAIT_POINT_AFTER_STORAGE_COMMIT) + { + return 0; + } + + bool is_real_trans= param->flags & TRANS_IS_REAL_TRANS; + + if (is_real_trans && param->log_pos) + { + const char *binlog_name= param->log_file; + return repl_semisync_master.commitTrx(binlog_name, param->log_pos); + } + return 0; +} + +int repl_semi_report_rollback(Trans_param *param) +{ + return repl_semi_report_commit(param); +} + +int repl_semi_binlog_dump_start(Binlog_transmit_param *param, + const char *log_file, + my_off_t log_pos) +{ + bool semi_sync_slave= repl_semisync_master.is_semi_sync_slave(); + + if (semi_sync_slave) + { + /* One more semi-sync slave */ + repl_semisync_master.add_slave(); + + /* + Let's assume this semi-sync slave has already received all + binlog events before the filename and position it requests. + */ + repl_semisync_master.reportReplyBinlog(param->server_id, log_file, log_pos); + } + sql_print_information("Start %s binlog_dump to slave (server_id: %d), pos(%s, %lu)", + semi_sync_slave ? "semi-sync" : "asynchronous", + param->server_id, log_file, (ulong)log_pos); + + return 0; +} + +int repl_semi_binlog_dump_end(Binlog_transmit_param *param) +{ + bool semi_sync_slave= repl_semisync_master.is_semi_sync_slave(); + + sql_print_information("Stop %s binlog_dump to slave (server_id: %d)", + semi_sync_slave ? "semi-sync" : "asynchronous", + param->server_id); + if (semi_sync_slave) + { + /* One less semi-sync slave */ + repl_semisync_master.remove_slave(); + } + return 0; +} + +int repl_semi_reserve_header(Binlog_transmit_param *param, + unsigned char *header, + ulong size, ulong *len) +{ + *len += repl_semisync_master.reserveSyncHeader(header, size); + return 0; +} + +int repl_semi_before_send_event(Binlog_transmit_param *param, + unsigned char *packet, ulong len, + const char *log_file, my_off_t log_pos) +{ + return repl_semisync_master.updateSyncHeader(packet, + log_file, + log_pos, + param->server_id); +} + +int repl_semi_after_send_event(Binlog_transmit_param *param, + const char *event_buf, ulong len) +{ + if (repl_semisync_master.is_semi_sync_slave()) + { + THD *thd= current_thd; + /* + Possible errors in reading slave reply are ignored deliberately + because we do not want dump thread to quit on this. Error + messages are already reported. + */ + (void) repl_semisync_master.readSlaveReply(&thd->net, + param->server_id, event_buf); + thd->clear_error(); + } + return 0; +} + +int repl_semi_reset_master(Binlog_transmit_param *param) +{ + if (repl_semisync_master.resetMaster()) + return 1; + return 0; +} + +C_MODE_END + +Trans_observer trans_observer= +{ + sizeof(Trans_observer), // len + + repl_semi_report_commit, // after_commit + repl_semi_report_rollback, // after_rollback +}; + +Binlog_storage_observer storage_observer= +{ + sizeof(Binlog_storage_observer), // len + + repl_semi_report_binlog_update, // report_update + repl_semi_report_binlog_sync, // after_sync +}; + +Binlog_transmit_observer transmit_observer= +{ + sizeof(Binlog_transmit_observer), // len + + repl_semi_binlog_dump_start, // start + repl_semi_binlog_dump_end, // stop + repl_semi_reserve_header, // reserve_header + repl_semi_before_send_event, // before_send_event + repl_semi_after_send_event, // after_send_event + repl_semi_reset_master, // reset +}; + +static bool semi_sync_master_inited= 0; + +int semi_sync_master_init() +{ + void *p= 0; + if (repl_semisync_master.initObject()) + return 1; + if (register_trans_observer(&trans_observer, p)) + return 1; + if (register_binlog_storage_observer(&storage_observer, p)) + return 1; + if (register_binlog_transmit_observer(&transmit_observer, p)) + return 1; + semi_sync_master_inited= 1; + return 0; +} + +void semi_sync_master_deinit() +{ + void *p= 0; + if (!semi_sync_master_inited) + return; + + unregister_trans_observer(&trans_observer, p); + unregister_binlog_storage_observer(&storage_observer, p); + unregister_binlog_transmit_observer(&transmit_observer, p); + repl_semisync_master.cleanup(); + semi_sync_master_inited= 0; +} diff --git a/sql/semisync_master.h b/sql/semisync_master.h new file mode 100644 index 00000000000..ff1e3dd48b4 --- /dev/null +++ b/sql/semisync_master.h @@ -0,0 +1,636 @@ +/* Copyright (C) 2007 Google Inc. + Copyright (c) 2008 MySQL AB, 2009 Sun Microsystems, Inc. + Use is subject to license terms. + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; version 2 of the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ + + +#ifndef SEMISYNC_MASTER_H +#define SEMISYNC_MASTER_H + +#include "semisync.h" + +#ifdef HAVE_PSI_INTERFACE +extern PSI_mutex_key key_LOCK_binlog; +extern PSI_cond_key key_COND_binlog_send; +#endif + +extern PSI_stage_info stage_waiting_for_semi_sync_ack_from_slave; + +struct TranxNode { + char log_name_[FN_REFLEN]; + my_off_t log_pos_; + struct TranxNode *next_; /* the next node in the sorted list */ + struct TranxNode *hash_next_; /* the next node during hash collision */ +}; + +/** + @class TranxNodeAllocator + + This class provides memory allocating and freeing methods for + TranxNode. The main target is performance. + + @section ALLOCATE How to allocate a node + The pointer of the first node after 'last_node' in current_block is + returned. current_block will move to the next free Block when all nodes of + it are in use. A new Block is allocated and is put into the rear of the + Block link table if no Block is free. + + The list starts up empty (ie, there is no allocated Block). + + After some nodes are freed, there probably are some free nodes before + the sequence of the allocated nodes, but we do not reuse it. It is better + to keep the allocated nodes are in the sequence, for it is more efficient + for allocating and freeing TranxNode. + + @section FREENODE How to free nodes + There are two methods for freeing nodes. They are free_all_nodes and + free_nodes_before. + + 'A Block is free' means all of its nodes are free. + @subsection free_nodes_before + As all allocated nodes are in the sequence, 'Before one node' means all + nodes before given node in the same Block and all Blocks before the Block + which containing the given node. As such, all Blocks before the given one + ('node') are free Block and moved into the rear of the Block link table. + The Block containing the given 'node', however, is not. For at least the + given 'node' is still in use. This will waste at most one Block, but it is + more efficient. + */ +#define BLOCK_TRANX_NODES 16 +class TranxNodeAllocator +{ +public: + /** + @param reserved_nodes + The number of reserved TranxNodes. It is used to set 'reserved_blocks' + which can contain at least 'reserved_nodes' number of TranxNodes. When + freeing memory, we will reserve at least reserved_blocks of Blocks not + freed. + */ + TranxNodeAllocator(uint reserved_nodes) : + reserved_blocks(reserved_nodes/BLOCK_TRANX_NODES + + (reserved_nodes%BLOCK_TRANX_NODES > 1 ? 2 : 1)), + first_block(NULL), last_block(NULL), + current_block(NULL), last_node(-1), block_num(0) {} + + ~TranxNodeAllocator() + { + Block *block= first_block; + while (block != NULL) + { + Block *next= block->next; + free_block(block); + block= next; + } + } + + /** + The pointer of the first node after 'last_node' in current_block is + returned. current_block will move to the next free Block when all nodes of + it are in use. A new Block is allocated and is put into the rear of the + Block link table if no Block is free. + + @return Return a TranxNode *, or NULL if an error occurred. + */ + TranxNode *allocate_node() + { + TranxNode *trx_node; + Block *block= current_block; + + if (last_node == BLOCK_TRANX_NODES-1) + { + current_block= current_block->next; + last_node= -1; + } + + if (current_block == NULL && allocate_block()) + { + current_block= block; + if (current_block) + last_node= BLOCK_TRANX_NODES-1; + return NULL; + } + + trx_node= &(current_block->nodes[++last_node]); + trx_node->log_name_[0] = '\0'; + trx_node->log_pos_= 0; + trx_node->next_= 0; + trx_node->hash_next_= 0; + return trx_node; + } + + /** + All nodes are freed. + + @return Return 0, or 1 if an error occurred. + */ + int free_all_nodes() + { + current_block= first_block; + last_node= -1; + free_blocks(); + return 0; + } + + /** + All Blocks before the given 'node' are free Block and moved into the rear + of the Block link table. + + @param node All nodes before 'node' will be freed + + @return Return 0, or 1 if an error occurred. + */ + int free_nodes_before(TranxNode* node) + { + Block *block; + Block *prev_block= NULL; + + block= first_block; + while (block != current_block->next) + { + /* Find the Block containing the given node */ + if (&(block->nodes[0]) <= node && &(block->nodes[BLOCK_TRANX_NODES]) >= node) + { + /* All Blocks before the given node are put into the rear */ + if (first_block != block) + { + last_block->next= first_block; + first_block= block; + last_block= prev_block; + last_block->next= NULL; + free_blocks(); + } + return 0; + } + prev_block= block; + block= block->next; + } + + /* Node does not find should never happen */ + DBUG_ASSERT(0); + return 1; + } + +private: + uint reserved_blocks; + + /** + A sequence memory which contains BLOCK_TRANX_NODES TranxNodes. + + BLOCK_TRANX_NODES The number of TranxNodes which are in a Block. + + next Every Block has a 'next' pointer which points to the next Block. + These linking Blocks constitute a Block link table. + */ + struct Block { + Block *next; + TranxNode nodes[BLOCK_TRANX_NODES]; + }; + + /** + The 'first_block' is the head of the Block link table; + */ + Block *first_block; + /** + The 'last_block' is the rear of the Block link table; + */ + Block *last_block; + + /** + current_block always points the Block in the Block link table in + which the last allocated node is. The Blocks before it are all in use + and the Blocks after it are all free. + */ + Block *current_block; + + /** + It always points to the last node which has been allocated in the + current_block. + */ + int last_node; + + /** + How many Blocks are in the Block link table. + */ + uint block_num; + + /** + Allocate a block and then assign it to current_block. + */ + int allocate_block() + { + Block *block= (Block *)my_malloc(sizeof(Block), MYF(0)); + if (block) + { + block->next= NULL; + + if (first_block == NULL) + first_block= block; + else + last_block->next= block; + + /* New Block is always put into the rear */ + last_block= block; + /* New Block is always the current_block */ + current_block= block; + ++block_num; + return 0; + } + return 1; + } + + /** + Free a given Block. + @param block The Block will be freed. + */ + void free_block(Block *block) + { + my_free(block); + --block_num; + } + + + /** + If there are some free Blocks and the total number of the Blocks in the + Block link table is larger than the 'reserved_blocks', Some free Blocks + will be freed until the total number of the Blocks is equal to the + 'reserved_blocks' or there is only one free Block behind the + 'current_block'. + */ + void free_blocks() + { + if (current_block == NULL || current_block->next == NULL) + return; + + /* One free Block is always kept behind the current block */ + Block *block= current_block->next->next; + while (block_num > reserved_blocks && block != NULL) + { + Block *next= block->next; + free_block(block); + block= next; + } + current_block->next->next= block; + if (block == NULL) + last_block= current_block->next; + } +}; + +/** + This class manages memory for active transaction list. + + We record each active transaction with a TranxNode, each session + can have only one open transaction. Because of EVENT, the total + active transaction nodes can exceed the maximum allowed + connections. +*/ +class ActiveTranx + :public Trace { +private: + + TranxNodeAllocator allocator_; + /* These two record the active transaction list in sort order. */ + TranxNode *trx_front_, *trx_rear_; + + TranxNode **trx_htb_; /* A hash table on active transactions. */ + + int num_entries_; /* maximum hash table entries */ + mysql_mutex_t *lock_; /* mutex lock */ + + inline void assert_lock_owner(); + + inline unsigned int calc_hash(const unsigned char *key,unsigned int length); + unsigned int get_hash_value(const char *log_file_name, my_off_t log_file_pos); + + int compare(const char *log_file_name1, my_off_t log_file_pos1, + const TranxNode *node2) { + return compare(log_file_name1, log_file_pos1, + node2->log_name_, node2->log_pos_); + } + int compare(const TranxNode *node1, + const char *log_file_name2, my_off_t log_file_pos2) { + return compare(node1->log_name_, node1->log_pos_, + log_file_name2, log_file_pos2); + } + int compare(const TranxNode *node1, const TranxNode *node2) { + return compare(node1->log_name_, node1->log_pos_, + node2->log_name_, node2->log_pos_); + } + +public: + ActiveTranx(mysql_mutex_t *lock, unsigned long trace_level); + ~ActiveTranx(); + + /* Insert an active transaction node with the specified position. + * + * Return: + * 0: success; non-zero: error + */ + int insert_tranx_node(const char *log_file_name, my_off_t log_file_pos); + + /* Clear the active transaction nodes until(inclusive) the specified + * position. + * If log_file_name is NULL, everything will be cleared: the sorted + * list and the hash table will be reset to empty. + * + * Return: + * 0: success; non-zero: error + */ + int clear_active_tranx_nodes(const char *log_file_name, + my_off_t log_file_pos); + + /* Given a position, check to see whether the position is an active + * transaction's ending position by probing the hash table. + */ + bool is_tranx_end_pos(const char *log_file_name, my_off_t log_file_pos); + + /* Given two binlog positions, compare which one is bigger based on + * (file_name, file_position). + */ + static int compare(const char *log_file_name1, my_off_t log_file_pos1, + const char *log_file_name2, my_off_t log_file_pos2); + +}; + +/** + The extension class for the master of semi-synchronous replication +*/ +class ReplSemiSyncMaster + :public ReplSemiSyncBase { + private: + ActiveTranx *active_tranxs_; /* active transaction list: the list will + be cleared when semi-sync switches off. */ + + /* True when initObject has been called */ + bool init_done_; + + /* This cond variable is signaled when enough binlog has been sent to slave, + * so that a waiting trx can return the 'ok' to the client for a commit. + */ + mysql_cond_t COND_binlog_send; + + /* Mutex that protects the following state variables and the active + * transaction list. + * Under no cirumstances we can acquire mysql_bin_log.LOCK_log if we are + * already holding LOCK_binlog_ because it can cause deadlocks. + */ + mysql_mutex_t LOCK_binlog; + + /* This is set to true when reply_file_name_ contains meaningful data. */ + bool reply_file_name_inited_; + + /* The binlog name up to which we have received replies from any slaves. */ + char reply_file_name_[FN_REFLEN]; + + /* The position in that file up to which we have the reply from any slaves. */ + my_off_t reply_file_pos_; + + /* This is set to true when we know the 'smallest' wait position. */ + bool wait_file_name_inited_; + + /* NULL, or the 'smallest' filename that a transaction is waiting for + * slave replies. + */ + char wait_file_name_[FN_REFLEN]; + + /* The smallest position in that file that a trx is waiting for: the trx + * can proceed and send an 'ok' to the client when the master has got the + * reply from the slave indicating that it already got the binlog events. + */ + my_off_t wait_file_pos_; + + /* This is set to true when we know the 'largest' transaction commit + * position in the binlog file. + * We always maintain the position no matter whether semi-sync is switched + * on switched off. When a transaction wait timeout occurs, semi-sync will + * switch off. Binlog-dump thread can use the three fields to detect when + * slaves catch up on replication so that semi-sync can switch on again. + */ + bool commit_file_name_inited_; + + /* The 'largest' binlog filename that a commit transaction is seeing. */ + char commit_file_name_[FN_REFLEN]; + + /* The 'largest' position in that file that a commit transaction is seeing. */ + my_off_t commit_file_pos_; + + /* All global variables which can be set by parameters. */ + volatile bool master_enabled_; /* semi-sync is enabled on the master */ + unsigned long wait_timeout_; /* timeout period(ms) during tranx wait */ + + bool state_; /* whether semi-sync is switched */ + + void lock(); + void unlock(); + void cond_broadcast(); + int cond_timewait(struct timespec *wait_time); + + /* Is semi-sync replication on? */ + bool is_on() { + return (state_); + } + + void set_master_enabled(bool enabled) { + master_enabled_ = enabled; + } + + /* Switch semi-sync off because of timeout in transaction waiting. */ + int switch_off(); + + /* Switch semi-sync on when slaves catch up. */ + int try_switch_on(int server_id, + const char *log_file_name, my_off_t log_file_pos); + + public: + ReplSemiSyncMaster(); + ~ReplSemiSyncMaster() {} + + void cleanup(); + + bool getMasterEnabled() { + return master_enabled_; + } + void setTraceLevel(unsigned long trace_level) { + trace_level_ = trace_level; + if (active_tranxs_) + active_tranxs_->trace_level_ = trace_level; + } + + /* Set the transaction wait timeout period, in milliseconds. */ + void setWaitTimeout(unsigned long wait_timeout) { + wait_timeout_ = wait_timeout; + } + + /* Initialize this class after MySQL parameters are initialized. this + * function should be called once at bootstrap time. + */ + int initObject(); + + /* Enable the object to enable semi-sync replication inside the master. */ + int enableMaster(); + + /* Enable the object to enable semi-sync replication inside the master. */ + int disableMaster(); + + /* Add a semi-sync replication slave */ + void add_slave(); + + /* Remove a semi-sync replication slave */ + void remove_slave(); + + /* Is the slave servered by the thread requested semi-sync */ + bool is_semi_sync_slave(); + + /* In semi-sync replication, reports up to which binlog position we have + * received replies from the slave indicating that it already get the events. + * + * Input: + * server_id - (IN) master server id number + * log_file_name - (IN) binlog file name + * end_offset - (IN) the offset in the binlog file up to which we have + * the replies from the slave + * + * Return: + * 0: success; non-zero: error + */ + int reportReplyBinlog(uint32 server_id, + const char* log_file_name, + my_off_t end_offset); + + /* Commit a transaction in the final step. This function is called from + * InnoDB before returning from the low commit. If semi-sync is switch on, + * the function will wait to see whether binlog-dump thread get the reply for + * the events of the transaction. Remember that this is not a direct wait, + * instead, it waits to see whether the binlog-dump thread has reached the + * point. If the wait times out, semi-sync status will be switched off and + * all other transaction would not wait either. + * + * Input: (the transaction events' ending binlog position) + * trx_wait_binlog_name - (IN) ending position's file name + * trx_wait_binlog_pos - (IN) ending position's file offset + * + * Return: + * 0: success; non-zero: error + */ + int commitTrx(const char* trx_wait_binlog_name, + my_off_t trx_wait_binlog_pos); + + /* Reserve space in the replication event packet header: + * . slave semi-sync off: 1 byte - (0) + * . slave semi-sync on: 3 byte - (0, 0xef, 0/1} + * + * Input: + * header - (IN) the header buffer + * size - (IN) size of the header buffer + * + * Return: + * size of the bytes reserved for header + */ + int reserveSyncHeader(unsigned char *header, unsigned long size); + + /* Update the sync bit in the packet header to indicate to the slave whether + * the master will wait for the reply of the event. If semi-sync is switched + * off and we detect that the slave is catching up, we switch semi-sync on. + * + * Input: + * packet - (IN) the packet containing the replication event + * log_file_name - (IN) the event ending position's file name + * log_file_pos - (IN) the event ending position's file offset + * server_id - (IN) master server id number + * + * Return: + * 0: success; non-zero: error + */ + int updateSyncHeader(unsigned char *packet, + const char *log_file_name, + my_off_t log_file_pos, + uint32 server_id); + + /* Called when a transaction finished writing binlog events. + * . update the 'largest' transactions' binlog event position + * . insert the ending position in the active transaction list if + * semi-sync is on + * + * Input: (the transaction events' ending binlog position) + * log_file_name - (IN) transaction ending position's file name + * log_file_pos - (IN) transaction ending position's file offset + * + * Return: + * 0: success; non-zero: error + */ + int writeTranxInBinlog(const char* log_file_name, my_off_t log_file_pos); + + /* Read the slave's reply so that we know how much progress the slave makes + * on receive replication events. + * + * Input: + * net - (IN) the connection to master + * server_id - (IN) master server id number + * event_buf - (IN) pointer to the event packet + * + * Return: + * 0: success; non-zero: error + */ + int readSlaveReply(NET *net, uint32 server_id, const char *event_buf); + + /* Export internal statistics for semi-sync replication. */ + void setExportStats(); + + /* 'reset master' command is issued from the user and semi-sync need to + * go off for that. + */ + int resetMaster(); +}; + +enum rpl_semi_sync_master_wait_point_t { + SEMI_SYNC_MASTER_WAIT_POINT_AFTER_BINLOG_SYNC, + SEMI_SYNC_MASTER_WAIT_POINT_AFTER_STORAGE_COMMIT, +}; + +/* System and status variables for the master component */ +extern my_bool rpl_semi_sync_master_enabled; +extern my_bool rpl_semi_sync_master_status; +extern ulong rpl_semi_sync_master_wait_point; +extern ulong rpl_semi_sync_master_clients; +extern ulong rpl_semi_sync_master_timeout; +extern ulong rpl_semi_sync_master_trace_level; +extern ulong rpl_semi_sync_master_yes_transactions; +extern ulong rpl_semi_sync_master_no_transactions; +extern ulong rpl_semi_sync_master_off_times; +extern ulong rpl_semi_sync_master_wait_timeouts; +extern ulong rpl_semi_sync_master_timefunc_fails; +extern ulong rpl_semi_sync_master_num_timeouts; +extern ulong rpl_semi_sync_master_wait_sessions; +extern ulong rpl_semi_sync_master_wait_pos_backtraverse; +extern ulong rpl_semi_sync_master_avg_trx_wait_time; +extern ulong rpl_semi_sync_master_avg_net_wait_time; +extern ulonglong rpl_semi_sync_master_net_wait_num; +extern ulonglong rpl_semi_sync_master_trx_wait_num; +extern ulonglong rpl_semi_sync_master_net_wait_time; +extern ulonglong rpl_semi_sync_master_trx_wait_time; + +/* + This indicates whether we should keep waiting if no semi-sync slave + is available. + 0 : stop waiting if detected no avaialable semi-sync slave. + 1 (default) : keep waiting until timeout even no available semi-sync slave. +*/ +extern char rpl_semi_sync_master_wait_no_slave; +extern ReplSemiSyncMaster repl_semisync_master; + +int semi_sync_master_init(); +void semi_sync_master_deinit(); + +#endif /* SEMISYNC_MASTER_H */ diff --git a/sql/semisync_slave.cc b/sql/semisync_slave.cc new file mode 100644 index 00000000000..63bf9dca0e8 --- /dev/null +++ b/sql/semisync_slave.cc @@ -0,0 +1,288 @@ +/* Copyright (c) 2008 MySQL AB, 2009 Sun Microsystems, Inc. + Use is subject to license terms. + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; version 2 of the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ + + +#include <my_global.h> +#include "semisync_slave.h" + +my_bool rpl_semi_sync_slave_enabled; +my_bool rpl_semi_sync_slave_status= 0; +ulong rpl_semi_sync_slave_trace_level; +ReplSemiSyncSlave repl_semisync_slave; + +/* + indicate whether or not the slave should send a reply to the master. + + This is set to true in repl_semi_slave_read_event if the current + event read is the last event of a transaction. And the value is + checked in repl_semi_slave_queue_event. +*/ +bool semi_sync_need_reply= false; + + +int ReplSemiSyncSlave::initObject() +{ + int result= 0; + const char *kWho = "ReplSemiSyncSlave::initObject"; + + if (init_done_) + { + fprintf(stderr, "%s called twice\n", kWho); + return 1; + } + init_done_ = true; + + /* References to the parameter works after set_options(). */ + setSlaveEnabled(rpl_semi_sync_slave_enabled); + setTraceLevel(rpl_semi_sync_slave_trace_level); + + return result; +} + +int ReplSemiSyncSlave::slaveReadSyncHeader(const char *header, + unsigned long total_len, + bool *need_reply, + const char **payload, + unsigned long *payload_len) +{ + const char *kWho = "ReplSemiSyncSlave::slaveReadSyncHeader"; + int read_res = 0; + function_enter(kWho); + + if ((unsigned char)(header[0]) == kPacketMagicNum) + { + *need_reply = (header[1] & kPacketFlagSync); + *payload_len = total_len - 2; + *payload = header + 2; + + if (trace_level_ & kTraceDetail) + sql_print_information("%s: reply - %d", kWho, *need_reply); + } + else + { + sql_print_error("Missing magic number for semi-sync packet, packet " + "len: %lu", total_len); + read_res = -1; + } + + return function_exit(kWho, read_res); +} + +int ReplSemiSyncSlave::slaveStart(Binlog_relay_IO_param *param) +{ + bool semi_sync= getSlaveEnabled(); + + sql_print_information("Slave I/O thread: Start %s replication to\ + master '%s@%s:%d' in log '%s' at position %lu", + semi_sync ? "semi-sync" : "asynchronous", + param->user, param->host, param->port, + param->master_log_name[0] ? param->master_log_name : "FIRST", + (unsigned long)param->master_log_pos); + + if (semi_sync && !rpl_semi_sync_slave_status) + rpl_semi_sync_slave_status= 1; + return 0; +} + +int ReplSemiSyncSlave::slaveStop(Binlog_relay_IO_param *param) +{ + if (rpl_semi_sync_slave_status) + rpl_semi_sync_slave_status= 0; + if (mysql_reply) + mysql_close(mysql_reply); + mysql_reply= 0; + return 0; +} + +int ReplSemiSyncSlave::slaveReply(MYSQL *mysql, + const char *binlog_filename, + my_off_t binlog_filepos) +{ + const char *kWho = "ReplSemiSyncSlave::slaveReply"; + NET *net= &mysql->net; + uchar reply_buffer[REPLY_MAGIC_NUM_LEN + + REPLY_BINLOG_POS_LEN + + REPLY_BINLOG_NAME_LEN]; + int reply_res, name_len = strlen(binlog_filename); + + function_enter(kWho); + + /* Prepare the buffer of the reply. */ + reply_buffer[REPLY_MAGIC_NUM_OFFSET] = kPacketMagicNum; + int8store(reply_buffer + REPLY_BINLOG_POS_OFFSET, binlog_filepos); + memcpy(reply_buffer + REPLY_BINLOG_NAME_OFFSET, + binlog_filename, + name_len + 1 /* including trailing '\0' */); + + if (trace_level_ & kTraceDetail) + sql_print_information("%s: reply (%s, %lu)", kWho, + binlog_filename, (ulong)binlog_filepos); + + net_clear(net, 0); + /* Send the reply. */ + reply_res = my_net_write(net, reply_buffer, + name_len + REPLY_BINLOG_NAME_OFFSET); + if (!reply_res) + { + reply_res = net_flush(net); + if (reply_res) + sql_print_error("Semi-sync slave net_flush() reply failed"); + } + else + { + sql_print_error("Semi-sync slave send reply failed: %s (%d)", + net->last_error, net->last_errno); + } + + return function_exit(kWho, reply_res); +} + +/*************************************************************************** + Semisync slave interface setup and deinit +***************************************************************************/ + +C_MODE_START + +int repl_semi_reset_slave(Binlog_relay_IO_param *param) +{ + // TODO: reset semi-sync slave status here + return 0; +} + +int repl_semi_slave_request_dump(Binlog_relay_IO_param *param, + uint32 flags) +{ + MYSQL *mysql= param->mysql; + MYSQL_RES *res= 0; + MYSQL_ROW row; + const char *query; + + if (!repl_semisync_slave.getSlaveEnabled()) + return 0; + + /* Check if master server has semi-sync plugin installed */ + query= "SHOW VARIABLES LIKE 'rpl_semi_sync_master_enabled'"; + if (mysql_real_query(mysql, query, strlen(query)) || + !(res= mysql_store_result(mysql))) + { + sql_print_error("Execution failed on master: %s", query); + return 1; + } + + row= mysql_fetch_row(res); + if (!row) + { + /* Master does not support semi-sync */ + sql_print_warning("Master server does not support semi-sync, " + "fallback to asynchronous replication"); + rpl_semi_sync_slave_status= 0; + mysql_free_result(res); + return 0; + } + mysql_free_result(res); + + /* + Tell master dump thread that we want to do semi-sync + replication + */ + query= "SET @rpl_semi_sync_slave= 1"; + if (mysql_real_query(mysql, query, strlen(query))) + { + sql_print_error("Set 'rpl_semi_sync_slave=1' on master failed"); + return 1; + } + mysql_free_result(mysql_store_result(mysql)); + rpl_semi_sync_slave_status= 1; + return 0; +} + +int repl_semi_slave_read_event(Binlog_relay_IO_param *param, + const char *packet, unsigned long len, + const char **event_buf, unsigned long *event_len) +{ + if (rpl_semi_sync_slave_status) + return repl_semisync_slave.slaveReadSyncHeader(packet, len, + &semi_sync_need_reply, + event_buf, event_len); + *event_buf= packet; + *event_len= len; + return 0; +} + +int repl_semi_slave_queue_event(Binlog_relay_IO_param *param, + const char *event_buf, + unsigned long event_len, + uint32 flags) +{ + if (rpl_semi_sync_slave_status && semi_sync_need_reply) + { + /* + We deliberately ignore the error in slaveReply, such error + should not cause the slave IO thread to stop, and the error + messages are already reported. + */ + (void) repl_semisync_slave.slaveReply(param->mysql, + param->master_log_name, + param->master_log_pos); + } + return 0; +} + +int repl_semi_slave_io_start(Binlog_relay_IO_param *param) +{ + return repl_semisync_slave.slaveStart(param); +} + +int repl_semi_slave_io_end(Binlog_relay_IO_param *param) +{ + return repl_semisync_slave.slaveStop(param); +} + +C_MODE_END + +Binlog_relay_IO_observer relay_io_observer= +{ + sizeof(Binlog_relay_IO_observer), // len + + repl_semi_slave_io_start, // start + repl_semi_slave_io_end, // stop + repl_semi_slave_request_dump, // request_transmit + repl_semi_slave_read_event, // after_read_event + repl_semi_slave_queue_event, // after_queue_event + repl_semi_reset_slave, // reset +}; + +static bool semi_sync_slave_inited= 0; + +int semi_sync_slave_init() +{ + void *p= 0; + if (repl_semisync_slave.initObject()) + return 1; + if (register_binlog_relay_io_observer(&relay_io_observer, p)) + return 1; + semi_sync_slave_inited= 1; + return 0; +} + +void semi_sync_slave_deinit() +{ + void *p= 0; + if (!semi_sync_slave_inited) + return; + unregister_binlog_relay_io_observer(&relay_io_observer, p); + semi_sync_slave_inited= 0; +} diff --git a/sql/semisync_slave.h b/sql/semisync_slave.h new file mode 100644 index 00000000000..9cca8bbbdb4 --- /dev/null +++ b/sql/semisync_slave.h @@ -0,0 +1,101 @@ +/* Copyright (c) 2006 MySQL AB, 2009 Sun Microsystems, Inc. + Use is subject to license terms. + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; version 2 of the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ + + +#ifndef SEMISYNC_SLAVE_H +#define SEMISYNC_SLAVE_H + +#include "semisync.h" + +/** + The extension class for the slave of semi-synchronous replication +*/ +class ReplSemiSyncSlave + :public ReplSemiSyncBase { +public: + ReplSemiSyncSlave() + :slave_enabled_(false) + {} + ~ReplSemiSyncSlave() {} + + void setTraceLevel(unsigned long trace_level) { + trace_level_ = trace_level; + } + + /* Initialize this class after MySQL parameters are initialized. this + * function should be called once at bootstrap time. + */ + int initObject(); + + bool getSlaveEnabled() { + return slave_enabled_; + } + void setSlaveEnabled(bool enabled) { + slave_enabled_ = enabled; + } + + /* A slave reads the semi-sync packet header and separate the metadata + * from the payload data. + * + * Input: + * header - (IN) packet header pointer + * total_len - (IN) total packet length: metadata + payload + * need_reply - (IN) whether the master is waiting for the reply + * payload - (IN) payload: the replication event + * payload_len - (IN) payload length + * + * Return: + * 0: success; non-zero: error + */ + int slaveReadSyncHeader(const char *header, unsigned long total_len, bool *need_reply, + const char **payload, unsigned long *payload_len); + + /* A slave replies to the master indicating its replication process. It + * indicates that the slave has received all events before the specified + * binlog position. + * + * Input: + * mysql - (IN) the mysql network connection + * binlog_filename - (IN) the reply point's binlog file name + * binlog_filepos - (IN) the reply point's binlog file offset + * + * Return: + * 0: success; non-zero: error + */ + int slaveReply(MYSQL *mysql, const char *binlog_filename, + my_off_t binlog_filepos); + + int slaveStart(Binlog_relay_IO_param *param); + int slaveStop(Binlog_relay_IO_param *param); + +private: + /* True when initObject has been called */ + bool init_done_; + bool slave_enabled_; /* semi-sycn is enabled on the slave */ + MYSQL *mysql_reply; /* connection to send reply */ +}; + + +/* System and status variables for the slave component */ +extern my_bool rpl_semi_sync_slave_enabled; +extern my_bool rpl_semi_sync_slave_status; +extern ulong rpl_semi_sync_slave_trace_level; +extern ReplSemiSyncSlave repl_semisync_slave; + +int semi_sync_slave_init(); +void semi_sync_slave_deinit(); + +#endif /* SEMISYNC_SLAVE_H */ diff --git a/sql/sys_vars.cc b/sql/sys_vars.cc index 82f8359fefc..e897b6c21ce 100644 --- a/sql/sys_vars.cc +++ b/sql/sys_vars.cc @@ -61,6 +61,8 @@ #include "sql_repl.h" #include "opt_range.h" #include "rpl_parallel.h" +#include "semisync_master.h" +#include "semisync_slave.h" #include <ssl_compat.h> /* @@ -3039,8 +3041,188 @@ static Sys_var_replicate_events_marked_for_skip Replicate_events_marked_for_skip "the slave).", GLOBAL_VAR(opt_replicate_events_marked_for_skip), CMD_LINE(REQUIRED_ARG), replicate_events_marked_for_skip_names, DEFAULT(RPL_SKIP_REPLICATE)); + +/* new options for semisync */ + +static bool fix_rpl_semi_sync_master_enabled(sys_var *self, THD *thd, + enum_var_type type) +{ + if (rpl_semi_sync_master_enabled) + { + if (repl_semisync_master.enableMaster() != 0) + rpl_semi_sync_master_enabled= false; +#ifdef HAVE_ACC_RECEIVER + else if (ack_receiver.start()) + { + repl_semisync_master.disableMaster(); + rpl_semi_sync_master_enabled= false; + } +#endif + } + else + { + if (repl_semisync_master.disableMaster() != 0) + rpl_semi_sync_master_enabled= true; +#ifdef HAVE_ACC_RECEIVER + if (!rpl_semi_sync_master_enabled) + ack_receiver.stop(); +#endif + } + return false; +} + +static bool fix_rpl_semi_sync_master_timeout(sys_var *self, THD *thd, + enum_var_type type) +{ + repl_semisync_master.setWaitTimeout(rpl_semi_sync_master_timeout); + return false; +} + +static bool fix_rpl_semi_sync_master_trace_level(sys_var *self, THD *thd, + enum_var_type type) +{ + repl_semisync_master.setTraceLevel(rpl_semi_sync_master_trace_level); +#ifdef HAVE_ACC_RECEIVER + ack_receiver.setTraceLevel(rpl_semi_sync_master_trace_level); +#endif + return false; +} + +static bool fix_rpl_semi_sync_master_wait_point(sys_var *self, THD *thd, + enum_var_type type) +{ +#ifdef HAVE_ACC_RECEIVER + repl_semisync_master.setWaitPoint(rpl_semi_sync_master_wait_point); #endif + return false; +} + +static bool fix_rpl_semi_sync_master_wait_no_slave(sys_var *self, THD *thd, + enum_var_type type) +{ +#ifdef HAVE_ACC_RECEIVER + repl_semisync_master.checkAndSwitch(); +#endif + return false; +} + +static Sys_var_mybool Sys_semisync_master_enabled( + "rpl_semi_sync_master_enabled", + "Enable semi-synchronous replication master (disabled by default).", + GLOBAL_VAR(rpl_semi_sync_master_enabled), + CMD_LINE(OPT_ARG), DEFAULT(FALSE), + NO_MUTEX_GUARD, NOT_IN_BINLOG, ON_CHECK(0), + ON_UPDATE(fix_rpl_semi_sync_master_enabled)); + +static Sys_var_ulong Sys_semisync_master_timeout( + "rpl_semi_sync_master_timeout", + "The timeout value (in ms) for semi-synchronous replication in the " + "master", + GLOBAL_VAR(rpl_semi_sync_master_timeout), + CMD_LINE(REQUIRED_ARG), + VALID_RANGE(0,~0L),DEFAULT(10000),BLOCK_SIZE(1), + NO_MUTEX_GUARD, NOT_IN_BINLOG, ON_CHECK(0), + ON_UPDATE(fix_rpl_semi_sync_master_timeout)); +static Sys_var_mybool Sys_semisync_master_wait_no_slave( + "rpl_semi_sync_master_wait_no_slave", + "Wait until timeout when no semi-synchronous replication slave " + "available (enabled by default).", + GLOBAL_VAR(rpl_semi_sync_master_wait_no_slave), + CMD_LINE(OPT_ARG), DEFAULT(TRUE), + NO_MUTEX_GUARD, NOT_IN_BINLOG, ON_CHECK(0), + ON_UPDATE(fix_rpl_semi_sync_master_wait_no_slave)); + +static Sys_var_ulong Sys_semisync_master_trace_level( + "rpl_semi_sync_master_trace_level", + "The tracing level for semi-sync replication.", + GLOBAL_VAR(rpl_semi_sync_master_trace_level), + CMD_LINE(REQUIRED_ARG), + VALID_RANGE(0,~0L),DEFAULT(32),BLOCK_SIZE(1), + NO_MUTEX_GUARD, NOT_IN_BINLOG, ON_CHECK(0), + ON_UPDATE(fix_rpl_semi_sync_master_trace_level)); + +static const char *repl_semisync_wait_point[]= +{"AFTER_SYNC", "AFTER_COMMIT", NullS}; + +static Sys_var_enum Sys_semisync_master_wait_point( + "rpl_semi_sync_master_wait_point", + "Should transaction wait for semi-sync ack after having synced binlog, " + "or after having committed in storage engine.", + GLOBAL_VAR(rpl_semi_sync_master_wait_point), CMD_LINE(REQUIRED_ARG), + repl_semisync_wait_point, DEFAULT(1), + NO_MUTEX_GUARD, NOT_IN_BINLOG,ON_CHECK(0), + ON_UPDATE(fix_rpl_semi_sync_master_wait_point)); + +static bool fix_rpl_semi_sync_slave_enabled(sys_var *self, THD *thd, + enum_var_type type) +{ + repl_semisync_slave.setSlaveEnabled(rpl_semi_sync_slave_enabled != 0); + return false; +} + +static bool fix_rpl_semi_sync_slave_trace_level(sys_var *self, THD *thd, + enum_var_type type) +{ + repl_semisync_slave.setTraceLevel(rpl_semi_sync_slave_trace_level); + return false; +} + +#ifdef HAVE_ACC_RECEIVER +static bool fix_rpl_semi_sync_slave_delay_master(sys_var *self, THD *thd, + enum_var_type type) +{ + repl_semisync_slave.setDelayMaster(rpl_semi_sync_slave_delay_master); + return false; +} + +static bool fix_rpl_semi_sync_slave_kill_conn_timeout(sys_var *self, THD *thd, + enum_var_type type) +{ + repl_semisync_slave.setKillConnTimeout(rpl_semi_sync_slave_kill_conn_timeout); + return false; +} +#endif + + +static Sys_var_mybool Sys_semisync_slave_enabled( + "rpl_semi_sync_slave_enabled", + "Enable semi-synchronous replication slave (disabled by default).", + GLOBAL_VAR(rpl_semi_sync_slave_enabled), + CMD_LINE(OPT_ARG), DEFAULT(FALSE), + NO_MUTEX_GUARD, NOT_IN_BINLOG, ON_CHECK(0), + ON_UPDATE(fix_rpl_semi_sync_slave_enabled)); + +static Sys_var_ulong Sys_semisync_slave_trace_level( + "rpl_semi_sync_slave_trace_level", + "The tracing level for semi-sync replication.", + GLOBAL_VAR(rpl_semi_sync_slave_trace_level), + CMD_LINE(REQUIRED_ARG), + VALID_RANGE(0,~0L),DEFAULT(32),BLOCK_SIZE(1), + NO_MUTEX_GUARD, NOT_IN_BINLOG, ON_CHECK(0), + ON_UPDATE(fix_rpl_semi_sync_slave_trace_level)); + +#ifdef HAVE_ACC_RECEIVER +static Sys_var_mybool Sys_semisync_slave_delay_master( + "rpl_semi_sync_slave_delay_master", + "Only write master info file when ack is needed.", + GLOBAL_VAR(rpl_semi_sync_slave_delay_master), + CMD_LINE(OPT_ARG), DEFAULT(FALSE), + NO_MUTEX_GUARD, NOT_IN_BINLOG, ON_CHECK(0), + ON_UPDATE(fix_rpl_semi_sync_slave_delay_master)); + +static Sys_var_uint Sys_semisync_slave_kill_conn_timeout( + "rpl_semi_sync_slave_kill_conn_timeout", + "Timeout for the mysql connection used to kill the slave io_thread's " + "connection on master. This timeout comes into play when stop slave " + "is executed.", + GLOBAL_VAR(rpl_semi_sync_slave_kill_conn_timeout), + CMD_LINE(OPT_ARG), + VALID_RANGE(0, UINT_MAX), DEFAULT(5), BLOCK_SIZE(1), + NO_MUTEX_GUARD, NOT_IN_BINLOG, ON_CHECK(0), + ON_UPDATE(fix_rpl_semi_sync_slave_kill_conn_timeout)); +#endif +#endif /* HAVE_REPLICATION */ static Sys_var_ulong Sys_slow_launch_time( "slow_launch_time", |