diff options
Diffstat (limited to 'plugin/semisync/semisync_master.h')
-rw-r--r-- | plugin/semisync/semisync_master.h | 202 |
1 files changed, 190 insertions, 12 deletions
diff --git a/plugin/semisync/semisync_master.h b/plugin/semisync/semisync_master.h index c2862476ec8..0167e26bcc8 100644 --- a/plugin/semisync/semisync_master.h +++ b/plugin/semisync/semisync_master.h @@ -24,17 +24,101 @@ #ifdef HAVE_PSI_INTERFACE extern PSI_mutex_key key_ss_mutex_LOCK_binlog_; extern PSI_cond_key key_ss_cond_COND_binlog_send_; + +extern PSI_mutex_key key_ss_mutex_LOCK_slave_lag_; +extern PSI_cond_key key_ss_cond_COND_slave_lag_; #endif extern PSI_stage_info stage_waiting_for_semi_sync_ack_from_slave; +extern PSI_stage_info stage_waiting_for_semi_sync_slave_lag; struct TranxNode { char log_name_[FN_REFLEN]; - my_off_t log_pos_; + my_off_t log_pos_; + ulonglong tranx_commit_time_us; struct TranxNode *next_; /* the next node in the sorted list */ struct TranxNode *hash_next_; /* the next node during hash collision */ }; +struct LogPos; + +/* This represent a log position */ +struct LogPosPtr { + LogPosPtr() { Uninit();} + LogPosPtr(const char *name, my_off_t pos) : file_name(name), file_pos(pos){} + explicit LogPosPtr(const LogPos& pos) { Assign(&pos); } + + const char *file_name; + my_off_t file_pos; + + LogPosPtr& Assign(const LogPosPtr *src) { + file_name = src->file_name; + file_pos = src->file_pos; + return *this; + } + + LogPosPtr& Assign(const LogPos *src); + + void Uninit() { file_name = NULL;} + bool IsInited() const { return file_name != NULL; } +}; + +struct LogPos { + char file_name[FN_REFLEN]; + my_off_t file_pos; + + LogPos() { Uninit(); } + + LogPosPtr ToLogPosPtr() const { + if (IsInited()){ + LogPosPtr p(file_name, file_pos); + return p; + } else { + LogPosPtr p; + return p; + } + } + + LogPos& Assign(const LogPosPtr *src) { + if (src->IsInited()) { + strcpy(file_name, src->file_name); + file_pos = src->file_pos; + } else { + Uninit(); + } + return *this; + } + + LogPos& Assign(const LogPos* src) { + LogPosPtr p = src->ToLogPosPtr(); + Assign(&p); + return *this; + } + + void Uninit() { file_name[0] = 0; } + bool IsInited() const { return file_name[0] != 0; } +}; + +inline LogPosPtr& LogPosPtr::Assign(const LogPos* src) { + LogPosPtr p = src->ToLogPosPtr(); + Assign(&p); + return *this; +} + +inline int CompareLogPos(const LogPosPtr *pos1, const LogPosPtr *pos2) { + int cmp = strcmp(pos1->file_name, pos2->file_name); + + if (cmp != 0) + return cmp; + + if (pos1->file_pos > pos2->file_pos) + return 1; + else if (pos1->file_pos < pos2->file_pos) + return -1; + else + return 0; +} + /** @class TranxNodeAllocator @@ -329,10 +413,14 @@ private: node2->log_name_, node2->log_pos_); } + void set_new_front(TranxNode* new_front); + public: ActiveTranx(mysql_mutex_t *lock, unsigned long trace_level); ~ActiveTranx(); + bool is_empty() const { return trx_front_ == NULL; } + /* Insert an active transaction node with the specified position. * * Return: @@ -340,21 +428,42 @@ public: */ int insert_tranx_node(const char *log_file_name, my_off_t log_file_pos); - /* Clear the active transaction nodes until(inclusive) the specified - * position. - * If log_file_name is NULL, everything will be cleared: the sorted + /* Clear the active transaction + * Everything will be cleared: the sorted * list and the hash table will be reset to empty. - * + * * Return: - * 0: success; non-zero: error + * 0 success; non-zero: error + */ + int clear_active_tranx_nodes(); + + /* Prune the active transaction nodes until the specified + * position (inclusive). + * + * Return: + * true if any transaction was removed + * false if list was left unchanged */ - int clear_active_tranx_nodes(const char *log_file_name, - my_off_t log_file_pos); + bool prune_active_tranx_nodes(LogPosPtr logpos, + ulonglong *oldest_tranx_commit_time_us); - /* Given a position, check to see whether the position is an active - * transaction's ending position by probing the hash table. + /* Lookup a transaction's ending position by probing the hash table. + * + * return entry if found or NULL otherwise */ - bool is_tranx_end_pos(const char *log_file_name, my_off_t log_file_pos); + TranxNode* lookup_tranx_end_pos(const char *log_file_name, + my_off_t log_file_pos); + + + /* Check if an entry is rear (i.e last) */ + bool is_rear(TranxNode* entry) const { return entry == trx_rear_; } + + /** + * return timestamp of oldest transaction in list + */ + ulonglong get_oldest_tranx_commit_time_us() const { + return trx_front_->tranx_commit_time_us; + } /* Given two binlog positions, compare which one is bigger based on * (file_name, file_position). @@ -365,6 +474,24 @@ public: }; /** + * State that semisync master keeps per slave + */ +struct ReplSemiSyncMasterPerSlaveState +{ + ReplSemiSyncMasterPerSlaveState() : unacked_event_count_(0) {} + + /** + * No of events that has not been semi-sync acked + */ + unsigned unacked_event_count_; + + /** + * Position of last event that was semisync'ed + */ + LogPos sync_req_pos_; +}; + +/** The extension class for the master of semi-synchronous replication */ class ReplSemiSyncMaster @@ -432,6 +559,16 @@ class ReplSemiSyncMaster bool state_; /* whether semi-sync is switched */ + /* This cond variable is signaled when slave lag has decreased */ + mysql_cond_t COND_slave_lag_; + + /* Mutex that protects oldest_unapplied_tranx_commit_time_us */ + mysql_mutex_t LOCK_slave_lag_; + + /* this is commit time of oldest transaction that has not been applied + * on any slave */ + ulonglong oldest_unapplied_tranx_commit_time_us_; + void lock(); void unlock(); void cond_broadcast(); @@ -493,6 +630,9 @@ class ReplSemiSyncMaster /* Is the slave servered by the thread requested semi-sync */ bool is_semi_sync_slave(); + /* Does this slave have slave lag reporting capabilities */ + bool has_semi_sync_slave_lag(); + /* In semi-sync replication, reports up to which binlog position we have * received replies from the slave indicating that it already get the events. * @@ -501,13 +641,15 @@ class ReplSemiSyncMaster * log_file_name - (IN) binlog file name * end_offset - (IN) the offset in the binlog file up to which we have * the replies from the slave + * exec_position - (IN) position of SQL thread or NULL if not present * * Return: * 0: success; non-zero: error */ int reportReplyBinlog(uint32 server_id, const char* log_file_name, - my_off_t end_offset); + my_off_t end_offset, + const LogPos *exec_position); /* Commit a transaction in the final step. This function is called from * InnoDB before returning from the low commit. If semi-sync is switch on, @@ -540,6 +682,16 @@ class ReplSemiSyncMaster */ int reserveSyncHeader(unsigned char *header, unsigned long size); + /* + * check if an event should be semi synced and optionally + * if it should report back position of SQL thread on slave + * + * return 0 - no semi sync + * 1 - semi sync + * 2 - semi sync and report exec position + */ + int checkSyncReq(const LogPosPtr *log_pos); + /* Update the sync bit in the packet header to indicate to the slave whether * the master will wait for the reply of the event. If semi-sync is switched * off and we detect that the slave is catching up, we switch semi-sync on. @@ -592,6 +744,21 @@ class ReplSemiSyncMaster * go off for that. */ int resetMaster(); + + /** + * wake potential slave-lag waiters + * called by binlog dump-thread(s) + */ + void wake_slave_lag_waiters(ulonglong oldest_unapplied_tranx_commit_time_us); + + /** + * wait for slave lag to get below threshold + * called by user-thread(s) + * + * return 0 - success + * 1 - timeout + */ + int wait_slave_lag(ulong max_wait_time_sec); }; enum rpl_semi_sync_master_wait_point_t { @@ -621,6 +788,17 @@ extern unsigned long long rpl_semi_sync_master_trx_wait_num; extern unsigned long long rpl_semi_sync_master_net_wait_time; extern unsigned long long rpl_semi_sync_master_trx_wait_time; +extern unsigned long rpl_semi_sync_master_max_unacked_event_count; +extern unsigned long rpl_semi_sync_master_max_unacked_event_bytes; +extern unsigned long rpl_semi_sync_master_max_slave_lag; +extern unsigned long rpl_semi_sync_master_slave_lag_heartbeat_frequency_us; +extern unsigned long rpl_semi_sync_master_slave_lag_wait_sessions; +extern unsigned long long rpl_semi_sync_master_estimated_slave_lag; + +extern unsigned long rpl_semi_sync_master_avg_trx_slave_lag_wait_time; +extern unsigned long long rpl_semi_sync_master_trx_slave_lag_wait_num; +extern unsigned long long rpl_semi_sync_master_trx_slave_lag_wait_time; + /* This indicates whether we should keep waiting if no semi-sync slave is available. |