summaryrefslogtreecommitdiff
path: root/plugin/semisync/semisync_master.cc
diff options
context:
space:
mode:
Diffstat (limited to 'plugin/semisync/semisync_master.cc')
-rw-r--r--plugin/semisync/semisync_master.cc1218
1 files changed, 0 insertions, 1218 deletions
diff --git a/plugin/semisync/semisync_master.cc b/plugin/semisync/semisync_master.cc
deleted file mode 100644
index 975b2e13253..00000000000
--- a/plugin/semisync/semisync_master.cc
+++ /dev/null
@@ -1,1218 +0,0 @@
-/* Copyright (C) 2007 Google Inc.
- Copyright (c) 2008, 2013, Oracle and/or its affiliates.
- Copyright (c) 2011, 2016, MariaDB
-
- This program is free software; you can redistribute it and/or modify
- it under the terms of the GNU General Public License as published by
- the Free Software Foundation; version 2 of the License.
-
- This program is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU General Public License for more details.
-
- You should have received a copy of the GNU General Public License
- along with this program; if not, write to the Free Software
- Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
-
-
-#include <my_global.h>
-#include "semisync_master.h"
-
-#define TIME_THOUSAND 1000
-#define TIME_MILLION 1000000
-#define TIME_BILLION 1000000000
-
-/* This indicates whether semi-synchronous replication is enabled. */
-char rpl_semi_sync_master_enabled;
-unsigned long rpl_semi_sync_master_wait_point =
- SEMI_SYNC_MASTER_WAIT_POINT_AFTER_STORAGE_COMMIT;
-unsigned long rpl_semi_sync_master_timeout;
-unsigned long rpl_semi_sync_master_trace_level;
-char rpl_semi_sync_master_status = 0;
-unsigned long rpl_semi_sync_master_yes_transactions = 0;
-unsigned long rpl_semi_sync_master_no_transactions = 0;
-unsigned long rpl_semi_sync_master_off_times = 0;
-unsigned long rpl_semi_sync_master_timefunc_fails = 0;
-unsigned long rpl_semi_sync_master_wait_timeouts = 0;
-unsigned long rpl_semi_sync_master_wait_sessions = 0;
-unsigned long rpl_semi_sync_master_wait_pos_backtraverse = 0;
-unsigned long rpl_semi_sync_master_avg_trx_wait_time = 0;
-unsigned long long rpl_semi_sync_master_trx_wait_num = 0;
-unsigned long rpl_semi_sync_master_avg_net_wait_time = 0;
-unsigned long long rpl_semi_sync_master_net_wait_num = 0;
-unsigned long rpl_semi_sync_master_clients = 0;
-unsigned long long rpl_semi_sync_master_net_wait_time = 0;
-unsigned long long rpl_semi_sync_master_trx_wait_time = 0;
-char rpl_semi_sync_master_wait_no_slave = 1;
-
-
-static int getWaitTime(const struct timespec& start_ts);
-
-static unsigned long long timespec_to_usec(const struct timespec *ts)
-{
- return (unsigned long long) ts->tv_sec * TIME_MILLION + ts->tv_nsec / TIME_THOUSAND;
-}
-
-/*******************************************************************************
- *
- * <ActiveTranx> class : manage all active transaction nodes
- *
- ******************************************************************************/
-
-ActiveTranx::ActiveTranx(mysql_mutex_t *lock,
- unsigned long trace_level)
- : Trace(trace_level), allocator_(max_connections),
- num_entries_(max_connections << 1), /* Transaction hash table size
- * is set to double the size
- * of max_connections */
- lock_(lock)
-{
- /* No transactions are in the list initially. */
- trx_front_ = NULL;
- trx_rear_ = NULL;
-
- /* Create the hash table to find a transaction's ending event. */
- trx_htb_ = new TranxNode *[num_entries_];
- for (int idx = 0; idx < num_entries_; ++idx)
- trx_htb_[idx] = NULL;
-
- sql_print_information("Semi-sync replication initialized for transactions.");
-}
-
-ActiveTranx::~ActiveTranx()
-{
- delete [] trx_htb_;
- trx_htb_ = NULL;
- num_entries_ = 0;
-}
-
-unsigned int ActiveTranx::calc_hash(const unsigned char *key,
- unsigned int length)
-{
- unsigned int nr = 1, nr2 = 4;
-
- /* The hash implementation comes from calc_hashnr() in mysys/hash.c. */
- while (length--)
- {
- nr ^= (((nr & 63)+nr2)*((unsigned int) (unsigned char) *key++))+ (nr << 8);
- nr2 += 3;
- }
- return((unsigned int) nr);
-}
-
-unsigned int ActiveTranx::get_hash_value(const char *log_file_name,
- my_off_t log_file_pos)
-{
- unsigned int hash1 = calc_hash((const unsigned char *)log_file_name,
- strlen(log_file_name));
- unsigned int hash2 = calc_hash((const unsigned char *)(&log_file_pos),
- sizeof(log_file_pos));
-
- return (hash1 + hash2) % num_entries_;
-}
-
-int ActiveTranx::compare(const char *log_file_name1, my_off_t log_file_pos1,
- const char *log_file_name2, my_off_t log_file_pos2)
-{
- int cmp = strcmp(log_file_name1, log_file_name2);
-
- if (cmp != 0)
- return cmp;
-
- if (log_file_pos1 > log_file_pos2)
- return 1;
- else if (log_file_pos1 < log_file_pos2)
- return -1;
- return 0;
-}
-
-int ActiveTranx::insert_tranx_node(const char *log_file_name,
- my_off_t log_file_pos)
-{
- const char *kWho = "ActiveTranx:insert_tranx_node";
- TranxNode *ins_node;
- int result = 0;
- unsigned int hash_val;
-
- function_enter(kWho);
-
- ins_node = allocator_.allocate_node();
- if (!ins_node)
- {
- sql_print_error("%s: transaction node allocation failed for: (%s, %lu)",
- kWho, log_file_name, (unsigned long)log_file_pos);
- result = -1;
- goto l_end;
- }
-
- /* insert the binlog position in the active transaction list. */
- strncpy(ins_node->log_name_, log_file_name, FN_REFLEN-1);
- ins_node->log_name_[FN_REFLEN-1] = 0; /* make sure it ends properly */
- ins_node->log_pos_ = log_file_pos;
-
- if (!trx_front_)
- {
- /* The list is empty. */
- trx_front_ = trx_rear_ = ins_node;
- }
- else
- {
- int cmp = compare(ins_node, trx_rear_);
- if (cmp > 0)
- {
- /* Compare with the tail first. If the transaction happens later in
- * binlog, then make it the new tail.
- */
- trx_rear_->next_ = ins_node;
- trx_rear_ = ins_node;
- }
- else
- {
- /* Otherwise, it is an error because the transaction should hold the
- * mysql_bin_log.LOCK_log when appending events.
- */
- sql_print_error("%s: binlog write out-of-order, tail (%s, %lu), "
- "new node (%s, %lu)", kWho,
- trx_rear_->log_name_, (unsigned long)trx_rear_->log_pos_,
- ins_node->log_name_, (unsigned long)ins_node->log_pos_);
- result = -1;
- goto l_end;
- }
- }
-
- hash_val = get_hash_value(ins_node->log_name_, ins_node->log_pos_);
- ins_node->hash_next_ = trx_htb_[hash_val];
- trx_htb_[hash_val] = ins_node;
-
- if (trace_level_ & kTraceDetail)
- sql_print_information("%s: insert (%s, %lu) in entry(%u)", kWho,
- ins_node->log_name_, (unsigned long)ins_node->log_pos_,
- hash_val);
-
- l_end:
- return function_exit(kWho, result);
-}
-
-bool ActiveTranx::is_tranx_end_pos(const char *log_file_name,
- my_off_t log_file_pos)
-{
- const char *kWho = "ActiveTranx::is_tranx_end_pos";
- function_enter(kWho);
-
- unsigned int hash_val = get_hash_value(log_file_name, log_file_pos);
- TranxNode *entry = trx_htb_[hash_val];
-
- while (entry != NULL)
- {
- if (compare(entry, log_file_name, log_file_pos) == 0)
- break;
-
- entry = entry->hash_next_;
- }
-
- if (trace_level_ & kTraceDetail)
- sql_print_information("%s: probe (%s, %lu) in entry(%u)", kWho,
- log_file_name, (unsigned long)log_file_pos, hash_val);
-
- function_exit(kWho, (entry != NULL));
- return (entry != NULL);
-}
-
-int ActiveTranx::clear_active_tranx_nodes(const char *log_file_name,
- my_off_t log_file_pos)
-{
- const char *kWho = "ActiveTranx::::clear_active_tranx_nodes";
- TranxNode *new_front;
-
- function_enter(kWho);
-
- if (log_file_name != NULL)
- {
- new_front = trx_front_;
-
- while (new_front)
- {
- if (compare(new_front, log_file_name, log_file_pos) > 0)
- break;
- new_front = new_front->next_;
- }
- }
- else
- {
- /* If log_file_name is NULL, clear everything. */
- new_front = NULL;
- }
-
- if (new_front == NULL)
- {
- /* No active transaction nodes after the call. */
-
- /* Clear the hash table. */
- memset(trx_htb_, 0, num_entries_ * sizeof(TranxNode *));
- allocator_.free_all_nodes();
-
- /* Clear the active transaction list. */
- if (trx_front_ != NULL)
- {
- trx_front_ = NULL;
- trx_rear_ = NULL;
- }
-
- if (trace_level_ & kTraceDetail)
- sql_print_information("%s: cleared all nodes", kWho);
- }
- else if (new_front != trx_front_)
- {
- TranxNode *curr_node, *next_node;
-
- /* Delete all transaction nodes before the confirmation point. */
- int n_frees = 0;
- curr_node = trx_front_;
- while (curr_node != new_front)
- {
- next_node = curr_node->next_;
- n_frees++;
-
- /* Remove the node from the hash table. */
- unsigned int hash_val = get_hash_value(curr_node->log_name_, curr_node->log_pos_);
- TranxNode **hash_ptr = &(trx_htb_[hash_val]);
- while ((*hash_ptr) != NULL)
- {
- if ((*hash_ptr) == curr_node)
- {
- (*hash_ptr) = curr_node->hash_next_;
- break;
- }
- hash_ptr = &((*hash_ptr)->hash_next_);
- }
-
- curr_node = next_node;
- }
-
- trx_front_ = new_front;
- allocator_.free_nodes_before(trx_front_);
-
- if (trace_level_ & kTraceDetail)
- sql_print_information("%s: cleared %d nodes back until pos (%s, %lu)",
- kWho, n_frees,
- trx_front_->log_name_, (unsigned long)trx_front_->log_pos_);
- }
-
- return function_exit(kWho, 0);
-}
-
-
-/*******************************************************************************
- *
- * <ReplSemiSyncMaster> class: the basic code layer for sync-replication master.
- * <ReplSemiSyncSlave> class: the basic code layer for sync-replication slave.
- *
- * The most important functions during semi-syn replication listed:
- *
- * Master:
- * . reportReplyBinlog(): called by the binlog dump thread when it receives
- * the slave's status information.
- * . updateSyncHeader(): based on transaction waiting information, decide
- * whether to request the slave to reply.
- * . writeTranxInBinlog(): called by the transaction thread when it finishes
- * writing all transaction events in binlog.
- * . commitTrx(): transaction thread wait for the slave reply.
- *
- * Slave:
- * . slaveReadSyncHeader(): read the semi-sync header from the master, get the
- * sync status and get the payload for events.
- * . slaveReply(): reply to the master about the replication progress.
- *
- ******************************************************************************/
-
-ReplSemiSyncMaster::ReplSemiSyncMaster()
- : active_tranxs_(NULL),
- init_done_(false),
- reply_file_name_inited_(false),
- reply_file_pos_(0L),
- wait_file_name_inited_(false),
- wait_file_pos_(0),
- master_enabled_(false),
- wait_timeout_(0L),
- state_(0)
-{
- strcpy(reply_file_name_, "");
- strcpy(wait_file_name_, "");
-}
-
-int ReplSemiSyncMaster::initObject()
-{
- int result;
- const char *kWho = "ReplSemiSyncMaster::initObject";
-
- if (init_done_)
- {
- fprintf(stderr, "%s called twice\n", kWho);
- return 1;
- }
- init_done_ = true;
-
- /* References to the parameter works after set_options(). */
- setWaitTimeout(rpl_semi_sync_master_timeout);
- setTraceLevel(rpl_semi_sync_master_trace_level);
-
- /* Mutex initialization can only be done after MY_INIT(). */
- mysql_mutex_init(key_ss_mutex_LOCK_binlog_,
- &LOCK_binlog_, MY_MUTEX_INIT_FAST);
- mysql_cond_init(key_ss_cond_COND_binlog_send_,
- &COND_binlog_send_, NULL);
-
- if (rpl_semi_sync_master_enabled)
- result = enableMaster();
- else
- result = disableMaster();
-
- return result;
-}
-
-int ReplSemiSyncMaster::enableMaster()
-{
- int result = 0;
-
- /* Must have the lock when we do enable of disable. */
- lock();
-
- if (!getMasterEnabled())
- {
- active_tranxs_ = new ActiveTranx(&LOCK_binlog_, trace_level_);
- if (active_tranxs_ != NULL)
- {
- commit_file_name_inited_ = false;
- reply_file_name_inited_ = false;
- wait_file_name_inited_ = false;
-
- set_master_enabled(true);
- state_ = true;
- sql_print_information("Semi-sync replication enabled on the master.");
- }
- else
- {
- sql_print_error("Cannot allocate memory to enable semi-sync on the master.");
- result = -1;
- }
- }
-
- unlock();
-
- return result;
-}
-
-int ReplSemiSyncMaster::disableMaster()
-{
- /* Must have the lock when we do enable of disable. */
- lock();
-
- if (getMasterEnabled())
- {
- /* Switch off the semi-sync first so that waiting transaction will be
- * waken up.
- */
- switch_off();
-
- assert(active_tranxs_ != NULL);
- delete active_tranxs_;
- active_tranxs_ = NULL;
-
- reply_file_name_inited_ = false;
- wait_file_name_inited_ = false;
- commit_file_name_inited_ = false;
-
- set_master_enabled(false);
- sql_print_information("Semi-sync replication disabled on the master.");
- }
-
- unlock();
-
- return 0;
-}
-
-void ReplSemiSyncMaster::cleanup()
-{
- if (init_done_)
- {
- mysql_mutex_destroy(&LOCK_binlog_);
- mysql_cond_destroy(&COND_binlog_send_);
- init_done_= 0;
- }
-
- delete active_tranxs_;
-}
-
-void ReplSemiSyncMaster::lock()
-{
- mysql_mutex_lock(&LOCK_binlog_);
-}
-
-void ReplSemiSyncMaster::unlock()
-{
- mysql_mutex_unlock(&LOCK_binlog_);
-}
-
-void ReplSemiSyncMaster::cond_broadcast()
-{
- mysql_cond_broadcast(&COND_binlog_send_);
-}
-
-int ReplSemiSyncMaster::cond_timewait(struct timespec *wait_time)
-{
- const char *kWho = "ReplSemiSyncMaster::cond_timewait()";
- int wait_res;
-
- function_enter(kWho);
- wait_res= mysql_cond_timedwait(&COND_binlog_send_,
- &LOCK_binlog_, wait_time);
- return function_exit(kWho, wait_res);
-}
-
-void ReplSemiSyncMaster::add_slave()
-{
- lock();
- rpl_semi_sync_master_clients++;
- unlock();
-}
-
-void ReplSemiSyncMaster::remove_slave()
-{
- lock();
- rpl_semi_sync_master_clients--;
-
- /* Only switch off if semi-sync is enabled and is on */
- if (getMasterEnabled() && is_on())
- {
- /* If user has chosen not to wait if no semi-sync slave available
- and the last semi-sync slave exits, turn off semi-sync on master
- immediately.
- */
- if (!rpl_semi_sync_master_wait_no_slave &&
- rpl_semi_sync_master_clients == 0)
- switch_off();
- }
- unlock();
-}
-
-bool ReplSemiSyncMaster::is_semi_sync_slave()
-{
- int null_value;
- long long val= 0;
- get_user_var_int("rpl_semi_sync_slave", &val, &null_value);
- return val;
-}
-
-int ReplSemiSyncMaster::reportReplyBinlog(uint32 server_id,
- const char *log_file_name,
- my_off_t log_file_pos)
-{
- const char *kWho = "ReplSemiSyncMaster::reportReplyBinlog";
- int cmp;
- bool can_release_threads = false;
- bool need_copy_send_pos = true;
-
- if (!(getMasterEnabled()))
- return 0;
-
- function_enter(kWho);
-
- lock();
-
- /* This is the real check inside the mutex. */
- if (!getMasterEnabled())
- goto l_end;
-
- if (!is_on())
- /* We check to see whether we can switch semi-sync ON. */
- try_switch_on(server_id, log_file_name, log_file_pos);
-
- /* The position should increase monotonically, if there is only one
- * thread sending the binlog to the slave.
- * In reality, to improve the transaction availability, we allow multiple
- * sync replication slaves. So, if any one of them get the transaction,
- * the transaction session in the primary can move forward.
- */
- if (reply_file_name_inited_)
- {
- cmp = ActiveTranx::compare(log_file_name, log_file_pos,
- reply_file_name_, reply_file_pos_);
-
- /* If the requested position is behind the sending binlog position,
- * would not adjust sending binlog position.
- * We based on the assumption that there are multiple semi-sync slave,
- * and at least one of them shou/ld be up to date.
- * If all semi-sync slaves are behind, at least initially, the primary
- * can find the situation after the waiting timeout. After that, some
- * slaves should catch up quickly.
- */
- if (cmp < 0)
- {
- /* If the position is behind, do not copy it. */
- need_copy_send_pos = false;
- }
- }
-
- if (need_copy_send_pos)
- {
- strmake_buf(reply_file_name_, log_file_name);
- reply_file_pos_ = log_file_pos;
- reply_file_name_inited_ = true;
-
- /* Remove all active transaction nodes before this point. */
- assert(active_tranxs_ != NULL);
- active_tranxs_->clear_active_tranx_nodes(log_file_name, log_file_pos);
-
- if (trace_level_ & kTraceDetail)
- sql_print_information("%s: Got reply at (%s, %lu)", kWho,
- log_file_name, (unsigned long)log_file_pos);
- }
-
- if (rpl_semi_sync_master_wait_sessions > 0)
- {
- /* Let us check if some of the waiting threads doing a trx
- * commit can now proceed.
- */
- cmp = ActiveTranx::compare(reply_file_name_, reply_file_pos_,
- wait_file_name_, wait_file_pos_);
- if (cmp >= 0)
- {
- /* Yes, at least one waiting thread can now proceed:
- * let us release all waiting threads with a broadcast
- */
- can_release_threads = true;
- wait_file_name_inited_ = false;
- }
- }
-
- l_end:
- unlock();
-
- if (can_release_threads)
- {
- if (trace_level_ & kTraceDetail)
- sql_print_information("%s: signal all waiting threads.", kWho);
-
- cond_broadcast();
- }
-
- return function_exit(kWho, 0);
-}
-
-int ReplSemiSyncMaster::commitTrx(const char* trx_wait_binlog_name,
- my_off_t trx_wait_binlog_pos)
-{
- const char *kWho = "ReplSemiSyncMaster::commitTrx";
-
- function_enter(kWho);
-
- if (getMasterEnabled() && trx_wait_binlog_name)
- {
- struct timespec start_ts;
- struct timespec abstime;
- int wait_result;
- PSI_stage_info old_stage;
-
- set_timespec(start_ts, 0);
-
- DEBUG_SYNC(current_thd, "rpl_semisync_master_commit_trx_before_lock");
- /* Acquire the mutex. */
- lock();
-
- /* This must be called after acquired the lock */
- THD_ENTER_COND(NULL, &COND_binlog_send_, &LOCK_binlog_,
- & stage_waiting_for_semi_sync_ack_from_slave,
- & old_stage);
-
- /* This is the real check inside the mutex. */
- if (!getMasterEnabled() || !is_on())
- goto l_end;
-
- if (trace_level_ & kTraceDetail)
- {
- sql_print_information("%s: wait pos (%s, %lu), repl(%d)\n", kWho,
- trx_wait_binlog_name, (unsigned long)trx_wait_binlog_pos,
- (int)is_on());
- }
-
- while (is_on() && !thd_killed(current_thd))
- {
- if (reply_file_name_inited_)
- {
- int cmp = ActiveTranx::compare(reply_file_name_, reply_file_pos_,
- trx_wait_binlog_name, trx_wait_binlog_pos);
- if (cmp >= 0)
- {
- /* We have already sent the relevant binlog to the slave: no need to
- * wait here.
- */
- if (trace_level_ & kTraceDetail)
- sql_print_information("%s: Binlog reply is ahead (%s, %lu),",
- kWho, reply_file_name_, (unsigned long)reply_file_pos_);
- break;
- }
- }
-
- /* Let us update the info about the minimum binlog position of waiting
- * threads.
- */
- if (wait_file_name_inited_)
- {
- int cmp = ActiveTranx::compare(trx_wait_binlog_name, trx_wait_binlog_pos,
- wait_file_name_, wait_file_pos_);
- if (cmp <= 0)
- {
- /* This thd has a lower position, let's update the minimum info. */
- strmake_buf(wait_file_name_, trx_wait_binlog_name);
- wait_file_pos_ = trx_wait_binlog_pos;
-
- rpl_semi_sync_master_wait_pos_backtraverse++;
- if (trace_level_ & kTraceDetail)
- sql_print_information("%s: move back wait position (%s, %lu),",
- kWho, wait_file_name_, (unsigned long)wait_file_pos_);
- }
- }
- else
- {
- strmake_buf(wait_file_name_, trx_wait_binlog_name);
- wait_file_pos_ = trx_wait_binlog_pos;
- wait_file_name_inited_ = true;
-
- if (trace_level_ & kTraceDetail)
- sql_print_information("%s: init wait position (%s, %lu),",
- kWho, wait_file_name_, (unsigned long)wait_file_pos_);
- }
-
- /* Calcuate the waiting period. */
- long diff_secs = (long) (wait_timeout_ / TIME_THOUSAND);
- long diff_nsecs = (long) ((wait_timeout_ % TIME_THOUSAND) * TIME_MILLION);
- long nsecs = start_ts.tv_nsec + diff_nsecs;
- abstime.tv_sec = start_ts.tv_sec + diff_secs + nsecs/TIME_BILLION;
- abstime.tv_nsec = nsecs % TIME_BILLION;
-
- /* In semi-synchronous replication, we wait until the binlog-dump
- * thread has received the reply on the relevant binlog segment from the
- * replication slave.
- *
- * Let us suspend this thread to wait on the condition;
- * when replication has progressed far enough, we will release
- * these waiting threads.
- */
- rpl_semi_sync_master_wait_sessions++;
-
- if (trace_level_ & kTraceDetail)
- sql_print_information("%s: wait %lu ms for binlog sent (%s, %lu)",
- kWho, wait_timeout_,
- wait_file_name_, (unsigned long)wait_file_pos_);
-
- wait_result = cond_timewait(&abstime);
- rpl_semi_sync_master_wait_sessions--;
-
- if (wait_result != 0)
- {
- /* This is a real wait timeout. */
- sql_print_warning("Timeout waiting for reply of binlog (file: %s, pos: %lu), "
- "semi-sync up to file %s, position %lu.",
- trx_wait_binlog_name, (unsigned long)trx_wait_binlog_pos,
- reply_file_name_, (unsigned long)reply_file_pos_);
- rpl_semi_sync_master_wait_timeouts++;
-
- /* switch semi-sync off */
- switch_off();
- }
- else
- {
- int wait_time;
-
- wait_time = getWaitTime(start_ts);
- if (wait_time < 0)
- {
- if (trace_level_ & kTraceGeneral)
- {
- sql_print_error("Replication semi-sync getWaitTime fail at "
- "wait position (%s, %lu)",
- trx_wait_binlog_name, (unsigned long)trx_wait_binlog_pos);
- }
- rpl_semi_sync_master_timefunc_fails++;
- }
- else
- {
- rpl_semi_sync_master_trx_wait_num++;
- rpl_semi_sync_master_trx_wait_time += wait_time;
- }
- }
- }
-
- /*
- At this point, the binlog file and position of this transaction
- must have been removed from ActiveTranx.
- active_tranxs_ may be NULL if someone disabled semi sync during
- cond_timewait()
- */
- assert(thd_killed(current_thd) || !active_tranxs_ ||
- !active_tranxs_->is_tranx_end_pos(trx_wait_binlog_name,
- trx_wait_binlog_pos));
-
- l_end:
- /* Update the status counter. */
- if (is_on())
- rpl_semi_sync_master_yes_transactions++;
- else
- rpl_semi_sync_master_no_transactions++;
-
- /* The lock held will be released by thd_exit_cond, so no need to
- call unlock() here */
- THD_EXIT_COND(NULL, & old_stage);
- }
-
- return function_exit(kWho, 0);
-}
-
-/* Indicate that semi-sync replication is OFF now.
- *
- * What should we do when it is disabled? The problem is that we want
- * the semi-sync replication enabled again when the slave catches up
- * later. But, it is not that easy to detect that the slave has caught
- * up. This is caused by the fact that MySQL's replication protocol is
- * asynchronous, meaning that if the master does not use the semi-sync
- * protocol, the slave would not send anything to the master.
- * Still, if the master is sending (N+1)-th event, we assume that it is
- * an indicator that the slave has received N-th event and earlier ones.
- *
- * If semi-sync is disabled, all transactions still update the wait
- * position with the last position in binlog. But no transactions will
- * wait for confirmations and the active transaction list would not be
- * maintained. In binlog dump thread, updateSyncHeader() checks whether
- * the current sending event catches up with last wait position. If it
- * does match, semi-sync will be switched on again.
- */
-int ReplSemiSyncMaster::switch_off()
-{
- const char *kWho = "ReplSemiSyncMaster::switch_off";
- int result;
-
- function_enter(kWho);
- state_ = false;
-
- /* Clear the active transaction list. */
- assert(active_tranxs_ != NULL);
- result = active_tranxs_->clear_active_tranx_nodes(NULL, 0);
-
- rpl_semi_sync_master_off_times++;
- wait_file_name_inited_ = false;
- reply_file_name_inited_ = false;
- sql_print_information("Semi-sync replication switched OFF.");
- cond_broadcast(); /* wake up all waiting threads */
-
- return function_exit(kWho, result);
-}
-
-int ReplSemiSyncMaster::try_switch_on(int server_id,
- const char *log_file_name,
- my_off_t log_file_pos)
-{
- const char *kWho = "ReplSemiSyncMaster::try_switch_on";
- bool semi_sync_on = false;
-
- function_enter(kWho);
-
- /* If the current sending event's position is larger than or equal to the
- * 'largest' commit transaction binlog position, the slave is already
- * catching up now and we can switch semi-sync on here.
- * If commit_file_name_inited_ indicates there are no recent transactions,
- * we can enable semi-sync immediately.
- */
- if (commit_file_name_inited_)
- {
- int cmp = ActiveTranx::compare(log_file_name, log_file_pos,
- commit_file_name_, commit_file_pos_);
- semi_sync_on = (cmp >= 0);
- }
- else
- {
- semi_sync_on = true;
- }
-
- if (semi_sync_on)
- {
- /* Switch semi-sync replication on. */
- state_ = true;
-
- sql_print_information("Semi-sync replication switched ON with slave (server_id: %d) "
- "at (%s, %lu)",
- server_id, log_file_name,
- (unsigned long)log_file_pos);
- }
-
- return function_exit(kWho, 0);
-}
-
-int ReplSemiSyncMaster::reserveSyncHeader(unsigned char *header,
- unsigned long size)
-{
- const char *kWho = "ReplSemiSyncMaster::reserveSyncHeader";
- function_enter(kWho);
-
- int hlen=0;
- if (!is_semi_sync_slave())
- {
- hlen= 0;
- }
- else
- {
- /* No enough space for the extra header, disable semi-sync master */
- if (sizeof(kSyncHeader) > size)
- {
- sql_print_warning("No enough space in the packet "
- "for semi-sync extra header, "
- "semi-sync replication disabled");
- disableMaster();
- return 0;
- }
-
- /* Set the magic number and the sync status. By default, no sync
- * is required.
- */
- memcpy(header, kSyncHeader, sizeof(kSyncHeader));
- hlen= sizeof(kSyncHeader);
- }
- return function_exit(kWho, hlen);
-}
-
-int ReplSemiSyncMaster::updateSyncHeader(unsigned char *packet,
- const char *log_file_name,
- my_off_t log_file_pos,
- uint32 server_id)
-{
- const char *kWho = "ReplSemiSyncMaster::updateSyncHeader";
- int cmp = 0;
- bool sync = false;
-
- /* If the semi-sync master is not enabled, or the slave is not a semi-sync
- * target, do not request replies from the slave.
- */
- if (!getMasterEnabled() || !is_semi_sync_slave())
- return 0;
-
- function_enter(kWho);
-
- lock();
-
- /* This is the real check inside the mutex. */
- if (!getMasterEnabled())
- goto l_end; // sync= false at this point in time
-
- if (is_on())
- {
- /* semi-sync is ON */
- /* sync= false; No sync unless a transaction is involved. */
-
- if (reply_file_name_inited_)
- {
- cmp = ActiveTranx::compare(log_file_name, log_file_pos,
- reply_file_name_, reply_file_pos_);
- if (cmp <= 0)
- {
- /* If we have already got the reply for the event, then we do
- * not need to sync the transaction again.
- */
- goto l_end;
- }
- }
-
- if (wait_file_name_inited_)
- {
- cmp = ActiveTranx::compare(log_file_name, log_file_pos,
- wait_file_name_, wait_file_pos_);
- }
- else
- {
- cmp = 1;
- }
-
- /* If we are already waiting for some transaction replies which
- * are later in binlog, do not wait for this one event.
- */
- if (cmp >= 0)
- {
- /*
- * We only wait if the event is a transaction's ending event.
- */
- assert(active_tranxs_ != NULL);
- sync = active_tranxs_->is_tranx_end_pos(log_file_name,
- log_file_pos);
- }
- }
- else
- {
- if (commit_file_name_inited_)
- {
- int cmp = ActiveTranx::compare(log_file_name, log_file_pos,
- commit_file_name_, commit_file_pos_);
- sync = (cmp >= 0);
- }
- else
- {
- sync = true;
- }
- }
-
- if (trace_level_ & kTraceDetail)
- sql_print_information("%s: server(%d), (%s, %lu) sync(%d), repl(%d)",
- kWho, server_id, log_file_name,
- (unsigned long)log_file_pos, sync, (int)is_on());
-
- l_end:
- unlock();
-
- /* We do not need to clear sync flag because we set it to 0 when we
- * reserve the packet header.
- */
- if (sync)
- {
- (packet)[2] = kPacketFlagSync;
- }
-
- return function_exit(kWho, 0);
-}
-
-int ReplSemiSyncMaster::writeTranxInBinlog(const char* log_file_name,
- my_off_t log_file_pos)
-{
- const char *kWho = "ReplSemiSyncMaster::writeTranxInBinlog";
- int result = 0;
-
- function_enter(kWho);
-
- lock();
-
- /* This is the real check inside the mutex. */
- if (!getMasterEnabled())
- goto l_end;
-
- /* Update the 'largest' transaction commit position seen so far even
- * though semi-sync is switched off.
- * It is much better that we update commit_file_* here, instead of
- * inside commitTrx(). This is mostly because updateSyncHeader()
- * will watch for commit_file_* to decide whether to switch semi-sync
- * on. The detailed reason is explained in function updateSyncHeader().
- */
- if (commit_file_name_inited_)
- {
- int cmp = ActiveTranx::compare(log_file_name, log_file_pos,
- commit_file_name_, commit_file_pos_);
- if (cmp > 0)
- {
- /* This is a larger position, let's update the maximum info. */
- strncpy(commit_file_name_, log_file_name, FN_REFLEN-1);
- commit_file_name_[FN_REFLEN-1] = 0; /* make sure it ends properly */
- commit_file_pos_ = log_file_pos;
- }
- }
- else
- {
- strncpy(commit_file_name_, log_file_name, FN_REFLEN-1);
- commit_file_name_[FN_REFLEN-1] = 0; /* make sure it ends properly */
- commit_file_pos_ = log_file_pos;
- commit_file_name_inited_ = true;
- }
-
- if (is_on())
- {
- assert(active_tranxs_ != NULL);
- if(active_tranxs_->insert_tranx_node(log_file_name, log_file_pos))
- {
- /*
- if insert tranx_node failed, print a warning message
- and turn off semi-sync
- */
- sql_print_warning("Semi-sync failed to insert tranx_node for binlog file: %s, position: %lu",
- log_file_name, (ulong)log_file_pos);
- switch_off();
- }
- }
-
- l_end:
- unlock();
-
- return function_exit(kWho, result);
-}
-
-int ReplSemiSyncMaster::readSlaveReply(NET *net, uint32 server_id,
- const char *event_buf)
-{
- const char *kWho = "ReplSemiSyncMaster::readSlaveReply";
- const unsigned char *packet;
- char log_file_name[FN_REFLEN];
- my_off_t log_file_pos;
- ulong log_file_len = 0;
- ulong packet_len;
- int result = -1;
- struct timespec start_ts;
- ulong trc_level = trace_level_;
- LINT_INIT_STRUCT(start_ts);
-
- function_enter(kWho);
-
- assert((unsigned char)event_buf[1] == kPacketMagicNum);
- if ((unsigned char)event_buf[2] != kPacketFlagSync)
- {
- /* current event does not require reply */
- result = 0;
- goto l_end;
- }
-
- if (trc_level & kTraceNetWait)
- set_timespec(start_ts, 0);
-
- /* We flush to make sure that the current event is sent to the network,
- * instead of being buffered in the TCP/IP stack.
- */
- if (net_flush(net))
- {
- sql_print_error("Semi-sync master failed on net_flush() "
- "before waiting for slave reply");
- goto l_end;
- }
-
- net_clear(net, 0);
- if (trc_level & kTraceDetail)
- sql_print_information("%s: Wait for replica's reply", kWho);
-
- /* Wait for the network here. Though binlog dump thread can indefinitely wait
- * here, transactions would not wait indefintely.
- * Transactions wait on binlog replies detected by binlog dump threads. If
- * binlog dump threads wait too long, transactions will timeout and continue.
- */
- packet_len = my_net_read(net);
-
- if (trc_level & kTraceNetWait)
- {
- int wait_time = getWaitTime(start_ts);
- if (wait_time < 0)
- {
- sql_print_error("Semi-sync master wait for reply "
- "fail to get wait time.");
- rpl_semi_sync_master_timefunc_fails++;
- }
- else
- {
- rpl_semi_sync_master_net_wait_num++;
- rpl_semi_sync_master_net_wait_time += wait_time;
- }
- }
-
- if (packet_len == packet_error || packet_len < REPLY_BINLOG_NAME_OFFSET)
- {
- if (packet_len == packet_error)
- sql_print_error("Read semi-sync reply network error: %s (errno: %d)",
- net->last_error, net->last_errno);
- else
- sql_print_error("Read semi-sync reply length error: %s (errno: %d)",
- net->last_error, net->last_errno);
- goto l_end;
- }
-
- packet = net->read_pos;
- if (packet[REPLY_MAGIC_NUM_OFFSET] != ReplSemiSyncMaster::kPacketMagicNum)
- {
- sql_print_error("Read semi-sync reply magic number error");
- goto l_end;
- }
-
- log_file_pos = uint8korr(packet + REPLY_BINLOG_POS_OFFSET);
- log_file_len = packet_len - REPLY_BINLOG_NAME_OFFSET;
- if (log_file_len >= FN_REFLEN)
- {
- sql_print_error("Read semi-sync reply binlog file length too large");
- goto l_end;
- }
- strncpy(log_file_name, (const char*)packet + REPLY_BINLOG_NAME_OFFSET, log_file_len);
- log_file_name[log_file_len] = 0;
-
- if (trc_level & kTraceDetail)
- sql_print_information("%s: Got reply (%s, %lu)",
- kWho, log_file_name, (ulong)log_file_pos);
-
- result = reportReplyBinlog(server_id, log_file_name, log_file_pos);
-
- l_end:
- return function_exit(kWho, result);
-}
-
-
-int ReplSemiSyncMaster::resetMaster()
-{
- const char *kWho = "ReplSemiSyncMaster::resetMaster";
- int result = 0;
-
- function_enter(kWho);
-
-
- lock();
-
- state_ = getMasterEnabled()? 1 : 0;
-
- wait_file_name_inited_ = false;
- reply_file_name_inited_ = false;
- commit_file_name_inited_ = false;
-
- rpl_semi_sync_master_yes_transactions = 0;
- rpl_semi_sync_master_no_transactions = 0;
- rpl_semi_sync_master_off_times = 0;
- rpl_semi_sync_master_timefunc_fails = 0;
- rpl_semi_sync_master_wait_sessions = 0;
- rpl_semi_sync_master_wait_pos_backtraverse = 0;
- rpl_semi_sync_master_trx_wait_num = 0;
- rpl_semi_sync_master_trx_wait_time = 0;
- rpl_semi_sync_master_net_wait_num = 0;
- rpl_semi_sync_master_net_wait_time = 0;
-
- unlock();
-
- return function_exit(kWho, result);
-}
-
-void ReplSemiSyncMaster::setExportStats()
-{
- lock();
-
- rpl_semi_sync_master_status = state_;
- rpl_semi_sync_master_avg_trx_wait_time=
- ((rpl_semi_sync_master_trx_wait_num) ?
- (unsigned long)((double)rpl_semi_sync_master_trx_wait_time /
- ((double)rpl_semi_sync_master_trx_wait_num)) : 0);
- rpl_semi_sync_master_avg_net_wait_time=
- ((rpl_semi_sync_master_net_wait_num) ?
- (unsigned long)((double)rpl_semi_sync_master_net_wait_time /
- ((double)rpl_semi_sync_master_net_wait_num)) : 0);
-
- unlock();
-}
-
-/* Get the waiting time given the wait's staring time.
- *
- * Return:
- * >= 0: the waiting time in microsecons(us)
- * < 0: error in get time or time back traverse
- */
-static int getWaitTime(const struct timespec& start_ts)
-{
- unsigned long long start_usecs, end_usecs;
- struct timespec end_ts;
-
- /* Starting time in microseconds(us). */
- start_usecs = timespec_to_usec(&start_ts);
-
- /* Get the wait time interval. */
- set_timespec(end_ts, 0);
-
- /* Ending time in microseconds(us). */
- end_usecs = timespec_to_usec(&end_ts);
-
- if (end_usecs < start_usecs)
- return -1;
-
- return (int)(end_usecs - start_usecs);
-}