summaryrefslogtreecommitdiff
path: root/plugin
diff options
context:
space:
mode:
Diffstat (limited to 'plugin')
-rw-r--r--plugin/semisync/semisync.cc4
-rw-r--r--plugin/semisync/semisync.h12
-rw-r--r--plugin/semisync/semisync_master.cc545
-rw-r--r--plugin/semisync/semisync_master.h202
-rw-r--r--plugin/semisync/semisync_master_plugin.cc121
-rw-r--r--plugin/semisync/semisync_slave.cc44
-rw-r--r--plugin/semisync/semisync_slave.h16
-rw-r--r--plugin/semisync/semisync_slave_plugin.cc34
8 files changed, 877 insertions, 101 deletions
diff --git a/plugin/semisync/semisync.cc b/plugin/semisync/semisync.cc
index 4a80360ba4d..fc02b9d95b6 100644
--- a/plugin/semisync/semisync.cc
+++ b/plugin/semisync/semisync.cc
@@ -20,6 +20,7 @@
const unsigned char ReplSemiSyncBase::kPacketMagicNum = 0xef;
const unsigned char ReplSemiSyncBase::kPacketFlagSync = 0x01;
+const unsigned char ReplSemiSyncBase::kPacketFlagSyncAndReport = 0x02;
const unsigned long Trace::kTraceGeneral = 0x0001;
@@ -29,3 +30,6 @@ const unsigned long Trace::kTraceFunction = 0x0040;
const unsigned char ReplSemiSyncBase::kSyncHeader[2] =
{ReplSemiSyncBase::kPacketMagicNum, 0};
+
+const char* const ReplSemiSyncBase::kRplSemiSyncSlaveReportExec =
+ "rpl_semi_sync_slave_report_exec";
diff --git a/plugin/semisync/semisync.h b/plugin/semisync/semisync.h
index 28577296817..78faba9e773 100644
--- a/plugin/semisync/semisync.h
+++ b/plugin/semisync/semisync.h
@@ -75,13 +75,23 @@ public:
/* Constants in network packet header. */
static const unsigned char kPacketMagicNum;
+ /* this event should be semisync acked */
static const unsigned char kPacketFlagSync;
+ /* this event should be semisync acked including the current SQL position */
+ static const unsigned char kPacketFlagSyncAndReport;
+
+ /* user variable for enabling exec-pos reporting */
+ static const char* const kRplSemiSyncSlaveReportExec;
};
/* The layout of a semisync slave reply packet:
1 byte for the magic num
8 bytes for the binlog positon
- n bytes for the binlog filename, terminated with a '\0'
+ n bytes for the binlog filename, NOT terminated with a '\0'
+ [ optionally ]
+ 1 byte == 0
+ 8 bytes for the sql-thread position
+ n bytes for the sql-thread filename, terminated with a '\0'
*/
#define REPLY_MAGIC_NUM_LEN 1
#define REPLY_BINLOG_POS_LEN 8
diff --git a/plugin/semisync/semisync_master.cc b/plugin/semisync/semisync_master.cc
index 7901853c3f8..54a3a908820 100644
--- a/plugin/semisync/semisync_master.cc
+++ b/plugin/semisync/semisync_master.cc
@@ -22,6 +22,9 @@
#define TIME_MILLION 1000000
#define TIME_BILLION 1000000000
+/* thd_key for per slave thread state */
+static MYSQL_THD_KEY_T thd_key;
+
/* This indicates whether semi-synchronous replication is enabled. */
char rpl_semi_sync_master_enabled;
unsigned long rpl_semi_sync_master_wait_point =
@@ -45,6 +48,18 @@ unsigned long long rpl_semi_sync_master_net_wait_time = 0;
unsigned long long rpl_semi_sync_master_trx_wait_time = 0;
char rpl_semi_sync_master_wait_no_slave = 1;
+unsigned long rpl_semi_sync_master_max_unacked_event_count = 0;
+unsigned long rpl_semi_sync_master_max_unacked_event_bytes = 4096;
+
+unsigned long rpl_semi_sync_master_slave_lag_clients = 0;
+unsigned long long rpl_semi_sync_master_estimated_slave_lag = 0;
+unsigned long rpl_semi_sync_master_slave_lag_heartbeat_frequency_us = 500000;
+unsigned long rpl_semi_sync_master_max_slave_lag = 0;
+unsigned long rpl_semi_sync_master_slave_lag_wait_sessions = 0;
+
+unsigned long rpl_semi_sync_master_avg_trx_slave_lag_wait_time = 0;
+unsigned long long rpl_semi_sync_master_trx_slave_lag_wait_num = 0;
+unsigned long long rpl_semi_sync_master_trx_slave_lag_wait_time = 0;
static int getWaitTime(const struct timespec& start_ts);
@@ -150,6 +165,15 @@ int ActiveTranx::insert_tranx_node(const char *log_file_name,
ins_node->log_name_[FN_REFLEN-1] = 0; /* make sure it ends properly */
ins_node->log_pos_ = log_file_pos;
+ {
+ /**
+ * set trans commit time
+ * this is called when writing into binlog, which is not
+ * exactly right, but close enough for our purposes
+ */
+ ins_node->tranx_commit_time_us = my_hrtime().val;
+ }
+
if (!trx_front_)
{
/* The list is empty. */
@@ -193,12 +217,11 @@ int ActiveTranx::insert_tranx_node(const char *log_file_name,
return function_exit(kWho, result);
}
-bool ActiveTranx::is_tranx_end_pos(const char *log_file_name,
- my_off_t log_file_pos)
+TranxNode* ActiveTranx::lookup_tranx_end_pos(const char *log_file_name,
+ my_off_t log_file_pos)
{
- const char *kWho = "ActiveTranx::is_tranx_end_pos";
+ const char *kWho = "ActiveTranx::lookup_tranx_end_pos";
function_enter(kWho);
-
unsigned int hash_val = get_hash_value(log_file_name, log_file_pos);
TranxNode *entry = trx_htb_[hash_val];
@@ -211,38 +234,24 @@ bool ActiveTranx::is_tranx_end_pos(const char *log_file_name,
}
if (trace_level_ & kTraceDetail)
- sql_print_information("%s: probe (%s, %lu) in entry(%u)", kWho,
- log_file_name, (unsigned long)log_file_pos, hash_val);
+ sql_print_information("%s: probe (%s, %lu)", kWho,
+ log_file_name, (unsigned long)log_file_pos);
function_exit(kWho, (entry != NULL));
- return (entry != NULL);
+ return entry;
}
-int ActiveTranx::clear_active_tranx_nodes(const char *log_file_name,
- my_off_t log_file_pos)
+int ActiveTranx::clear_active_tranx_nodes()
{
- const char *kWho = "ActiveTranx::::clear_active_tranx_nodes";
- TranxNode *new_front;
+ set_new_front(NULL);
+ return 0;
+}
+void ActiveTranx::set_new_front(TranxNode *new_front)
+{
+ const char *kWho = "ActiveTranx::set_new_front";
function_enter(kWho);
- if (log_file_name != NULL)
- {
- new_front = trx_front_;
-
- while (new_front)
- {
- if (compare(new_front, log_file_name, log_file_pos) > 0)
- break;
- new_front = new_front->next_;
- }
- }
- else
- {
- /* If log_file_name is NULL, clear everything. */
- new_front = NULL;
- }
-
if (new_front == NULL)
{
/* No active transaction nodes after the call. */
@@ -257,7 +266,6 @@ int ActiveTranx::clear_active_tranx_nodes(const char *log_file_name,
trx_front_ = NULL;
trx_rear_ = NULL;
}
-
if (trace_level_ & kTraceDetail)
sql_print_information("%s: cleared all nodes", kWho);
}
@@ -291,14 +299,40 @@ int ActiveTranx::clear_active_tranx_nodes(const char *log_file_name,
trx_front_ = new_front;
allocator_.free_nodes_before(trx_front_);
-
if (trace_level_ & kTraceDetail)
sql_print_information("%s: cleared %d nodes back until pos (%s, %lu)",
kWho, n_frees,
trx_front_->log_name_, (unsigned long)trx_front_->log_pos_);
}
+ function_exit(kWho, 0);
+}
- return function_exit(kWho, 0);
+bool ActiveTranx::prune_active_tranx_nodes(
+ LogPosPtr pos,
+ ulonglong *oldest_tranx_commit_time_us)
+{
+ TranxNode *old_front = trx_front_;
+ TranxNode *new_front;
+
+ new_front = trx_front_;
+ while (new_front)
+ {
+ if (compare(new_front, pos.file_name, pos.file_pos) > 0)
+ break;
+ new_front = new_front->next_;
+ }
+
+ set_new_front(new_front);
+
+ if (oldest_tranx_commit_time_us)
+ {
+ if (trx_front_ == NULL)
+ *oldest_tranx_commit_time_us = 0;
+ else
+ *oldest_tranx_commit_time_us = trx_front_->tranx_commit_time_us;
+ }
+
+ return ! (old_front == trx_front_);
}
@@ -334,7 +368,8 @@ ReplSemiSyncMaster::ReplSemiSyncMaster()
wait_file_pos_(0),
master_enabled_(false),
wait_timeout_(0L),
- state_(0)
+ state_(0),
+ oldest_unapplied_tranx_commit_time_us_(0)
{
strcpy(reply_file_name_, "");
strcpy(wait_file_name_, "");
@@ -362,11 +397,19 @@ int ReplSemiSyncMaster::initObject()
mysql_cond_init(key_ss_cond_COND_binlog_send_,
&COND_binlog_send_, NULL);
+ /* Mutex initialization can only be done after MY_INIT(). */
+ mysql_mutex_init(key_ss_mutex_LOCK_slave_lag_,
+ &LOCK_slave_lag_, MY_MUTEX_INIT_FAST);
+ mysql_cond_init(key_ss_cond_COND_slave_lag_,
+ &COND_slave_lag_, NULL);
+
if (rpl_semi_sync_master_enabled)
result = enableMaster();
else
result = disableMaster();
+ thd_key_create(&thd_key);
+
return result;
}
@@ -437,6 +480,8 @@ void ReplSemiSyncMaster::cleanup()
{
mysql_mutex_destroy(&LOCK_binlog_);
mysql_cond_destroy(&COND_binlog_send_);
+ mysql_mutex_destroy(&LOCK_slave_lag_);
+ mysql_cond_destroy(&COND_slave_lag_);
init_done_= 0;
}
@@ -473,7 +518,34 @@ void ReplSemiSyncMaster::add_slave()
{
lock();
rpl_semi_sync_master_clients++;
+ if (has_semi_sync_slave_lag())
+ rpl_semi_sync_master_slave_lag_clients++;
unlock();
+
+ if (has_semi_sync_slave_lag())
+ {
+ int null_val = 0;
+ longlong new_val =
+ rpl_semi_sync_master_slave_lag_heartbeat_frequency_us * 1000;
+ longlong old_val = new_val + 1;
+
+ get_user_var_int("master_heartbeat_period", &old_val, &null_val);
+ if (old_val > new_val || null_val)
+ {
+ /* if there no old value or it's bigger than what we want */
+ int res = set_user_var_int("master_heartbeat_period",new_val, &old_val);
+ if (res == -1)
+ {
+ sql_print_error(
+ "Repl_semi_sync::failed to set master_heartbeat_period");
+ }
+ }
+ }
+
+ /**
+ * create per slave-state and store it in thread-local-storage */
+ ReplSemiSyncMasterPerSlaveState *state = new ReplSemiSyncMasterPerSlaveState;
+ thd_setspecific(current_thd, thd_key, state);
}
void ReplSemiSyncMaster::remove_slave()
@@ -492,7 +564,31 @@ void ReplSemiSyncMaster::remove_slave()
rpl_semi_sync_master_clients == 0)
switch_off();
}
+
+ bool no_slave_lag_clients = false;
+ if (has_semi_sync_slave_lag())
+ {
+ if (--rpl_semi_sync_master_slave_lag_clients == 0)
+ {
+ no_slave_lag_clients = true;
+ }
+ }
+
unlock();
+
+ ReplSemiSyncMasterPerSlaveState *state =
+ (ReplSemiSyncMasterPerSlaveState*)thd_getspecific(current_thd, thd_key);
+ thd_setspecific(current_thd, thd_key, NULL);
+
+ if (state != NULL)
+ {
+ delete state;
+ }
+
+ if (no_slave_lag_clients)
+ {
+ wake_slave_lag_waiters(0);
+ }
}
bool ReplSemiSyncMaster::is_semi_sync_slave()
@@ -503,14 +599,115 @@ bool ReplSemiSyncMaster::is_semi_sync_slave()
return val;
}
+bool ReplSemiSyncMaster::has_semi_sync_slave_lag()
+{
+ int null_value;
+ long long val= 0;
+ get_user_var_int(kRplSemiSyncSlaveReportExec, &val, &null_value);
+ return val;
+}
+
+int ReplSemiSyncMaster::checkSyncReq(const LogPosPtr *log_pos)
+{
+ if (log_pos == NULL)
+ {
+ /* heartbeat events does not have logpos (since they are not actually
+ * stored in the binlog).
+ */
+ if (!has_semi_sync_slave_lag())
+ {
+ /* don't semi-sync them if we haven't enabled slave-lag handling */
+ return 0;
+ }
+ else
+ {
+ /* else ask for both IO and exec position */
+ return 2;
+ }
+ }
+
+ /**
+ * check if this log-pos is a candidate for semi-syncing event
+ */
+ TranxNode *entry = active_tranxs_->lookup_tranx_end_pos(log_pos->file_name,
+ log_pos->file_pos);
+
+ if (entry == NULL)
+ return 0;
+
+ ReplSemiSyncMasterPerSlaveState *state =
+ (ReplSemiSyncMasterPerSlaveState*)thd_getspecific(current_thd,
+ thd_key);
+ do
+ {
+ state->unacked_event_count_++;
+
+ if (active_tranxs_->is_rear(entry))
+ {
+ /* always ask for ack on last event in tranx list */
+ break;
+ }
+
+ if (state->unacked_event_count_ >=
+ rpl_semi_sync_master_max_unacked_event_count)
+ {
+ /* enough events passed that it's time for another ack */
+ break;
+ }
+
+ if (!state->sync_req_pos_.IsInited())
+ {
+ /* first event => time for ack */
+ break;
+ }
+
+ if (strcmp(log_pos->file_name, state->sync_req_pos_.file_name) != 0)
+ {
+ /* new file => time for ack */
+ break;
+ }
+
+ if (log_pos->file_pos >= (state->sync_req_pos_.file_pos +
+ rpl_semi_sync_master_max_unacked_event_bytes))
+ {
+ /* enough bytes => time for ack */
+ break;
+ }
+
+ /* we skip asking for semi-sync ack on this event */
+ return 0;
+
+ } while (0);
+
+ /* keep track on when we last asked for semi-sync-ack */
+ state->unacked_event_count_ = 0;
+ state->sync_req_pos_.Assign(log_pos);
+
+ /**
+ * check if this slave can report back exec position
+ */
+ if (!has_semi_sync_slave_lag())
+ {
+ /* slave can't report back SQL position */
+ return 1;
+ }
+
+ /* ask for both IO and SQL position */
+ return 2;
+}
+
int ReplSemiSyncMaster::reportReplyBinlog(uint32 server_id,
const char *log_file_name,
- my_off_t log_file_pos)
+ my_off_t log_file_pos,
+ const LogPos *exec_pos)
{
const char *kWho = "ReplSemiSyncMaster::reportReplyBinlog";
int cmp;
bool can_release_threads = false;
bool need_copy_send_pos = true;
+ bool pruned_trx_list = false;
+ ulonglong oldest_tranx_commit_time_us = 0;
+
if (!(getMasterEnabled()))
return 0;
@@ -559,15 +756,29 @@ int ReplSemiSyncMaster::reportReplyBinlog(uint32 server_id,
reply_file_pos_ = log_file_pos;
reply_file_name_inited_ = true;
- /* Remove all active transaction nodes before this point. */
- assert(active_tranxs_ != NULL);
- active_tranxs_->clear_active_tranx_nodes(log_file_name, log_file_pos);
-
if (trace_level_ & kTraceDetail)
sql_print_information("%s: Got reply at (%s, %lu)", kWho,
log_file_name, (unsigned long)log_file_pos);
}
+ assert(active_tranxs_ != NULL);
+ if (exec_pos != NULL)
+ {
+ /* prune using exec_pos */
+ LogPosPtr ptr(*exec_pos);
+ pruned_trx_list = active_tranxs_->prune_active_tranx_nodes(
+ ptr, &oldest_tranx_commit_time_us);
+ }
+ else if (rpl_semi_sync_master_slave_lag_clients == 0 && need_copy_send_pos)
+ {
+ /**
+ * if we don't have any slaves that can do exec_pos reporting,
+ * prune by IO position as "plain old semi sync"
+ */
+ LogPosPtr ptr(log_file_name, log_file_pos);
+ active_tranxs_->prune_active_tranx_nodes(ptr, NULL);
+ }
+
if (rpl_semi_sync_master_wait_sessions > 0)
{
/* Let us check if some of the waiting threads doing a trx
@@ -596,6 +807,15 @@ int ReplSemiSyncMaster::reportReplyBinlog(uint32 server_id,
cond_broadcast();
}
+ if (pruned_trx_list)
+ {
+ /**
+ * if we did prune trx list, it might be that we should wake up
+ * threads waiting for slave-lag to decrease
+ */
+ wake_slave_lag_waiters(oldest_tranx_commit_time_us);
+ }
+
return function_exit(kWho, 0);
}
@@ -743,16 +963,6 @@ int ReplSemiSyncMaster::commitTrx(const char* trx_wait_binlog_name,
}
}
- /*
- At this point, the binlog file and position of this transaction
- must have been removed from ActiveTranx.
- active_tranxs_ may be NULL if someone disabled semi sync during
- cond_timewait()
- */
- assert(thd_killed(current_thd) || !active_tranxs_ ||
- !active_tranxs_->is_tranx_end_pos(trx_wait_binlog_name,
- trx_wait_binlog_pos));
-
l_end:
/* Update the status counter. */
if (is_on())
@@ -796,7 +1006,7 @@ int ReplSemiSyncMaster::switch_off()
/* Clear the active transaction list. */
assert(active_tranxs_ != NULL);
- result = active_tranxs_->clear_active_tranx_nodes(NULL, 0);
+ result = active_tranxs_->clear_active_tranx_nodes();
rpl_semi_sync_master_off_times++;
wait_file_name_inited_ = false;
@@ -886,7 +1096,7 @@ int ReplSemiSyncMaster::updateSyncHeader(unsigned char *packet,
{
const char *kWho = "ReplSemiSyncMaster::updateSyncHeader";
int cmp = 0;
- bool sync = false;
+ int sync = 0;
/* If the semi-sync master is not enabled, or the slave is not a semi-sync
* target, do not request replies from the slave.
@@ -907,6 +1117,13 @@ int ReplSemiSyncMaster::updateSyncHeader(unsigned char *packet,
/* semi-sync is ON */
/* sync= false; No sync unless a transaction is involved. */
+ if (log_file_name == NULL)
+ {
+ /* this is heartbeat, request io_pos and exec_pos */
+ sync = checkSyncReq(0);
+ goto l_end;
+ }
+
if (reply_file_name_inited_)
{
cmp = ActiveTranx::compare(log_file_name, log_file_pos,
@@ -935,12 +1152,12 @@ int ReplSemiSyncMaster::updateSyncHeader(unsigned char *packet,
*/
if (cmp >= 0)
{
- /*
+ /*
* We only wait if the event is a transaction's ending event.
*/
assert(active_tranxs_ != NULL);
- sync = active_tranxs_->is_tranx_end_pos(log_file_name,
- log_file_pos);
+ LogPosPtr pos(log_file_name, log_file_pos);
+ sync = checkSyncReq(&pos);
}
}
else
@@ -953,7 +1170,7 @@ int ReplSemiSyncMaster::updateSyncHeader(unsigned char *packet,
}
else
{
- sync = true;
+ sync = 1;
}
}
@@ -968,10 +1185,14 @@ int ReplSemiSyncMaster::updateSyncHeader(unsigned char *packet,
/* We do not need to clear sync flag because we set it to 0 when we
* reserve the packet header.
*/
- if (sync)
+ if (sync == 1)
{
(packet)[2] = kPacketFlagSync;
}
+ else if (sync == 2)
+ {
+ (packet)[2] = kPacketFlagSyncAndReport;
+ }
return function_exit(kWho, 0);
}
@@ -1020,7 +1241,8 @@ int ReplSemiSyncMaster::writeTranxInBinlog(const char* log_file_name,
if (is_on())
{
assert(active_tranxs_ != NULL);
- if(active_tranxs_->insert_tranx_node(log_file_name, log_file_pos))
+ bool empty = active_tranxs_->is_empty();
+ if (active_tranxs_->insert_tranx_node(log_file_name, log_file_pos))
{
/*
if insert tranx_node failed, print a warning message
@@ -1030,6 +1252,14 @@ int ReplSemiSyncMaster::writeTranxInBinlog(const char* log_file_name,
log_file_name, (ulong)log_file_pos);
switch_off();
}
+ else if (empty && rpl_semi_sync_master_slave_lag_clients > 0)
+ {
+ /* if the list of transactions was empty,
+ * we need to init the oldest_tranx_commit_time_us
+ */
+ oldest_unapplied_tranx_commit_time_us_ =
+ active_tranxs_->get_oldest_tranx_commit_time_us();
+ }
}
l_end:
@@ -1039,10 +1269,10 @@ int ReplSemiSyncMaster::writeTranxInBinlog(const char* log_file_name,
}
int ReplSemiSyncMaster::readSlaveReply(NET *net, uint32 server_id,
- const char *event_buf)
+ const char *event_buf_)
{
const char *kWho = "ReplSemiSyncMaster::readSlaveReply";
- const unsigned char *packet;
+ const unsigned char *packet, *packet_start;
char log_file_name[FN_REFLEN];
my_off_t log_file_pos;
ulong log_file_len = 0;
@@ -1050,12 +1280,15 @@ int ReplSemiSyncMaster::readSlaveReply(NET *net, uint32 server_id,
int result = -1;
struct timespec start_ts;
ulong trc_level = trace_level_;
+ const unsigned char *event_buf = (const unsigned char*)event_buf_;
+ bool exec_pos_present = false; // is SQL exec pos present in reply
+ LogPos exec_pos; // position of SQL thread
LINT_INIT_STRUCT(start_ts);
function_enter(kWho);
- assert((unsigned char)event_buf[1] == kPacketMagicNum);
- if ((unsigned char)event_buf[2] != kPacketFlagSync)
+ assert(event_buf[1] == kPacketMagicNum);
+ if ((event_buf[2] & (kPacketFlagSync | kPacketFlagSyncAndReport)) == 0)
{
/* current event does not require reply */
result = 0;
@@ -1113,28 +1346,60 @@ int ReplSemiSyncMaster::readSlaveReply(NET *net, uint32 server_id,
goto l_end;
}
- packet = net->read_pos;
+ packet_start = packet = net->read_pos;
if (packet[REPLY_MAGIC_NUM_OFFSET] != ReplSemiSyncMaster::kPacketMagicNum)
{
sql_print_error("Read semi-sync reply magic number error");
goto l_end;
}
+ /* we determine if this semisync ack contains a sql-thread exec-pos
+ * by checking if last byte == 0, since the packet then contains
+ * \0-terminated filenames */
+ exec_pos_present = packet[packet_len - 1] == 0;
+
log_file_pos = uint8korr(packet + REPLY_BINLOG_POS_OFFSET);
- log_file_len = packet_len - REPLY_BINLOG_NAME_OFFSET;
+ if (exec_pos_present == false)
+ {
+ log_file_len = packet_len - REPLY_BINLOG_NAME_OFFSET;
+ }
+ else
+ {
+ log_file_len = strnlen((char*)packet + REPLY_BINLOG_NAME_OFFSET,
+ MY_MIN((ulong)FN_REFLEN,
+ packet_len - REPLY_BINLOG_NAME_OFFSET));
+ }
if (log_file_len >= FN_REFLEN)
{
sql_print_error("Read semi-sync reply binlog file length too large");
goto l_end;
}
- strncpy(log_file_name, (const char*)packet + REPLY_BINLOG_NAME_OFFSET, log_file_len);
+ packet+= REPLY_BINLOG_NAME_OFFSET;
+
+ strncpy(log_file_name, (const char*)packet, log_file_len);
log_file_name[log_file_len] = 0;
+ if (exec_pos_present)
+ {
+ packet += log_file_len + 1;
+ if (packet + 8 + 1 >= (packet_start + packet_len))
+ {
+ sql_print_error("Read semi-sync reply binlog. "
+ "Packet to short to contain exec-position!");
+ goto l_end;
+ }
+ exec_pos.file_pos = uint8korr(packet);
+ packet += 8;
+ strncpy(exec_pos.file_name, (char*)packet,
+ (packet_start + packet_len) - packet);
+ }
+
if (trc_level & kTraceDetail)
sql_print_information("%s: Got reply (%s, %lu)",
kWho, log_file_name, (ulong)log_file_pos);
- result = reportReplyBinlog(server_id, log_file_name, log_file_pos);
+ result = reportReplyBinlog(server_id, log_file_name, log_file_pos,
+ exec_pos_present ? &exec_pos : NULL);
l_end:
return function_exit(kWho, result);
@@ -1156,6 +1421,16 @@ int ReplSemiSyncMaster::resetMaster()
wait_file_name_inited_ = false;
reply_file_name_inited_ = false;
commit_file_name_inited_ = false;
+ if (active_tranxs_ != NULL)
+ {
+ /**
+ * make sure to empty transaction hash/list
+ * with slave-lag reporting this container does
+ * not have to be empty even if no transaction is
+ * currently running
+ */
+ active_tranxs_->clear_active_tranx_nodes();
+ }
rpl_semi_sync_master_yes_transactions = 0;
rpl_semi_sync_master_no_transactions = 0;
@@ -1170,6 +1445,13 @@ int ReplSemiSyncMaster::resetMaster()
unlock();
+ mysql_mutex_lock(&LOCK_slave_lag_);
+ rpl_semi_sync_master_slave_lag_wait_sessions = 0;
+ oldest_unapplied_tranx_commit_time_us_ = 0;
+ rpl_semi_sync_master_trx_slave_lag_wait_num = 0;
+ rpl_semi_sync_master_trx_slave_lag_wait_time = 0;
+ mysql_mutex_unlock(&LOCK_slave_lag_);
+
return function_exit(kWho, result);
}
@@ -1188,6 +1470,29 @@ void ReplSemiSyncMaster::setExportStats()
((double)rpl_semi_sync_master_net_wait_num)) : 0);
unlock();
+
+ if (oldest_unapplied_tranx_commit_time_us_ != 0)
+ {
+ rpl_semi_sync_master_estimated_slave_lag = my_hrtime().val -
+ oldest_unapplied_tranx_commit_time_us_;
+ }
+ else
+ {
+ rpl_semi_sync_master_estimated_slave_lag = 0;
+ }
+
+ mysql_mutex_lock(&LOCK_slave_lag_);
+ if (rpl_semi_sync_master_trx_slave_lag_wait_num)
+ {
+ rpl_semi_sync_master_avg_trx_slave_lag_wait_time =
+ (unsigned long)((double)rpl_semi_sync_master_trx_slave_lag_wait_time /
+ (double)rpl_semi_sync_master_trx_slave_lag_wait_num);
+ }
+ else
+ {
+ rpl_semi_sync_master_avg_trx_slave_lag_wait_time = 0;
+ }
+ mysql_mutex_unlock(&LOCK_slave_lag_);
}
/* Get the waiting time given the wait's staring time.
@@ -1215,3 +1520,117 @@ static int getWaitTime(const struct timespec& start_ts)
return (int)(end_usecs - start_usecs);
}
+
+void ReplSemiSyncMaster::wake_slave_lag_waiters(
+ ulonglong oldest_unapplied_tranx_commit_time_us)
+{
+ mysql_mutex_lock(&LOCK_slave_lag_);
+ oldest_unapplied_tranx_commit_time_us_ =
+ oldest_unapplied_tranx_commit_time_us;
+
+ if (rpl_semi_sync_master_slave_lag_wait_sessions > 0)
+ {
+ mysql_cond_broadcast(&COND_slave_lag_);
+ }
+ mysql_mutex_unlock(&LOCK_slave_lag_);
+}
+
+int ReplSemiSyncMaster::wait_slave_lag(ulong timeout_sec)
+{
+ int error = 0;
+ PSI_stage_info old_stage;
+
+ /* slave lag waiting not enabled, return directly */
+ if (rpl_semi_sync_master_max_slave_lag == 0)
+ return 0;
+
+ /* there is no slave that can report slave lag, return directly */
+ if (rpl_semi_sync_master_slave_lag_clients == 0)
+ return 0;
+
+ /* compute start_time and end_time */
+ struct timespec end_time;
+ set_timespec(end_time, 0);
+ ulonglong start_time_us = timespec_to_usec(&end_time);
+ end_time.tv_sec += timeout_sec;
+
+ mysql_mutex_lock(&LOCK_slave_lag_);
+
+ if (oldest_unapplied_tranx_commit_time_us_ == 0)
+ {
+ /* no slave lag, atleast one slave is up to date */
+ mysql_mutex_unlock(&LOCK_slave_lag_);
+ return 0;
+ }
+
+ if (rpl_semi_sync_master_max_slave_lag == 0)
+ {
+ /* slave lag waiting not enabled */
+ mysql_mutex_unlock(&LOCK_slave_lag_);
+ return 0;
+ }
+
+ /* This must be called after acquired the lock */
+ THD_ENTER_COND(NULL, &COND_slave_lag_, &LOCK_slave_lag_,
+ &stage_waiting_for_semi_sync_slave_lag,
+ &old_stage);
+
+ bool waited = false;
+ ulonglong lag = 0;
+ ulonglong max_lag = 0;
+ while (oldest_unapplied_tranx_commit_time_us_ != 0)
+ {
+ /* check kill_level after THD_ENTER_COND but *before* cond_wait
+ * to avoid missing kills */
+ if (! (getMasterEnabled() && is_on() &&
+ thd_kill_level(current_thd) == THD_IS_NOT_KILLED))
+ break;
+
+ lag = start_time_us - oldest_unapplied_tranx_commit_time_us_;
+ max_lag = 1000000 * rpl_semi_sync_master_max_slave_lag;
+ if (lag <= max_lag)
+ break;
+
+ waited = true;
+ rpl_semi_sync_master_slave_lag_wait_sessions++;
+ int wait_result = mysql_cond_timedwait(&COND_slave_lag_, &LOCK_slave_lag_,
+ &end_time);
+ rpl_semi_sync_master_slave_lag_wait_sessions--;
+
+ bool thd_was_killed = thd_kill_level(current_thd) != THD_IS_NOT_KILLED;
+ if (wait_result != 0 || thd_was_killed)
+ {
+ break;
+ }
+ }
+
+ if (thd_kill_level(current_thd) != THD_IS_NOT_KILLED)
+ {
+ /* Return error to client. */
+ error = 1;
+ my_printf_error(ER_ERROR_DURING_COMMIT,
+ "Killed while waiting for replication semi-sync slave-lag.",
+ MYF(0));
+ }
+ else if (lag > max_lag)
+ {
+ /* Return error to client. */
+ error = 1;
+ my_printf_error(ER_ERROR_DURING_COMMIT,
+ "Slave-lag timeout",
+ MYF(0));
+ }
+
+ if (waited)
+ {
+ rpl_semi_sync_master_trx_slave_lag_wait_num++;
+ rpl_semi_sync_master_trx_slave_lag_wait_time +=
+ (my_hrtime().val - start_time_us);
+ }
+
+ /* The lock held will be released by thd_exit_cond, so no need to
+ call unlock() here */
+ THD_EXIT_COND(NULL, & old_stage);
+
+ return error;
+}
diff --git a/plugin/semisync/semisync_master.h b/plugin/semisync/semisync_master.h
index c2862476ec8..0167e26bcc8 100644
--- a/plugin/semisync/semisync_master.h
+++ b/plugin/semisync/semisync_master.h
@@ -24,17 +24,101 @@
#ifdef HAVE_PSI_INTERFACE
extern PSI_mutex_key key_ss_mutex_LOCK_binlog_;
extern PSI_cond_key key_ss_cond_COND_binlog_send_;
+
+extern PSI_mutex_key key_ss_mutex_LOCK_slave_lag_;
+extern PSI_cond_key key_ss_cond_COND_slave_lag_;
#endif
extern PSI_stage_info stage_waiting_for_semi_sync_ack_from_slave;
+extern PSI_stage_info stage_waiting_for_semi_sync_slave_lag;
struct TranxNode {
char log_name_[FN_REFLEN];
- my_off_t log_pos_;
+ my_off_t log_pos_;
+ ulonglong tranx_commit_time_us;
struct TranxNode *next_; /* the next node in the sorted list */
struct TranxNode *hash_next_; /* the next node during hash collision */
};
+struct LogPos;
+
+/* This represent a log position */
+struct LogPosPtr {
+ LogPosPtr() { Uninit();}
+ LogPosPtr(const char *name, my_off_t pos) : file_name(name), file_pos(pos){}
+ explicit LogPosPtr(const LogPos& pos) { Assign(&pos); }
+
+ const char *file_name;
+ my_off_t file_pos;
+
+ LogPosPtr& Assign(const LogPosPtr *src) {
+ file_name = src->file_name;
+ file_pos = src->file_pos;
+ return *this;
+ }
+
+ LogPosPtr& Assign(const LogPos *src);
+
+ void Uninit() { file_name = NULL;}
+ bool IsInited() const { return file_name != NULL; }
+};
+
+struct LogPos {
+ char file_name[FN_REFLEN];
+ my_off_t file_pos;
+
+ LogPos() { Uninit(); }
+
+ LogPosPtr ToLogPosPtr() const {
+ if (IsInited()){
+ LogPosPtr p(file_name, file_pos);
+ return p;
+ } else {
+ LogPosPtr p;
+ return p;
+ }
+ }
+
+ LogPos& Assign(const LogPosPtr *src) {
+ if (src->IsInited()) {
+ strcpy(file_name, src->file_name);
+ file_pos = src->file_pos;
+ } else {
+ Uninit();
+ }
+ return *this;
+ }
+
+ LogPos& Assign(const LogPos* src) {
+ LogPosPtr p = src->ToLogPosPtr();
+ Assign(&p);
+ return *this;
+ }
+
+ void Uninit() { file_name[0] = 0; }
+ bool IsInited() const { return file_name[0] != 0; }
+};
+
+inline LogPosPtr& LogPosPtr::Assign(const LogPos* src) {
+ LogPosPtr p = src->ToLogPosPtr();
+ Assign(&p);
+ return *this;
+}
+
+inline int CompareLogPos(const LogPosPtr *pos1, const LogPosPtr *pos2) {
+ int cmp = strcmp(pos1->file_name, pos2->file_name);
+
+ if (cmp != 0)
+ return cmp;
+
+ if (pos1->file_pos > pos2->file_pos)
+ return 1;
+ else if (pos1->file_pos < pos2->file_pos)
+ return -1;
+ else
+ return 0;
+}
+
/**
@class TranxNodeAllocator
@@ -329,10 +413,14 @@ private:
node2->log_name_, node2->log_pos_);
}
+ void set_new_front(TranxNode* new_front);
+
public:
ActiveTranx(mysql_mutex_t *lock, unsigned long trace_level);
~ActiveTranx();
+ bool is_empty() const { return trx_front_ == NULL; }
+
/* Insert an active transaction node with the specified position.
*
* Return:
@@ -340,21 +428,42 @@ public:
*/
int insert_tranx_node(const char *log_file_name, my_off_t log_file_pos);
- /* Clear the active transaction nodes until(inclusive) the specified
- * position.
- * If log_file_name is NULL, everything will be cleared: the sorted
+ /* Clear the active transaction
+ * Everything will be cleared: the sorted
* list and the hash table will be reset to empty.
- *
+ *
* Return:
- * 0: success; non-zero: error
+ * 0 success; non-zero: error
+ */
+ int clear_active_tranx_nodes();
+
+ /* Prune the active transaction nodes until the specified
+ * position (inclusive).
+ *
+ * Return:
+ * true if any transaction was removed
+ * false if list was left unchanged
*/
- int clear_active_tranx_nodes(const char *log_file_name,
- my_off_t log_file_pos);
+ bool prune_active_tranx_nodes(LogPosPtr logpos,
+ ulonglong *oldest_tranx_commit_time_us);
- /* Given a position, check to see whether the position is an active
- * transaction's ending position by probing the hash table.
+ /* Lookup a transaction's ending position by probing the hash table.
+ *
+ * return entry if found or NULL otherwise
*/
- bool is_tranx_end_pos(const char *log_file_name, my_off_t log_file_pos);
+ TranxNode* lookup_tranx_end_pos(const char *log_file_name,
+ my_off_t log_file_pos);
+
+
+ /* Check if an entry is rear (i.e last) */
+ bool is_rear(TranxNode* entry) const { return entry == trx_rear_; }
+
+ /**
+ * return timestamp of oldest transaction in list
+ */
+ ulonglong get_oldest_tranx_commit_time_us() const {
+ return trx_front_->tranx_commit_time_us;
+ }
/* Given two binlog positions, compare which one is bigger based on
* (file_name, file_position).
@@ -365,6 +474,24 @@ public:
};
/**
+ * State that semisync master keeps per slave
+ */
+struct ReplSemiSyncMasterPerSlaveState
+{
+ ReplSemiSyncMasterPerSlaveState() : unacked_event_count_(0) {}
+
+ /**
+ * No of events that has not been semi-sync acked
+ */
+ unsigned unacked_event_count_;
+
+ /**
+ * Position of last event that was semisync'ed
+ */
+ LogPos sync_req_pos_;
+};
+
+/**
The extension class for the master of semi-synchronous replication
*/
class ReplSemiSyncMaster
@@ -432,6 +559,16 @@ class ReplSemiSyncMaster
bool state_; /* whether semi-sync is switched */
+ /* This cond variable is signaled when slave lag has decreased */
+ mysql_cond_t COND_slave_lag_;
+
+ /* Mutex that protects oldest_unapplied_tranx_commit_time_us */
+ mysql_mutex_t LOCK_slave_lag_;
+
+ /* this is commit time of oldest transaction that has not been applied
+ * on any slave */
+ ulonglong oldest_unapplied_tranx_commit_time_us_;
+
void lock();
void unlock();
void cond_broadcast();
@@ -493,6 +630,9 @@ class ReplSemiSyncMaster
/* Is the slave servered by the thread requested semi-sync */
bool is_semi_sync_slave();
+ /* Does this slave have slave lag reporting capabilities */
+ bool has_semi_sync_slave_lag();
+
/* In semi-sync replication, reports up to which binlog position we have
* received replies from the slave indicating that it already get the events.
*
@@ -501,13 +641,15 @@ class ReplSemiSyncMaster
* log_file_name - (IN) binlog file name
* end_offset - (IN) the offset in the binlog file up to which we have
* the replies from the slave
+ * exec_position - (IN) position of SQL thread or NULL if not present
*
* Return:
* 0: success; non-zero: error
*/
int reportReplyBinlog(uint32 server_id,
const char* log_file_name,
- my_off_t end_offset);
+ my_off_t end_offset,
+ const LogPos *exec_position);
/* Commit a transaction in the final step. This function is called from
* InnoDB before returning from the low commit. If semi-sync is switch on,
@@ -540,6 +682,16 @@ class ReplSemiSyncMaster
*/
int reserveSyncHeader(unsigned char *header, unsigned long size);
+ /*
+ * check if an event should be semi synced and optionally
+ * if it should report back position of SQL thread on slave
+ *
+ * return 0 - no semi sync
+ * 1 - semi sync
+ * 2 - semi sync and report exec position
+ */
+ int checkSyncReq(const LogPosPtr *log_pos);
+
/* Update the sync bit in the packet header to indicate to the slave whether
* the master will wait for the reply of the event. If semi-sync is switched
* off and we detect that the slave is catching up, we switch semi-sync on.
@@ -592,6 +744,21 @@ class ReplSemiSyncMaster
* go off for that.
*/
int resetMaster();
+
+ /**
+ * wake potential slave-lag waiters
+ * called by binlog dump-thread(s)
+ */
+ void wake_slave_lag_waiters(ulonglong oldest_unapplied_tranx_commit_time_us);
+
+ /**
+ * wait for slave lag to get below threshold
+ * called by user-thread(s)
+ *
+ * return 0 - success
+ * 1 - timeout
+ */
+ int wait_slave_lag(ulong max_wait_time_sec);
};
enum rpl_semi_sync_master_wait_point_t {
@@ -621,6 +788,17 @@ extern unsigned long long rpl_semi_sync_master_trx_wait_num;
extern unsigned long long rpl_semi_sync_master_net_wait_time;
extern unsigned long long rpl_semi_sync_master_trx_wait_time;
+extern unsigned long rpl_semi_sync_master_max_unacked_event_count;
+extern unsigned long rpl_semi_sync_master_max_unacked_event_bytes;
+extern unsigned long rpl_semi_sync_master_max_slave_lag;
+extern unsigned long rpl_semi_sync_master_slave_lag_heartbeat_frequency_us;
+extern unsigned long rpl_semi_sync_master_slave_lag_wait_sessions;
+extern unsigned long long rpl_semi_sync_master_estimated_slave_lag;
+
+extern unsigned long rpl_semi_sync_master_avg_trx_slave_lag_wait_time;
+extern unsigned long long rpl_semi_sync_master_trx_slave_lag_wait_num;
+extern unsigned long long rpl_semi_sync_master_trx_slave_lag_wait_time;
+
/*
This indicates whether we should keep waiting if no semi-sync slave
is available.
diff --git a/plugin/semisync/semisync_master_plugin.cc b/plugin/semisync/semisync_master_plugin.cc
index 309910312c4..58bddf019bd 100644
--- a/plugin/semisync/semisync_master_plugin.cc
+++ b/plugin/semisync/semisync_master_plugin.cc
@@ -21,6 +21,11 @@
static ReplSemiSyncMaster repl_semisync;
+// forward declaration
+static inline ulong get_slave_lag_wait_timeout(THD* thd);
+
+static char rpl_semi_sync_master_group_commit = 0;
+
C_MODE_START
int repl_semi_report_binlog_update(Binlog_storage_param *param,
@@ -31,6 +36,13 @@ int repl_semi_report_binlog_update(Binlog_storage_param *param,
if (repl_semisync.getMasterEnabled())
{
+ if (rpl_semi_sync_master_group_commit &&
+ ((flags & BINLOG_GROUP_COMMIT_TRAILER) == 0))
+ {
+ /** there are transactions more coming... */
+ return 0;
+ }
+
/*
Let us store the binlog file name and the position, so that
we know how long to wait for the binlog to the replicated to
@@ -43,8 +55,11 @@ int repl_semi_report_binlog_update(Binlog_storage_param *param,
return error;
}
-int repl_semi_request_commit(Trans_param *param)
+int repl_semi_before_commit(Trans_param *param, int *error)
{
+ *error = repl_semisync.wait_slave_lag(
+ get_slave_lag_wait_timeout(current_thd));
+
return 0;
}
@@ -53,6 +68,14 @@ int repl_semi_report_binlog_sync(Binlog_storage_param *param,
my_off_t log_pos, uint32 flags)
{
int error= 0;
+
+ if (rpl_semi_sync_master_group_commit &&
+ ((flags & BINLOG_GROUP_COMMIT_TRAILER) == 0))
+ {
+ /** there are transactions more coming... */
+ return 0;
+ }
+
if (rpl_semi_sync_master_wait_point ==
SEMI_SYNC_MASTER_WAIT_POINT_AFTER_BINLOG_SYNC)
{
@@ -100,7 +123,7 @@ int repl_semi_binlog_dump_start(Binlog_transmit_param *param,
Let's assume this semi-sync slave has already received all
binlog events before the filename and position it requests.
*/
- repl_semisync.reportReplyBinlog(param->server_id, log_file, log_pos);
+ repl_semisync.reportReplyBinlog(param->server_id, log_file, log_pos, NULL);
}
sql_print_information("Start %s binlog_dump to slave (server_id: %d), pos(%s, %lu)",
semi_sync_slave ? "semi-sync" : "asynchronous",
@@ -242,15 +265,72 @@ static MYSQL_SYSVAR_ULONG(trace_level, rpl_semi_sync_master_trace_level,
&fix_rpl_semi_sync_master_trace_level, // update
32, 0, ~0UL, 1);
+static MYSQL_SYSVAR_ULONG(max_unacked_event_count,
+ rpl_semi_sync_master_max_unacked_event_count,
+ PLUGIN_VAR_OPCMDARG,
+ "Maximum unacked replication events",
+ NULL, // check
+ NULL, // update
+ rpl_semi_sync_master_max_unacked_event_count, 0, ~0UL, 1);
+
+static MYSQL_SYSVAR_ULONG(max_unacked_event_bytes,
+ rpl_semi_sync_master_max_unacked_event_bytes,
+ PLUGIN_VAR_OPCMDARG,
+ "Maximum unacked replication bytes",
+ NULL, // check
+ NULL, // update
+ rpl_semi_sync_master_max_unacked_event_bytes, 0, ~0UL, 1);
+
+static MYSQL_SYSVAR_ULONG(max_slave_lag, rpl_semi_sync_master_max_slave_lag,
+ PLUGIN_VAR_OPCMDARG,
+ "Maximum allowed lag of fastest semi-sync slave (in seconds), "
+ "checked before commit.",
+ NULL, // check
+ NULL, // update
+ rpl_semi_sync_master_max_slave_lag, 0, ~0UL, 1);
+
+static MYSQL_THDVAR_ULONG(slave_lag_wait_timeout,
+ PLUGIN_VAR_RQCMDARG,
+ "Timeout in seconds a rw-transaction may wait for max slave lag before "
+ "being rolled back.",
+ NULL, NULL, 50, 1, 1024 * 1024 * 1024, 0);
+
+static MYSQL_SYSVAR_ULONG(
+ slave_lag_heartbeat_frequency_us,
+ rpl_semi_sync_master_slave_lag_heartbeat_frequency_us,
+ PLUGIN_VAR_RQCMDARG,
+ "Heartbeat frequency when slave-lag is enabled (in microseconds).",
+ NULL, // check
+ NULL, // update
+ 500000, /* 500 ms */
+ 1, ~0UL, 1);
+
+static MYSQL_SYSVAR_BOOL(group_commit, rpl_semi_sync_master_group_commit,
+ PLUGIN_VAR_OPCMDARG,
+ "Group commit for semi sync",
+ NULL, // check
+ NULL,
+ 0);
+
static SYS_VAR* semi_sync_master_system_vars[]= {
MYSQL_SYSVAR(enabled),
MYSQL_SYSVAR(wait_point),
MYSQL_SYSVAR(timeout),
MYSQL_SYSVAR(wait_no_slave),
MYSQL_SYSVAR(trace_level),
+ MYSQL_SYSVAR(max_unacked_event_count),
+ MYSQL_SYSVAR(max_unacked_event_bytes),
+ MYSQL_SYSVAR(max_slave_lag),
+ MYSQL_SYSVAR(slave_lag_wait_timeout),
+ MYSQL_SYSVAR(slave_lag_heartbeat_frequency_us),
+ MYSQL_SYSVAR(group_commit),
NULL,
};
+static inline ulong get_slave_lag_wait_timeout(THD* thd)
+{
+ return THDVAR(thd, slave_lag_wait_timeout);
+}
static void fix_rpl_semi_sync_master_timeout(MYSQL_THD thd,
SYS_VAR *var,
@@ -297,6 +377,7 @@ Trans_observer trans_observer = {
repl_semi_report_commit, // after_commit
repl_semi_report_rollback, // after_rollback
+ repl_semi_before_commit, // before commit
};
Binlog_storage_observer storage_observer = {
@@ -339,7 +420,11 @@ DEF_SHOW_FUNC(net_wait_time, SHOW_LONGLONG)
DEF_SHOW_FUNC(net_wait_num, SHOW_LONGLONG)
DEF_SHOW_FUNC(avg_net_wait_time, SHOW_LONG)
DEF_SHOW_FUNC(avg_trx_wait_time, SHOW_LONG)
-
+DEF_SHOW_FUNC(slave_lag_wait_sessions, SHOW_LONG)
+DEF_SHOW_FUNC(estimated_slave_lag, SHOW_LONGLONG)
+DEF_SHOW_FUNC(trx_slave_lag_wait_time, SHOW_LONGLONG)
+DEF_SHOW_FUNC(trx_slave_lag_wait_num, SHOW_LONGLONG)
+DEF_SHOW_FUNC(avg_trx_slave_lag_wait_time, SHOW_LONG)
/* plugin status variables */
static SHOW_VAR semi_sync_master_status_vars[]= {
@@ -385,32 +470,55 @@ static SHOW_VAR semi_sync_master_status_vars[]= {
{"Rpl_semi_sync_master_net_avg_wait_time",
(char*) &SHOW_FNAME(avg_net_wait_time),
SHOW_SIMPLE_FUNC},
+ {"Rpl_semi_sync_master_slave_lag_wait_sessions",
+ (char*) &SHOW_FNAME(slave_lag_wait_sessions),
+ SHOW_SIMPLE_FUNC},
+ {"Rpl_semi_sync_master_estimated_slave_lag",
+ (char*) &SHOW_FNAME(estimated_slave_lag),
+ SHOW_SIMPLE_FUNC},
+ {"Rpl_semi_sync_master_tx_slave_lag_wait_time",
+ (char*) &SHOW_FNAME(trx_slave_lag_wait_time),
+ SHOW_SIMPLE_FUNC},
+ {"Rpl_semi_sync_master_tx_slave_lag_waits",
+ (char*) &SHOW_FNAME(trx_slave_lag_wait_num),
+ SHOW_SIMPLE_FUNC},
+ {"Rpl_semi_sync_master_tx_avg_slave_lag_wait_time",
+ (char*) &SHOW_FNAME(avg_trx_slave_lag_wait_time),
+ SHOW_SIMPLE_FUNC},
{NULL, NULL, SHOW_LONG},
};
#ifdef HAVE_PSI_INTERFACE
PSI_mutex_key key_ss_mutex_LOCK_binlog_;
+PSI_mutex_key key_ss_mutex_LOCK_slave_lag_;
static PSI_mutex_info all_semisync_mutexes[]=
{
- { &key_ss_mutex_LOCK_binlog_, "LOCK_binlog_", 0}
+ { &key_ss_mutex_LOCK_binlog_, "LOCK_binlog_", 0 },
+ { &key_ss_mutex_LOCK_slave_lag_, "LOCK_slave_lag_", 0 }
};
PSI_cond_key key_ss_cond_COND_binlog_send_;
+PSI_cond_key key_ss_cond_COND_slave_lag_;
static PSI_cond_info all_semisync_conds[]=
{
- { &key_ss_cond_COND_binlog_send_, "COND_binlog_send_", 0}
+ { &key_ss_cond_COND_binlog_send_, "COND_binlog_send_", 0 },
+ { &key_ss_cond_COND_slave_lag_, "COND_slave_lag_", 0 }
};
#endif /* HAVE_PSI_INTERFACE */
PSI_stage_info stage_waiting_for_semi_sync_ack_from_slave=
{ 0, "Waiting for semi-sync ACK from slave", 0};
+PSI_stage_info stage_waiting_for_semi_sync_slave_lag=
+{ 0, "Waiting for semi-sync slave lag", 0};
+
#ifdef HAVE_PSI_INTERFACE
PSI_stage_info *all_semisync_stages[]=
{
- & stage_waiting_for_semi_sync_ack_from_slave
+ & stage_waiting_for_semi_sync_ack_from_slave,
+ & stage_waiting_for_semi_sync_slave_lag
};
static void init_semisync_psi_keys(void)
@@ -492,4 +600,3 @@ maria_declare_plugin(semisync_master)
MariaDB_PLUGIN_MATURITY_STABLE
}
maria_declare_plugin_end;
-
diff --git a/plugin/semisync/semisync_slave.cc b/plugin/semisync/semisync_slave.cc
index 5f98472d5d7..839e0cce29d 100644
--- a/plugin/semisync/semisync_slave.cc
+++ b/plugin/semisync/semisync_slave.cc
@@ -20,6 +20,7 @@
char rpl_semi_sync_slave_enabled;
char rpl_semi_sync_slave_status= 0;
unsigned long rpl_semi_sync_slave_trace_level;
+char rpl_semi_sync_slave_lag_enabled= 0;
int ReplSemiSyncSlave::initObject()
{
@@ -42,7 +43,7 @@ int ReplSemiSyncSlave::initObject()
int ReplSemiSyncSlave::slaveReadSyncHeader(const char *header,
unsigned long total_len,
- bool *need_reply,
+ unsigned char *need_reply,
const char **payload,
unsigned long *payload_len)
{
@@ -52,7 +53,7 @@ int ReplSemiSyncSlave::slaveReadSyncHeader(const char *header,
if ((unsigned char)(header[0]) == kPacketMagicNum)
{
- *need_reply = (header[1] & kPacketFlagSync);
+ *need_reply = (header[1] & (kPacketFlagSync | kPacketFlagSyncAndReport));
*payload_len = total_len - 2;
*payload = header + 2;
@@ -95,16 +96,20 @@ int ReplSemiSyncSlave::slaveStop(Binlog_relay_IO_param *param)
return 0;
}
-int ReplSemiSyncSlave::slaveReply(MYSQL *mysql,
- const char *binlog_filename,
- my_off_t binlog_filepos)
+int ReplSemiSyncSlave::slaveReply(unsigned char header_byte,
+ MYSQL *mysql,
+ const char *binlog_filename,
+ my_off_t binlog_filepos,
+ Master_info * mi)
{
const char *kWho = "ReplSemiSyncSlave::slaveReply";
NET *net= &mysql->net;
- uchar reply_buffer[REPLY_MAGIC_NUM_LEN
- + REPLY_BINLOG_POS_LEN
- + REPLY_BINLOG_NAME_LEN];
+ uchar reply_buffer[REPLY_MAGIC_NUM_LEN +
+ 2 * ( REPLY_BINLOG_POS_LEN +
+ REPLY_BINLOG_NAME_LEN +
+ /* '\0' */ 1) ];
int reply_res, name_len = strlen(binlog_filename);
+ int msg_len = name_len + REPLY_BINLOG_NAME_OFFSET;
function_enter(kWho);
@@ -119,10 +124,29 @@ int ReplSemiSyncSlave::slaveReply(MYSQL *mysql,
sql_print_information("%s: reply (%s, %lu)", kWho,
binlog_filename, (ulong)binlog_filepos);
+ if (header_byte & kPacketFlagSyncAndReport)
+ {
+ /**
+ * master requests that we also report back SQL-thread position
+ */
+
+ // where to store sql filename/position
+ char *bufptr = (char*)reply_buffer + msg_len;
+ bufptr[0] = 0; // '\0' terminate previous filename
+ bufptr++;
+
+ my_off_t sql_file_pos;
+ // get file/position and store the filename directly info bufptr+8
+ size_t name_len2 = get_master_log_pos(mi, bufptr + 8, &sql_file_pos);
+ int8store(bufptr, sql_file_pos); // store position
+
+ msg_len += /* '\0' */ 1 + /* position */ 8 + name_len2 + /* '\0' */ 1;
+ }
+
net_clear(net, 0);
/* Send the reply. */
- reply_res = my_net_write(net, reply_buffer,
- name_len + REPLY_BINLOG_NAME_OFFSET);
+ reply_res = my_net_write(net, reply_buffer, msg_len);
+
if (!reply_res)
{
reply_res = net_flush(net);
diff --git a/plugin/semisync/semisync_slave.h b/plugin/semisync/semisync_slave.h
index 1bf8cf31972..c91847d828d 100644
--- a/plugin/semisync/semisync_slave.h
+++ b/plugin/semisync/semisync_slave.h
@@ -60,23 +60,30 @@ public:
* Return:
* 0: success; non-zero: error
*/
- int slaveReadSyncHeader(const char *header, unsigned long total_len, bool *need_reply,
+ int slaveReadSyncHeader(const char *header, unsigned long total_len,
+ unsigned char *need_reply_byte,
const char **payload, unsigned long *payload_len);
/* A slave replies to the master indicating its replication process. It
* indicates that the slave has received all events before the specified
* binlog position.
- *
+ *
* Input:
+ * need_reply_byte - (IN) the header byte
* mysql - (IN) the mysql network connection
* binlog_filename - (IN) the reply point's binlog file name
* binlog_filepos - (IN) the reply point's binlog file offset
+ * master_info - (IN) the master info struct so that we can get more
+ * info if needed
*
* Return:
* 0: success; non-zero: error
*/
- int slaveReply(MYSQL *mysql, const char *binlog_filename,
- my_off_t binlog_filepos);
+ int slaveReply(unsigned char need_reply_byte,
+ MYSQL *mysql,
+ const char *binlog_filename,
+ my_off_t binlog_filepos,
+ Master_info* master_info);
int slaveStart(Binlog_relay_IO_param *param);
int slaveStop(Binlog_relay_IO_param *param);
@@ -93,5 +100,6 @@ private:
extern char rpl_semi_sync_slave_enabled;
extern unsigned long rpl_semi_sync_slave_trace_level;
extern char rpl_semi_sync_slave_status;
+extern char rpl_semi_sync_slave_lag_enabled;
#endif /* SEMISYNC_SLAVE_H */
diff --git a/plugin/semisync/semisync_slave_plugin.cc b/plugin/semisync/semisync_slave_plugin.cc
index 3a6c7625d93..6c85a07ae2a 100644
--- a/plugin/semisync/semisync_slave_plugin.cc
+++ b/plugin/semisync/semisync_slave_plugin.cc
@@ -28,7 +28,7 @@ static ReplSemiSyncSlave repl_semisync;
event read is the last event of a transaction. And the value is
checked in repl_semi_slave_queue_event.
*/
-bool semi_sync_need_reply= false;
+unsigned char semi_sync_need_reply= 0;
C_MODE_START
@@ -81,6 +81,23 @@ int repl_semi_slave_request_dump(Binlog_relay_IO_param *param,
return 1;
}
mysql_free_result(mysql_store_result(mysql));
+
+ if (rpl_semi_sync_slave_lag_enabled)
+ {
+ char buf[100];
+ /*
+ Tell master that we can do exec-position reporting
+ */
+ snprintf(buf, sizeof(buf), "SET @%s= 1",
+ ReplSemiSyncBase::kRplSemiSyncSlaveReportExec);
+ if (mysql_real_query(mysql, buf, strlen(buf)))
+ {
+ sql_print_error("query: %s on master failed", buf);
+ return 1;
+ }
+ mysql_free_result(mysql_store_result(mysql));
+ }
+
rpl_semi_sync_slave_status= 1;
return 0;
}
@@ -110,9 +127,11 @@ int repl_semi_slave_queue_event(Binlog_relay_IO_param *param,
should not cause the slave IO thread to stop, and the error
messages are already reported.
*/
- (void) repl_semisync.slaveReply(param->mysql,
+ (void) repl_semisync.slaveReply(semi_sync_need_reply,
+ param->mysql,
param->master_log_name,
- param->master_log_pos);
+ param->master_log_pos,
+ param->mi);
}
return 0;
}
@@ -164,9 +183,17 @@ static MYSQL_SYSVAR_ULONG(trace_level, rpl_semi_sync_slave_trace_level,
&fix_rpl_semi_sync_trace_level, // update
32, 0, ~0UL, 1);
+static MYSQL_SYSVAR_BOOL(lag_enabled, rpl_semi_sync_slave_lag_enabled,
+ PLUGIN_VAR_OPCMDARG,
+ "Enable semi-synchronous replication slave lag reporting. ",
+ NULL, // check
+ NULL, // update
+ 0);
+
static SYS_VAR* semi_sync_slave_system_vars[]= {
MYSQL_SYSVAR(enabled),
MYSQL_SYSVAR(trace_level),
+ MYSQL_SYSVAR(lag_enabled),
NULL,
};
@@ -230,4 +257,3 @@ maria_declare_plugin(semisync_slave)
MariaDB_PLUGIN_MATURITY_STABLE
}
maria_declare_plugin_end;
-