diff options
Diffstat (limited to 'plugin')
-rw-r--r-- | plugin/semisync/semisync.cc | 4 | ||||
-rw-r--r-- | plugin/semisync/semisync.h | 12 | ||||
-rw-r--r-- | plugin/semisync/semisync_master.cc | 545 | ||||
-rw-r--r-- | plugin/semisync/semisync_master.h | 202 | ||||
-rw-r--r-- | plugin/semisync/semisync_master_plugin.cc | 121 | ||||
-rw-r--r-- | plugin/semisync/semisync_slave.cc | 44 | ||||
-rw-r--r-- | plugin/semisync/semisync_slave.h | 16 | ||||
-rw-r--r-- | plugin/semisync/semisync_slave_plugin.cc | 34 |
8 files changed, 877 insertions, 101 deletions
diff --git a/plugin/semisync/semisync.cc b/plugin/semisync/semisync.cc index 4a80360ba4d..fc02b9d95b6 100644 --- a/plugin/semisync/semisync.cc +++ b/plugin/semisync/semisync.cc @@ -20,6 +20,7 @@ const unsigned char ReplSemiSyncBase::kPacketMagicNum = 0xef; const unsigned char ReplSemiSyncBase::kPacketFlagSync = 0x01; +const unsigned char ReplSemiSyncBase::kPacketFlagSyncAndReport = 0x02; const unsigned long Trace::kTraceGeneral = 0x0001; @@ -29,3 +30,6 @@ const unsigned long Trace::kTraceFunction = 0x0040; const unsigned char ReplSemiSyncBase::kSyncHeader[2] = {ReplSemiSyncBase::kPacketMagicNum, 0}; + +const char* const ReplSemiSyncBase::kRplSemiSyncSlaveReportExec = + "rpl_semi_sync_slave_report_exec"; diff --git a/plugin/semisync/semisync.h b/plugin/semisync/semisync.h index 28577296817..78faba9e773 100644 --- a/plugin/semisync/semisync.h +++ b/plugin/semisync/semisync.h @@ -75,13 +75,23 @@ public: /* Constants in network packet header. */ static const unsigned char kPacketMagicNum; + /* this event should be semisync acked */ static const unsigned char kPacketFlagSync; + /* this event should be semisync acked including the current SQL position */ + static const unsigned char kPacketFlagSyncAndReport; + + /* user variable for enabling exec-pos reporting */ + static const char* const kRplSemiSyncSlaveReportExec; }; /* The layout of a semisync slave reply packet: 1 byte for the magic num 8 bytes for the binlog positon - n bytes for the binlog filename, terminated with a '\0' + n bytes for the binlog filename, NOT terminated with a '\0' + [ optionally ] + 1 byte == 0 + 8 bytes for the sql-thread position + n bytes for the sql-thread filename, terminated with a '\0' */ #define REPLY_MAGIC_NUM_LEN 1 #define REPLY_BINLOG_POS_LEN 8 diff --git a/plugin/semisync/semisync_master.cc b/plugin/semisync/semisync_master.cc index 7901853c3f8..54a3a908820 100644 --- a/plugin/semisync/semisync_master.cc +++ b/plugin/semisync/semisync_master.cc @@ -22,6 +22,9 @@ #define TIME_MILLION 1000000 #define TIME_BILLION 1000000000 +/* thd_key for per slave thread state */ +static MYSQL_THD_KEY_T thd_key; + /* This indicates whether semi-synchronous replication is enabled. */ char rpl_semi_sync_master_enabled; unsigned long rpl_semi_sync_master_wait_point = @@ -45,6 +48,18 @@ unsigned long long rpl_semi_sync_master_net_wait_time = 0; unsigned long long rpl_semi_sync_master_trx_wait_time = 0; char rpl_semi_sync_master_wait_no_slave = 1; +unsigned long rpl_semi_sync_master_max_unacked_event_count = 0; +unsigned long rpl_semi_sync_master_max_unacked_event_bytes = 4096; + +unsigned long rpl_semi_sync_master_slave_lag_clients = 0; +unsigned long long rpl_semi_sync_master_estimated_slave_lag = 0; +unsigned long rpl_semi_sync_master_slave_lag_heartbeat_frequency_us = 500000; +unsigned long rpl_semi_sync_master_max_slave_lag = 0; +unsigned long rpl_semi_sync_master_slave_lag_wait_sessions = 0; + +unsigned long rpl_semi_sync_master_avg_trx_slave_lag_wait_time = 0; +unsigned long long rpl_semi_sync_master_trx_slave_lag_wait_num = 0; +unsigned long long rpl_semi_sync_master_trx_slave_lag_wait_time = 0; static int getWaitTime(const struct timespec& start_ts); @@ -150,6 +165,15 @@ int ActiveTranx::insert_tranx_node(const char *log_file_name, ins_node->log_name_[FN_REFLEN-1] = 0; /* make sure it ends properly */ ins_node->log_pos_ = log_file_pos; + { + /** + * set trans commit time + * this is called when writing into binlog, which is not + * exactly right, but close enough for our purposes + */ + ins_node->tranx_commit_time_us = my_hrtime().val; + } + if (!trx_front_) { /* The list is empty. */ @@ -193,12 +217,11 @@ int ActiveTranx::insert_tranx_node(const char *log_file_name, return function_exit(kWho, result); } -bool ActiveTranx::is_tranx_end_pos(const char *log_file_name, - my_off_t log_file_pos) +TranxNode* ActiveTranx::lookup_tranx_end_pos(const char *log_file_name, + my_off_t log_file_pos) { - const char *kWho = "ActiveTranx::is_tranx_end_pos"; + const char *kWho = "ActiveTranx::lookup_tranx_end_pos"; function_enter(kWho); - unsigned int hash_val = get_hash_value(log_file_name, log_file_pos); TranxNode *entry = trx_htb_[hash_val]; @@ -211,38 +234,24 @@ bool ActiveTranx::is_tranx_end_pos(const char *log_file_name, } if (trace_level_ & kTraceDetail) - sql_print_information("%s: probe (%s, %lu) in entry(%u)", kWho, - log_file_name, (unsigned long)log_file_pos, hash_val); + sql_print_information("%s: probe (%s, %lu)", kWho, + log_file_name, (unsigned long)log_file_pos); function_exit(kWho, (entry != NULL)); - return (entry != NULL); + return entry; } -int ActiveTranx::clear_active_tranx_nodes(const char *log_file_name, - my_off_t log_file_pos) +int ActiveTranx::clear_active_tranx_nodes() { - const char *kWho = "ActiveTranx::::clear_active_tranx_nodes"; - TranxNode *new_front; + set_new_front(NULL); + return 0; +} +void ActiveTranx::set_new_front(TranxNode *new_front) +{ + const char *kWho = "ActiveTranx::set_new_front"; function_enter(kWho); - if (log_file_name != NULL) - { - new_front = trx_front_; - - while (new_front) - { - if (compare(new_front, log_file_name, log_file_pos) > 0) - break; - new_front = new_front->next_; - } - } - else - { - /* If log_file_name is NULL, clear everything. */ - new_front = NULL; - } - if (new_front == NULL) { /* No active transaction nodes after the call. */ @@ -257,7 +266,6 @@ int ActiveTranx::clear_active_tranx_nodes(const char *log_file_name, trx_front_ = NULL; trx_rear_ = NULL; } - if (trace_level_ & kTraceDetail) sql_print_information("%s: cleared all nodes", kWho); } @@ -291,14 +299,40 @@ int ActiveTranx::clear_active_tranx_nodes(const char *log_file_name, trx_front_ = new_front; allocator_.free_nodes_before(trx_front_); - if (trace_level_ & kTraceDetail) sql_print_information("%s: cleared %d nodes back until pos (%s, %lu)", kWho, n_frees, trx_front_->log_name_, (unsigned long)trx_front_->log_pos_); } + function_exit(kWho, 0); +} - return function_exit(kWho, 0); +bool ActiveTranx::prune_active_tranx_nodes( + LogPosPtr pos, + ulonglong *oldest_tranx_commit_time_us) +{ + TranxNode *old_front = trx_front_; + TranxNode *new_front; + + new_front = trx_front_; + while (new_front) + { + if (compare(new_front, pos.file_name, pos.file_pos) > 0) + break; + new_front = new_front->next_; + } + + set_new_front(new_front); + + if (oldest_tranx_commit_time_us) + { + if (trx_front_ == NULL) + *oldest_tranx_commit_time_us = 0; + else + *oldest_tranx_commit_time_us = trx_front_->tranx_commit_time_us; + } + + return ! (old_front == trx_front_); } @@ -334,7 +368,8 @@ ReplSemiSyncMaster::ReplSemiSyncMaster() wait_file_pos_(0), master_enabled_(false), wait_timeout_(0L), - state_(0) + state_(0), + oldest_unapplied_tranx_commit_time_us_(0) { strcpy(reply_file_name_, ""); strcpy(wait_file_name_, ""); @@ -362,11 +397,19 @@ int ReplSemiSyncMaster::initObject() mysql_cond_init(key_ss_cond_COND_binlog_send_, &COND_binlog_send_, NULL); + /* Mutex initialization can only be done after MY_INIT(). */ + mysql_mutex_init(key_ss_mutex_LOCK_slave_lag_, + &LOCK_slave_lag_, MY_MUTEX_INIT_FAST); + mysql_cond_init(key_ss_cond_COND_slave_lag_, + &COND_slave_lag_, NULL); + if (rpl_semi_sync_master_enabled) result = enableMaster(); else result = disableMaster(); + thd_key_create(&thd_key); + return result; } @@ -437,6 +480,8 @@ void ReplSemiSyncMaster::cleanup() { mysql_mutex_destroy(&LOCK_binlog_); mysql_cond_destroy(&COND_binlog_send_); + mysql_mutex_destroy(&LOCK_slave_lag_); + mysql_cond_destroy(&COND_slave_lag_); init_done_= 0; } @@ -473,7 +518,34 @@ void ReplSemiSyncMaster::add_slave() { lock(); rpl_semi_sync_master_clients++; + if (has_semi_sync_slave_lag()) + rpl_semi_sync_master_slave_lag_clients++; unlock(); + + if (has_semi_sync_slave_lag()) + { + int null_val = 0; + longlong new_val = + rpl_semi_sync_master_slave_lag_heartbeat_frequency_us * 1000; + longlong old_val = new_val + 1; + + get_user_var_int("master_heartbeat_period", &old_val, &null_val); + if (old_val > new_val || null_val) + { + /* if there no old value or it's bigger than what we want */ + int res = set_user_var_int("master_heartbeat_period",new_val, &old_val); + if (res == -1) + { + sql_print_error( + "Repl_semi_sync::failed to set master_heartbeat_period"); + } + } + } + + /** + * create per slave-state and store it in thread-local-storage */ + ReplSemiSyncMasterPerSlaveState *state = new ReplSemiSyncMasterPerSlaveState; + thd_setspecific(current_thd, thd_key, state); } void ReplSemiSyncMaster::remove_slave() @@ -492,7 +564,31 @@ void ReplSemiSyncMaster::remove_slave() rpl_semi_sync_master_clients == 0) switch_off(); } + + bool no_slave_lag_clients = false; + if (has_semi_sync_slave_lag()) + { + if (--rpl_semi_sync_master_slave_lag_clients == 0) + { + no_slave_lag_clients = true; + } + } + unlock(); + + ReplSemiSyncMasterPerSlaveState *state = + (ReplSemiSyncMasterPerSlaveState*)thd_getspecific(current_thd, thd_key); + thd_setspecific(current_thd, thd_key, NULL); + + if (state != NULL) + { + delete state; + } + + if (no_slave_lag_clients) + { + wake_slave_lag_waiters(0); + } } bool ReplSemiSyncMaster::is_semi_sync_slave() @@ -503,14 +599,115 @@ bool ReplSemiSyncMaster::is_semi_sync_slave() return val; } +bool ReplSemiSyncMaster::has_semi_sync_slave_lag() +{ + int null_value; + long long val= 0; + get_user_var_int(kRplSemiSyncSlaveReportExec, &val, &null_value); + return val; +} + +int ReplSemiSyncMaster::checkSyncReq(const LogPosPtr *log_pos) +{ + if (log_pos == NULL) + { + /* heartbeat events does not have logpos (since they are not actually + * stored in the binlog). + */ + if (!has_semi_sync_slave_lag()) + { + /* don't semi-sync them if we haven't enabled slave-lag handling */ + return 0; + } + else + { + /* else ask for both IO and exec position */ + return 2; + } + } + + /** + * check if this log-pos is a candidate for semi-syncing event + */ + TranxNode *entry = active_tranxs_->lookup_tranx_end_pos(log_pos->file_name, + log_pos->file_pos); + + if (entry == NULL) + return 0; + + ReplSemiSyncMasterPerSlaveState *state = + (ReplSemiSyncMasterPerSlaveState*)thd_getspecific(current_thd, + thd_key); + do + { + state->unacked_event_count_++; + + if (active_tranxs_->is_rear(entry)) + { + /* always ask for ack on last event in tranx list */ + break; + } + + if (state->unacked_event_count_ >= + rpl_semi_sync_master_max_unacked_event_count) + { + /* enough events passed that it's time for another ack */ + break; + } + + if (!state->sync_req_pos_.IsInited()) + { + /* first event => time for ack */ + break; + } + + if (strcmp(log_pos->file_name, state->sync_req_pos_.file_name) != 0) + { + /* new file => time for ack */ + break; + } + + if (log_pos->file_pos >= (state->sync_req_pos_.file_pos + + rpl_semi_sync_master_max_unacked_event_bytes)) + { + /* enough bytes => time for ack */ + break; + } + + /* we skip asking for semi-sync ack on this event */ + return 0; + + } while (0); + + /* keep track on when we last asked for semi-sync-ack */ + state->unacked_event_count_ = 0; + state->sync_req_pos_.Assign(log_pos); + + /** + * check if this slave can report back exec position + */ + if (!has_semi_sync_slave_lag()) + { + /* slave can't report back SQL position */ + return 1; + } + + /* ask for both IO and SQL position */ + return 2; +} + int ReplSemiSyncMaster::reportReplyBinlog(uint32 server_id, const char *log_file_name, - my_off_t log_file_pos) + my_off_t log_file_pos, + const LogPos *exec_pos) { const char *kWho = "ReplSemiSyncMaster::reportReplyBinlog"; int cmp; bool can_release_threads = false; bool need_copy_send_pos = true; + bool pruned_trx_list = false; + ulonglong oldest_tranx_commit_time_us = 0; + if (!(getMasterEnabled())) return 0; @@ -559,15 +756,29 @@ int ReplSemiSyncMaster::reportReplyBinlog(uint32 server_id, reply_file_pos_ = log_file_pos; reply_file_name_inited_ = true; - /* Remove all active transaction nodes before this point. */ - assert(active_tranxs_ != NULL); - active_tranxs_->clear_active_tranx_nodes(log_file_name, log_file_pos); - if (trace_level_ & kTraceDetail) sql_print_information("%s: Got reply at (%s, %lu)", kWho, log_file_name, (unsigned long)log_file_pos); } + assert(active_tranxs_ != NULL); + if (exec_pos != NULL) + { + /* prune using exec_pos */ + LogPosPtr ptr(*exec_pos); + pruned_trx_list = active_tranxs_->prune_active_tranx_nodes( + ptr, &oldest_tranx_commit_time_us); + } + else if (rpl_semi_sync_master_slave_lag_clients == 0 && need_copy_send_pos) + { + /** + * if we don't have any slaves that can do exec_pos reporting, + * prune by IO position as "plain old semi sync" + */ + LogPosPtr ptr(log_file_name, log_file_pos); + active_tranxs_->prune_active_tranx_nodes(ptr, NULL); + } + if (rpl_semi_sync_master_wait_sessions > 0) { /* Let us check if some of the waiting threads doing a trx @@ -596,6 +807,15 @@ int ReplSemiSyncMaster::reportReplyBinlog(uint32 server_id, cond_broadcast(); } + if (pruned_trx_list) + { + /** + * if we did prune trx list, it might be that we should wake up + * threads waiting for slave-lag to decrease + */ + wake_slave_lag_waiters(oldest_tranx_commit_time_us); + } + return function_exit(kWho, 0); } @@ -743,16 +963,6 @@ int ReplSemiSyncMaster::commitTrx(const char* trx_wait_binlog_name, } } - /* - At this point, the binlog file and position of this transaction - must have been removed from ActiveTranx. - active_tranxs_ may be NULL if someone disabled semi sync during - cond_timewait() - */ - assert(thd_killed(current_thd) || !active_tranxs_ || - !active_tranxs_->is_tranx_end_pos(trx_wait_binlog_name, - trx_wait_binlog_pos)); - l_end: /* Update the status counter. */ if (is_on()) @@ -796,7 +1006,7 @@ int ReplSemiSyncMaster::switch_off() /* Clear the active transaction list. */ assert(active_tranxs_ != NULL); - result = active_tranxs_->clear_active_tranx_nodes(NULL, 0); + result = active_tranxs_->clear_active_tranx_nodes(); rpl_semi_sync_master_off_times++; wait_file_name_inited_ = false; @@ -886,7 +1096,7 @@ int ReplSemiSyncMaster::updateSyncHeader(unsigned char *packet, { const char *kWho = "ReplSemiSyncMaster::updateSyncHeader"; int cmp = 0; - bool sync = false; + int sync = 0; /* If the semi-sync master is not enabled, or the slave is not a semi-sync * target, do not request replies from the slave. @@ -907,6 +1117,13 @@ int ReplSemiSyncMaster::updateSyncHeader(unsigned char *packet, /* semi-sync is ON */ /* sync= false; No sync unless a transaction is involved. */ + if (log_file_name == NULL) + { + /* this is heartbeat, request io_pos and exec_pos */ + sync = checkSyncReq(0); + goto l_end; + } + if (reply_file_name_inited_) { cmp = ActiveTranx::compare(log_file_name, log_file_pos, @@ -935,12 +1152,12 @@ int ReplSemiSyncMaster::updateSyncHeader(unsigned char *packet, */ if (cmp >= 0) { - /* + /* * We only wait if the event is a transaction's ending event. */ assert(active_tranxs_ != NULL); - sync = active_tranxs_->is_tranx_end_pos(log_file_name, - log_file_pos); + LogPosPtr pos(log_file_name, log_file_pos); + sync = checkSyncReq(&pos); } } else @@ -953,7 +1170,7 @@ int ReplSemiSyncMaster::updateSyncHeader(unsigned char *packet, } else { - sync = true; + sync = 1; } } @@ -968,10 +1185,14 @@ int ReplSemiSyncMaster::updateSyncHeader(unsigned char *packet, /* We do not need to clear sync flag because we set it to 0 when we * reserve the packet header. */ - if (sync) + if (sync == 1) { (packet)[2] = kPacketFlagSync; } + else if (sync == 2) + { + (packet)[2] = kPacketFlagSyncAndReport; + } return function_exit(kWho, 0); } @@ -1020,7 +1241,8 @@ int ReplSemiSyncMaster::writeTranxInBinlog(const char* log_file_name, if (is_on()) { assert(active_tranxs_ != NULL); - if(active_tranxs_->insert_tranx_node(log_file_name, log_file_pos)) + bool empty = active_tranxs_->is_empty(); + if (active_tranxs_->insert_tranx_node(log_file_name, log_file_pos)) { /* if insert tranx_node failed, print a warning message @@ -1030,6 +1252,14 @@ int ReplSemiSyncMaster::writeTranxInBinlog(const char* log_file_name, log_file_name, (ulong)log_file_pos); switch_off(); } + else if (empty && rpl_semi_sync_master_slave_lag_clients > 0) + { + /* if the list of transactions was empty, + * we need to init the oldest_tranx_commit_time_us + */ + oldest_unapplied_tranx_commit_time_us_ = + active_tranxs_->get_oldest_tranx_commit_time_us(); + } } l_end: @@ -1039,10 +1269,10 @@ int ReplSemiSyncMaster::writeTranxInBinlog(const char* log_file_name, } int ReplSemiSyncMaster::readSlaveReply(NET *net, uint32 server_id, - const char *event_buf) + const char *event_buf_) { const char *kWho = "ReplSemiSyncMaster::readSlaveReply"; - const unsigned char *packet; + const unsigned char *packet, *packet_start; char log_file_name[FN_REFLEN]; my_off_t log_file_pos; ulong log_file_len = 0; @@ -1050,12 +1280,15 @@ int ReplSemiSyncMaster::readSlaveReply(NET *net, uint32 server_id, int result = -1; struct timespec start_ts; ulong trc_level = trace_level_; + const unsigned char *event_buf = (const unsigned char*)event_buf_; + bool exec_pos_present = false; // is SQL exec pos present in reply + LogPos exec_pos; // position of SQL thread LINT_INIT_STRUCT(start_ts); function_enter(kWho); - assert((unsigned char)event_buf[1] == kPacketMagicNum); - if ((unsigned char)event_buf[2] != kPacketFlagSync) + assert(event_buf[1] == kPacketMagicNum); + if ((event_buf[2] & (kPacketFlagSync | kPacketFlagSyncAndReport)) == 0) { /* current event does not require reply */ result = 0; @@ -1113,28 +1346,60 @@ int ReplSemiSyncMaster::readSlaveReply(NET *net, uint32 server_id, goto l_end; } - packet = net->read_pos; + packet_start = packet = net->read_pos; if (packet[REPLY_MAGIC_NUM_OFFSET] != ReplSemiSyncMaster::kPacketMagicNum) { sql_print_error("Read semi-sync reply magic number error"); goto l_end; } + /* we determine if this semisync ack contains a sql-thread exec-pos + * by checking if last byte == 0, since the packet then contains + * \0-terminated filenames */ + exec_pos_present = packet[packet_len - 1] == 0; + log_file_pos = uint8korr(packet + REPLY_BINLOG_POS_OFFSET); - log_file_len = packet_len - REPLY_BINLOG_NAME_OFFSET; + if (exec_pos_present == false) + { + log_file_len = packet_len - REPLY_BINLOG_NAME_OFFSET; + } + else + { + log_file_len = strnlen((char*)packet + REPLY_BINLOG_NAME_OFFSET, + MY_MIN((ulong)FN_REFLEN, + packet_len - REPLY_BINLOG_NAME_OFFSET)); + } if (log_file_len >= FN_REFLEN) { sql_print_error("Read semi-sync reply binlog file length too large"); goto l_end; } - strncpy(log_file_name, (const char*)packet + REPLY_BINLOG_NAME_OFFSET, log_file_len); + packet+= REPLY_BINLOG_NAME_OFFSET; + + strncpy(log_file_name, (const char*)packet, log_file_len); log_file_name[log_file_len] = 0; + if (exec_pos_present) + { + packet += log_file_len + 1; + if (packet + 8 + 1 >= (packet_start + packet_len)) + { + sql_print_error("Read semi-sync reply binlog. " + "Packet to short to contain exec-position!"); + goto l_end; + } + exec_pos.file_pos = uint8korr(packet); + packet += 8; + strncpy(exec_pos.file_name, (char*)packet, + (packet_start + packet_len) - packet); + } + if (trc_level & kTraceDetail) sql_print_information("%s: Got reply (%s, %lu)", kWho, log_file_name, (ulong)log_file_pos); - result = reportReplyBinlog(server_id, log_file_name, log_file_pos); + result = reportReplyBinlog(server_id, log_file_name, log_file_pos, + exec_pos_present ? &exec_pos : NULL); l_end: return function_exit(kWho, result); @@ -1156,6 +1421,16 @@ int ReplSemiSyncMaster::resetMaster() wait_file_name_inited_ = false; reply_file_name_inited_ = false; commit_file_name_inited_ = false; + if (active_tranxs_ != NULL) + { + /** + * make sure to empty transaction hash/list + * with slave-lag reporting this container does + * not have to be empty even if no transaction is + * currently running + */ + active_tranxs_->clear_active_tranx_nodes(); + } rpl_semi_sync_master_yes_transactions = 0; rpl_semi_sync_master_no_transactions = 0; @@ -1170,6 +1445,13 @@ int ReplSemiSyncMaster::resetMaster() unlock(); + mysql_mutex_lock(&LOCK_slave_lag_); + rpl_semi_sync_master_slave_lag_wait_sessions = 0; + oldest_unapplied_tranx_commit_time_us_ = 0; + rpl_semi_sync_master_trx_slave_lag_wait_num = 0; + rpl_semi_sync_master_trx_slave_lag_wait_time = 0; + mysql_mutex_unlock(&LOCK_slave_lag_); + return function_exit(kWho, result); } @@ -1188,6 +1470,29 @@ void ReplSemiSyncMaster::setExportStats() ((double)rpl_semi_sync_master_net_wait_num)) : 0); unlock(); + + if (oldest_unapplied_tranx_commit_time_us_ != 0) + { + rpl_semi_sync_master_estimated_slave_lag = my_hrtime().val - + oldest_unapplied_tranx_commit_time_us_; + } + else + { + rpl_semi_sync_master_estimated_slave_lag = 0; + } + + mysql_mutex_lock(&LOCK_slave_lag_); + if (rpl_semi_sync_master_trx_slave_lag_wait_num) + { + rpl_semi_sync_master_avg_trx_slave_lag_wait_time = + (unsigned long)((double)rpl_semi_sync_master_trx_slave_lag_wait_time / + (double)rpl_semi_sync_master_trx_slave_lag_wait_num); + } + else + { + rpl_semi_sync_master_avg_trx_slave_lag_wait_time = 0; + } + mysql_mutex_unlock(&LOCK_slave_lag_); } /* Get the waiting time given the wait's staring time. @@ -1215,3 +1520,117 @@ static int getWaitTime(const struct timespec& start_ts) return (int)(end_usecs - start_usecs); } + +void ReplSemiSyncMaster::wake_slave_lag_waiters( + ulonglong oldest_unapplied_tranx_commit_time_us) +{ + mysql_mutex_lock(&LOCK_slave_lag_); + oldest_unapplied_tranx_commit_time_us_ = + oldest_unapplied_tranx_commit_time_us; + + if (rpl_semi_sync_master_slave_lag_wait_sessions > 0) + { + mysql_cond_broadcast(&COND_slave_lag_); + } + mysql_mutex_unlock(&LOCK_slave_lag_); +} + +int ReplSemiSyncMaster::wait_slave_lag(ulong timeout_sec) +{ + int error = 0; + PSI_stage_info old_stage; + + /* slave lag waiting not enabled, return directly */ + if (rpl_semi_sync_master_max_slave_lag == 0) + return 0; + + /* there is no slave that can report slave lag, return directly */ + if (rpl_semi_sync_master_slave_lag_clients == 0) + return 0; + + /* compute start_time and end_time */ + struct timespec end_time; + set_timespec(end_time, 0); + ulonglong start_time_us = timespec_to_usec(&end_time); + end_time.tv_sec += timeout_sec; + + mysql_mutex_lock(&LOCK_slave_lag_); + + if (oldest_unapplied_tranx_commit_time_us_ == 0) + { + /* no slave lag, atleast one slave is up to date */ + mysql_mutex_unlock(&LOCK_slave_lag_); + return 0; + } + + if (rpl_semi_sync_master_max_slave_lag == 0) + { + /* slave lag waiting not enabled */ + mysql_mutex_unlock(&LOCK_slave_lag_); + return 0; + } + + /* This must be called after acquired the lock */ + THD_ENTER_COND(NULL, &COND_slave_lag_, &LOCK_slave_lag_, + &stage_waiting_for_semi_sync_slave_lag, + &old_stage); + + bool waited = false; + ulonglong lag = 0; + ulonglong max_lag = 0; + while (oldest_unapplied_tranx_commit_time_us_ != 0) + { + /* check kill_level after THD_ENTER_COND but *before* cond_wait + * to avoid missing kills */ + if (! (getMasterEnabled() && is_on() && + thd_kill_level(current_thd) == THD_IS_NOT_KILLED)) + break; + + lag = start_time_us - oldest_unapplied_tranx_commit_time_us_; + max_lag = 1000000 * rpl_semi_sync_master_max_slave_lag; + if (lag <= max_lag) + break; + + waited = true; + rpl_semi_sync_master_slave_lag_wait_sessions++; + int wait_result = mysql_cond_timedwait(&COND_slave_lag_, &LOCK_slave_lag_, + &end_time); + rpl_semi_sync_master_slave_lag_wait_sessions--; + + bool thd_was_killed = thd_kill_level(current_thd) != THD_IS_NOT_KILLED; + if (wait_result != 0 || thd_was_killed) + { + break; + } + } + + if (thd_kill_level(current_thd) != THD_IS_NOT_KILLED) + { + /* Return error to client. */ + error = 1; + my_printf_error(ER_ERROR_DURING_COMMIT, + "Killed while waiting for replication semi-sync slave-lag.", + MYF(0)); + } + else if (lag > max_lag) + { + /* Return error to client. */ + error = 1; + my_printf_error(ER_ERROR_DURING_COMMIT, + "Slave-lag timeout", + MYF(0)); + } + + if (waited) + { + rpl_semi_sync_master_trx_slave_lag_wait_num++; + rpl_semi_sync_master_trx_slave_lag_wait_time += + (my_hrtime().val - start_time_us); + } + + /* The lock held will be released by thd_exit_cond, so no need to + call unlock() here */ + THD_EXIT_COND(NULL, & old_stage); + + return error; +} diff --git a/plugin/semisync/semisync_master.h b/plugin/semisync/semisync_master.h index c2862476ec8..0167e26bcc8 100644 --- a/plugin/semisync/semisync_master.h +++ b/plugin/semisync/semisync_master.h @@ -24,17 +24,101 @@ #ifdef HAVE_PSI_INTERFACE extern PSI_mutex_key key_ss_mutex_LOCK_binlog_; extern PSI_cond_key key_ss_cond_COND_binlog_send_; + +extern PSI_mutex_key key_ss_mutex_LOCK_slave_lag_; +extern PSI_cond_key key_ss_cond_COND_slave_lag_; #endif extern PSI_stage_info stage_waiting_for_semi_sync_ack_from_slave; +extern PSI_stage_info stage_waiting_for_semi_sync_slave_lag; struct TranxNode { char log_name_[FN_REFLEN]; - my_off_t log_pos_; + my_off_t log_pos_; + ulonglong tranx_commit_time_us; struct TranxNode *next_; /* the next node in the sorted list */ struct TranxNode *hash_next_; /* the next node during hash collision */ }; +struct LogPos; + +/* This represent a log position */ +struct LogPosPtr { + LogPosPtr() { Uninit();} + LogPosPtr(const char *name, my_off_t pos) : file_name(name), file_pos(pos){} + explicit LogPosPtr(const LogPos& pos) { Assign(&pos); } + + const char *file_name; + my_off_t file_pos; + + LogPosPtr& Assign(const LogPosPtr *src) { + file_name = src->file_name; + file_pos = src->file_pos; + return *this; + } + + LogPosPtr& Assign(const LogPos *src); + + void Uninit() { file_name = NULL;} + bool IsInited() const { return file_name != NULL; } +}; + +struct LogPos { + char file_name[FN_REFLEN]; + my_off_t file_pos; + + LogPos() { Uninit(); } + + LogPosPtr ToLogPosPtr() const { + if (IsInited()){ + LogPosPtr p(file_name, file_pos); + return p; + } else { + LogPosPtr p; + return p; + } + } + + LogPos& Assign(const LogPosPtr *src) { + if (src->IsInited()) { + strcpy(file_name, src->file_name); + file_pos = src->file_pos; + } else { + Uninit(); + } + return *this; + } + + LogPos& Assign(const LogPos* src) { + LogPosPtr p = src->ToLogPosPtr(); + Assign(&p); + return *this; + } + + void Uninit() { file_name[0] = 0; } + bool IsInited() const { return file_name[0] != 0; } +}; + +inline LogPosPtr& LogPosPtr::Assign(const LogPos* src) { + LogPosPtr p = src->ToLogPosPtr(); + Assign(&p); + return *this; +} + +inline int CompareLogPos(const LogPosPtr *pos1, const LogPosPtr *pos2) { + int cmp = strcmp(pos1->file_name, pos2->file_name); + + if (cmp != 0) + return cmp; + + if (pos1->file_pos > pos2->file_pos) + return 1; + else if (pos1->file_pos < pos2->file_pos) + return -1; + else + return 0; +} + /** @class TranxNodeAllocator @@ -329,10 +413,14 @@ private: node2->log_name_, node2->log_pos_); } + void set_new_front(TranxNode* new_front); + public: ActiveTranx(mysql_mutex_t *lock, unsigned long trace_level); ~ActiveTranx(); + bool is_empty() const { return trx_front_ == NULL; } + /* Insert an active transaction node with the specified position. * * Return: @@ -340,21 +428,42 @@ public: */ int insert_tranx_node(const char *log_file_name, my_off_t log_file_pos); - /* Clear the active transaction nodes until(inclusive) the specified - * position. - * If log_file_name is NULL, everything will be cleared: the sorted + /* Clear the active transaction + * Everything will be cleared: the sorted * list and the hash table will be reset to empty. - * + * * Return: - * 0: success; non-zero: error + * 0 success; non-zero: error + */ + int clear_active_tranx_nodes(); + + /* Prune the active transaction nodes until the specified + * position (inclusive). + * + * Return: + * true if any transaction was removed + * false if list was left unchanged */ - int clear_active_tranx_nodes(const char *log_file_name, - my_off_t log_file_pos); + bool prune_active_tranx_nodes(LogPosPtr logpos, + ulonglong *oldest_tranx_commit_time_us); - /* Given a position, check to see whether the position is an active - * transaction's ending position by probing the hash table. + /* Lookup a transaction's ending position by probing the hash table. + * + * return entry if found or NULL otherwise */ - bool is_tranx_end_pos(const char *log_file_name, my_off_t log_file_pos); + TranxNode* lookup_tranx_end_pos(const char *log_file_name, + my_off_t log_file_pos); + + + /* Check if an entry is rear (i.e last) */ + bool is_rear(TranxNode* entry) const { return entry == trx_rear_; } + + /** + * return timestamp of oldest transaction in list + */ + ulonglong get_oldest_tranx_commit_time_us() const { + return trx_front_->tranx_commit_time_us; + } /* Given two binlog positions, compare which one is bigger based on * (file_name, file_position). @@ -365,6 +474,24 @@ public: }; /** + * State that semisync master keeps per slave + */ +struct ReplSemiSyncMasterPerSlaveState +{ + ReplSemiSyncMasterPerSlaveState() : unacked_event_count_(0) {} + + /** + * No of events that has not been semi-sync acked + */ + unsigned unacked_event_count_; + + /** + * Position of last event that was semisync'ed + */ + LogPos sync_req_pos_; +}; + +/** The extension class for the master of semi-synchronous replication */ class ReplSemiSyncMaster @@ -432,6 +559,16 @@ class ReplSemiSyncMaster bool state_; /* whether semi-sync is switched */ + /* This cond variable is signaled when slave lag has decreased */ + mysql_cond_t COND_slave_lag_; + + /* Mutex that protects oldest_unapplied_tranx_commit_time_us */ + mysql_mutex_t LOCK_slave_lag_; + + /* this is commit time of oldest transaction that has not been applied + * on any slave */ + ulonglong oldest_unapplied_tranx_commit_time_us_; + void lock(); void unlock(); void cond_broadcast(); @@ -493,6 +630,9 @@ class ReplSemiSyncMaster /* Is the slave servered by the thread requested semi-sync */ bool is_semi_sync_slave(); + /* Does this slave have slave lag reporting capabilities */ + bool has_semi_sync_slave_lag(); + /* In semi-sync replication, reports up to which binlog position we have * received replies from the slave indicating that it already get the events. * @@ -501,13 +641,15 @@ class ReplSemiSyncMaster * log_file_name - (IN) binlog file name * end_offset - (IN) the offset in the binlog file up to which we have * the replies from the slave + * exec_position - (IN) position of SQL thread or NULL if not present * * Return: * 0: success; non-zero: error */ int reportReplyBinlog(uint32 server_id, const char* log_file_name, - my_off_t end_offset); + my_off_t end_offset, + const LogPos *exec_position); /* Commit a transaction in the final step. This function is called from * InnoDB before returning from the low commit. If semi-sync is switch on, @@ -540,6 +682,16 @@ class ReplSemiSyncMaster */ int reserveSyncHeader(unsigned char *header, unsigned long size); + /* + * check if an event should be semi synced and optionally + * if it should report back position of SQL thread on slave + * + * return 0 - no semi sync + * 1 - semi sync + * 2 - semi sync and report exec position + */ + int checkSyncReq(const LogPosPtr *log_pos); + /* Update the sync bit in the packet header to indicate to the slave whether * the master will wait for the reply of the event. If semi-sync is switched * off and we detect that the slave is catching up, we switch semi-sync on. @@ -592,6 +744,21 @@ class ReplSemiSyncMaster * go off for that. */ int resetMaster(); + + /** + * wake potential slave-lag waiters + * called by binlog dump-thread(s) + */ + void wake_slave_lag_waiters(ulonglong oldest_unapplied_tranx_commit_time_us); + + /** + * wait for slave lag to get below threshold + * called by user-thread(s) + * + * return 0 - success + * 1 - timeout + */ + int wait_slave_lag(ulong max_wait_time_sec); }; enum rpl_semi_sync_master_wait_point_t { @@ -621,6 +788,17 @@ extern unsigned long long rpl_semi_sync_master_trx_wait_num; extern unsigned long long rpl_semi_sync_master_net_wait_time; extern unsigned long long rpl_semi_sync_master_trx_wait_time; +extern unsigned long rpl_semi_sync_master_max_unacked_event_count; +extern unsigned long rpl_semi_sync_master_max_unacked_event_bytes; +extern unsigned long rpl_semi_sync_master_max_slave_lag; +extern unsigned long rpl_semi_sync_master_slave_lag_heartbeat_frequency_us; +extern unsigned long rpl_semi_sync_master_slave_lag_wait_sessions; +extern unsigned long long rpl_semi_sync_master_estimated_slave_lag; + +extern unsigned long rpl_semi_sync_master_avg_trx_slave_lag_wait_time; +extern unsigned long long rpl_semi_sync_master_trx_slave_lag_wait_num; +extern unsigned long long rpl_semi_sync_master_trx_slave_lag_wait_time; + /* This indicates whether we should keep waiting if no semi-sync slave is available. diff --git a/plugin/semisync/semisync_master_plugin.cc b/plugin/semisync/semisync_master_plugin.cc index 309910312c4..58bddf019bd 100644 --- a/plugin/semisync/semisync_master_plugin.cc +++ b/plugin/semisync/semisync_master_plugin.cc @@ -21,6 +21,11 @@ static ReplSemiSyncMaster repl_semisync; +// forward declaration +static inline ulong get_slave_lag_wait_timeout(THD* thd); + +static char rpl_semi_sync_master_group_commit = 0; + C_MODE_START int repl_semi_report_binlog_update(Binlog_storage_param *param, @@ -31,6 +36,13 @@ int repl_semi_report_binlog_update(Binlog_storage_param *param, if (repl_semisync.getMasterEnabled()) { + if (rpl_semi_sync_master_group_commit && + ((flags & BINLOG_GROUP_COMMIT_TRAILER) == 0)) + { + /** there are transactions more coming... */ + return 0; + } + /* Let us store the binlog file name and the position, so that we know how long to wait for the binlog to the replicated to @@ -43,8 +55,11 @@ int repl_semi_report_binlog_update(Binlog_storage_param *param, return error; } -int repl_semi_request_commit(Trans_param *param) +int repl_semi_before_commit(Trans_param *param, int *error) { + *error = repl_semisync.wait_slave_lag( + get_slave_lag_wait_timeout(current_thd)); + return 0; } @@ -53,6 +68,14 @@ int repl_semi_report_binlog_sync(Binlog_storage_param *param, my_off_t log_pos, uint32 flags) { int error= 0; + + if (rpl_semi_sync_master_group_commit && + ((flags & BINLOG_GROUP_COMMIT_TRAILER) == 0)) + { + /** there are transactions more coming... */ + return 0; + } + if (rpl_semi_sync_master_wait_point == SEMI_SYNC_MASTER_WAIT_POINT_AFTER_BINLOG_SYNC) { @@ -100,7 +123,7 @@ int repl_semi_binlog_dump_start(Binlog_transmit_param *param, Let's assume this semi-sync slave has already received all binlog events before the filename and position it requests. */ - repl_semisync.reportReplyBinlog(param->server_id, log_file, log_pos); + repl_semisync.reportReplyBinlog(param->server_id, log_file, log_pos, NULL); } sql_print_information("Start %s binlog_dump to slave (server_id: %d), pos(%s, %lu)", semi_sync_slave ? "semi-sync" : "asynchronous", @@ -242,15 +265,72 @@ static MYSQL_SYSVAR_ULONG(trace_level, rpl_semi_sync_master_trace_level, &fix_rpl_semi_sync_master_trace_level, // update 32, 0, ~0UL, 1); +static MYSQL_SYSVAR_ULONG(max_unacked_event_count, + rpl_semi_sync_master_max_unacked_event_count, + PLUGIN_VAR_OPCMDARG, + "Maximum unacked replication events", + NULL, // check + NULL, // update + rpl_semi_sync_master_max_unacked_event_count, 0, ~0UL, 1); + +static MYSQL_SYSVAR_ULONG(max_unacked_event_bytes, + rpl_semi_sync_master_max_unacked_event_bytes, + PLUGIN_VAR_OPCMDARG, + "Maximum unacked replication bytes", + NULL, // check + NULL, // update + rpl_semi_sync_master_max_unacked_event_bytes, 0, ~0UL, 1); + +static MYSQL_SYSVAR_ULONG(max_slave_lag, rpl_semi_sync_master_max_slave_lag, + PLUGIN_VAR_OPCMDARG, + "Maximum allowed lag of fastest semi-sync slave (in seconds), " + "checked before commit.", + NULL, // check + NULL, // update + rpl_semi_sync_master_max_slave_lag, 0, ~0UL, 1); + +static MYSQL_THDVAR_ULONG(slave_lag_wait_timeout, + PLUGIN_VAR_RQCMDARG, + "Timeout in seconds a rw-transaction may wait for max slave lag before " + "being rolled back.", + NULL, NULL, 50, 1, 1024 * 1024 * 1024, 0); + +static MYSQL_SYSVAR_ULONG( + slave_lag_heartbeat_frequency_us, + rpl_semi_sync_master_slave_lag_heartbeat_frequency_us, + PLUGIN_VAR_RQCMDARG, + "Heartbeat frequency when slave-lag is enabled (in microseconds).", + NULL, // check + NULL, // update + 500000, /* 500 ms */ + 1, ~0UL, 1); + +static MYSQL_SYSVAR_BOOL(group_commit, rpl_semi_sync_master_group_commit, + PLUGIN_VAR_OPCMDARG, + "Group commit for semi sync", + NULL, // check + NULL, + 0); + static SYS_VAR* semi_sync_master_system_vars[]= { MYSQL_SYSVAR(enabled), MYSQL_SYSVAR(wait_point), MYSQL_SYSVAR(timeout), MYSQL_SYSVAR(wait_no_slave), MYSQL_SYSVAR(trace_level), + MYSQL_SYSVAR(max_unacked_event_count), + MYSQL_SYSVAR(max_unacked_event_bytes), + MYSQL_SYSVAR(max_slave_lag), + MYSQL_SYSVAR(slave_lag_wait_timeout), + MYSQL_SYSVAR(slave_lag_heartbeat_frequency_us), + MYSQL_SYSVAR(group_commit), NULL, }; +static inline ulong get_slave_lag_wait_timeout(THD* thd) +{ + return THDVAR(thd, slave_lag_wait_timeout); +} static void fix_rpl_semi_sync_master_timeout(MYSQL_THD thd, SYS_VAR *var, @@ -297,6 +377,7 @@ Trans_observer trans_observer = { repl_semi_report_commit, // after_commit repl_semi_report_rollback, // after_rollback + repl_semi_before_commit, // before commit }; Binlog_storage_observer storage_observer = { @@ -339,7 +420,11 @@ DEF_SHOW_FUNC(net_wait_time, SHOW_LONGLONG) DEF_SHOW_FUNC(net_wait_num, SHOW_LONGLONG) DEF_SHOW_FUNC(avg_net_wait_time, SHOW_LONG) DEF_SHOW_FUNC(avg_trx_wait_time, SHOW_LONG) - +DEF_SHOW_FUNC(slave_lag_wait_sessions, SHOW_LONG) +DEF_SHOW_FUNC(estimated_slave_lag, SHOW_LONGLONG) +DEF_SHOW_FUNC(trx_slave_lag_wait_time, SHOW_LONGLONG) +DEF_SHOW_FUNC(trx_slave_lag_wait_num, SHOW_LONGLONG) +DEF_SHOW_FUNC(avg_trx_slave_lag_wait_time, SHOW_LONG) /* plugin status variables */ static SHOW_VAR semi_sync_master_status_vars[]= { @@ -385,32 +470,55 @@ static SHOW_VAR semi_sync_master_status_vars[]= { {"Rpl_semi_sync_master_net_avg_wait_time", (char*) &SHOW_FNAME(avg_net_wait_time), SHOW_SIMPLE_FUNC}, + {"Rpl_semi_sync_master_slave_lag_wait_sessions", + (char*) &SHOW_FNAME(slave_lag_wait_sessions), + SHOW_SIMPLE_FUNC}, + {"Rpl_semi_sync_master_estimated_slave_lag", + (char*) &SHOW_FNAME(estimated_slave_lag), + SHOW_SIMPLE_FUNC}, + {"Rpl_semi_sync_master_tx_slave_lag_wait_time", + (char*) &SHOW_FNAME(trx_slave_lag_wait_time), + SHOW_SIMPLE_FUNC}, + {"Rpl_semi_sync_master_tx_slave_lag_waits", + (char*) &SHOW_FNAME(trx_slave_lag_wait_num), + SHOW_SIMPLE_FUNC}, + {"Rpl_semi_sync_master_tx_avg_slave_lag_wait_time", + (char*) &SHOW_FNAME(avg_trx_slave_lag_wait_time), + SHOW_SIMPLE_FUNC}, {NULL, NULL, SHOW_LONG}, }; #ifdef HAVE_PSI_INTERFACE PSI_mutex_key key_ss_mutex_LOCK_binlog_; +PSI_mutex_key key_ss_mutex_LOCK_slave_lag_; static PSI_mutex_info all_semisync_mutexes[]= { - { &key_ss_mutex_LOCK_binlog_, "LOCK_binlog_", 0} + { &key_ss_mutex_LOCK_binlog_, "LOCK_binlog_", 0 }, + { &key_ss_mutex_LOCK_slave_lag_, "LOCK_slave_lag_", 0 } }; PSI_cond_key key_ss_cond_COND_binlog_send_; +PSI_cond_key key_ss_cond_COND_slave_lag_; static PSI_cond_info all_semisync_conds[]= { - { &key_ss_cond_COND_binlog_send_, "COND_binlog_send_", 0} + { &key_ss_cond_COND_binlog_send_, "COND_binlog_send_", 0 }, + { &key_ss_cond_COND_slave_lag_, "COND_slave_lag_", 0 } }; #endif /* HAVE_PSI_INTERFACE */ PSI_stage_info stage_waiting_for_semi_sync_ack_from_slave= { 0, "Waiting for semi-sync ACK from slave", 0}; +PSI_stage_info stage_waiting_for_semi_sync_slave_lag= +{ 0, "Waiting for semi-sync slave lag", 0}; + #ifdef HAVE_PSI_INTERFACE PSI_stage_info *all_semisync_stages[]= { - & stage_waiting_for_semi_sync_ack_from_slave + & stage_waiting_for_semi_sync_ack_from_slave, + & stage_waiting_for_semi_sync_slave_lag }; static void init_semisync_psi_keys(void) @@ -492,4 +600,3 @@ maria_declare_plugin(semisync_master) MariaDB_PLUGIN_MATURITY_STABLE } maria_declare_plugin_end; - diff --git a/plugin/semisync/semisync_slave.cc b/plugin/semisync/semisync_slave.cc index 5f98472d5d7..839e0cce29d 100644 --- a/plugin/semisync/semisync_slave.cc +++ b/plugin/semisync/semisync_slave.cc @@ -20,6 +20,7 @@ char rpl_semi_sync_slave_enabled; char rpl_semi_sync_slave_status= 0; unsigned long rpl_semi_sync_slave_trace_level; +char rpl_semi_sync_slave_lag_enabled= 0; int ReplSemiSyncSlave::initObject() { @@ -42,7 +43,7 @@ int ReplSemiSyncSlave::initObject() int ReplSemiSyncSlave::slaveReadSyncHeader(const char *header, unsigned long total_len, - bool *need_reply, + unsigned char *need_reply, const char **payload, unsigned long *payload_len) { @@ -52,7 +53,7 @@ int ReplSemiSyncSlave::slaveReadSyncHeader(const char *header, if ((unsigned char)(header[0]) == kPacketMagicNum) { - *need_reply = (header[1] & kPacketFlagSync); + *need_reply = (header[1] & (kPacketFlagSync | kPacketFlagSyncAndReport)); *payload_len = total_len - 2; *payload = header + 2; @@ -95,16 +96,20 @@ int ReplSemiSyncSlave::slaveStop(Binlog_relay_IO_param *param) return 0; } -int ReplSemiSyncSlave::slaveReply(MYSQL *mysql, - const char *binlog_filename, - my_off_t binlog_filepos) +int ReplSemiSyncSlave::slaveReply(unsigned char header_byte, + MYSQL *mysql, + const char *binlog_filename, + my_off_t binlog_filepos, + Master_info * mi) { const char *kWho = "ReplSemiSyncSlave::slaveReply"; NET *net= &mysql->net; - uchar reply_buffer[REPLY_MAGIC_NUM_LEN - + REPLY_BINLOG_POS_LEN - + REPLY_BINLOG_NAME_LEN]; + uchar reply_buffer[REPLY_MAGIC_NUM_LEN + + 2 * ( REPLY_BINLOG_POS_LEN + + REPLY_BINLOG_NAME_LEN + + /* '\0' */ 1) ]; int reply_res, name_len = strlen(binlog_filename); + int msg_len = name_len + REPLY_BINLOG_NAME_OFFSET; function_enter(kWho); @@ -119,10 +124,29 @@ int ReplSemiSyncSlave::slaveReply(MYSQL *mysql, sql_print_information("%s: reply (%s, %lu)", kWho, binlog_filename, (ulong)binlog_filepos); + if (header_byte & kPacketFlagSyncAndReport) + { + /** + * master requests that we also report back SQL-thread position + */ + + // where to store sql filename/position + char *bufptr = (char*)reply_buffer + msg_len; + bufptr[0] = 0; // '\0' terminate previous filename + bufptr++; + + my_off_t sql_file_pos; + // get file/position and store the filename directly info bufptr+8 + size_t name_len2 = get_master_log_pos(mi, bufptr + 8, &sql_file_pos); + int8store(bufptr, sql_file_pos); // store position + + msg_len += /* '\0' */ 1 + /* position */ 8 + name_len2 + /* '\0' */ 1; + } + net_clear(net, 0); /* Send the reply. */ - reply_res = my_net_write(net, reply_buffer, - name_len + REPLY_BINLOG_NAME_OFFSET); + reply_res = my_net_write(net, reply_buffer, msg_len); + if (!reply_res) { reply_res = net_flush(net); diff --git a/plugin/semisync/semisync_slave.h b/plugin/semisync/semisync_slave.h index 1bf8cf31972..c91847d828d 100644 --- a/plugin/semisync/semisync_slave.h +++ b/plugin/semisync/semisync_slave.h @@ -60,23 +60,30 @@ public: * Return: * 0: success; non-zero: error */ - int slaveReadSyncHeader(const char *header, unsigned long total_len, bool *need_reply, + int slaveReadSyncHeader(const char *header, unsigned long total_len, + unsigned char *need_reply_byte, const char **payload, unsigned long *payload_len); /* A slave replies to the master indicating its replication process. It * indicates that the slave has received all events before the specified * binlog position. - * + * * Input: + * need_reply_byte - (IN) the header byte * mysql - (IN) the mysql network connection * binlog_filename - (IN) the reply point's binlog file name * binlog_filepos - (IN) the reply point's binlog file offset + * master_info - (IN) the master info struct so that we can get more + * info if needed * * Return: * 0: success; non-zero: error */ - int slaveReply(MYSQL *mysql, const char *binlog_filename, - my_off_t binlog_filepos); + int slaveReply(unsigned char need_reply_byte, + MYSQL *mysql, + const char *binlog_filename, + my_off_t binlog_filepos, + Master_info* master_info); int slaveStart(Binlog_relay_IO_param *param); int slaveStop(Binlog_relay_IO_param *param); @@ -93,5 +100,6 @@ private: extern char rpl_semi_sync_slave_enabled; extern unsigned long rpl_semi_sync_slave_trace_level; extern char rpl_semi_sync_slave_status; +extern char rpl_semi_sync_slave_lag_enabled; #endif /* SEMISYNC_SLAVE_H */ diff --git a/plugin/semisync/semisync_slave_plugin.cc b/plugin/semisync/semisync_slave_plugin.cc index 3a6c7625d93..6c85a07ae2a 100644 --- a/plugin/semisync/semisync_slave_plugin.cc +++ b/plugin/semisync/semisync_slave_plugin.cc @@ -28,7 +28,7 @@ static ReplSemiSyncSlave repl_semisync; event read is the last event of a transaction. And the value is checked in repl_semi_slave_queue_event. */ -bool semi_sync_need_reply= false; +unsigned char semi_sync_need_reply= 0; C_MODE_START @@ -81,6 +81,23 @@ int repl_semi_slave_request_dump(Binlog_relay_IO_param *param, return 1; } mysql_free_result(mysql_store_result(mysql)); + + if (rpl_semi_sync_slave_lag_enabled) + { + char buf[100]; + /* + Tell master that we can do exec-position reporting + */ + snprintf(buf, sizeof(buf), "SET @%s= 1", + ReplSemiSyncBase::kRplSemiSyncSlaveReportExec); + if (mysql_real_query(mysql, buf, strlen(buf))) + { + sql_print_error("query: %s on master failed", buf); + return 1; + } + mysql_free_result(mysql_store_result(mysql)); + } + rpl_semi_sync_slave_status= 1; return 0; } @@ -110,9 +127,11 @@ int repl_semi_slave_queue_event(Binlog_relay_IO_param *param, should not cause the slave IO thread to stop, and the error messages are already reported. */ - (void) repl_semisync.slaveReply(param->mysql, + (void) repl_semisync.slaveReply(semi_sync_need_reply, + param->mysql, param->master_log_name, - param->master_log_pos); + param->master_log_pos, + param->mi); } return 0; } @@ -164,9 +183,17 @@ static MYSQL_SYSVAR_ULONG(trace_level, rpl_semi_sync_slave_trace_level, &fix_rpl_semi_sync_trace_level, // update 32, 0, ~0UL, 1); +static MYSQL_SYSVAR_BOOL(lag_enabled, rpl_semi_sync_slave_lag_enabled, + PLUGIN_VAR_OPCMDARG, + "Enable semi-synchronous replication slave lag reporting. ", + NULL, // check + NULL, // update + 0); + static SYS_VAR* semi_sync_slave_system_vars[]= { MYSQL_SYSVAR(enabled), MYSQL_SYSVAR(trace_level), + MYSQL_SYSVAR(lag_enabled), NULL, }; @@ -230,4 +257,3 @@ maria_declare_plugin(semisync_slave) MariaDB_PLUGIN_MATURITY_STABLE } maria_declare_plugin_end; - |