summary refs log tree commit diff
path: root/plugin/semisync/semisync_master.h
diff options
context:
space:
mode:
Diffstat (limited to 'plugin/semisync/semisync_master.h')
-rw-r--r-- plugin/semisync/semisync_master.h 202
1 files changed, 190 insertions, 12 deletions
diff --git a/plugin/semisync/semisync_master.h b/plugin/semisync/semisync_master.h
index c2862476ec8..0167e26bcc8 100644
--- a/plugin/semisync/semisync_master.h
+++ b/plugin/semisync/semisync_master.h
@@ -24,17 +24,101 @@
#ifdef HAVE_PSI_INTERFACE
extern PSI_mutex_key key_ss_mutex_LOCK_binlog_;
extern PSI_cond_key key_ss_cond_COND_binlog_send_;
+
+extern PSI_mutex_key key_ss_mutex_LOCK_slave_lag_;
+extern PSI_cond_key key_ss_cond_COND_slave_lag_;
#endif
extern PSI_stage_info stage_waiting_for_semi_sync_ack_from_slave;
+extern PSI_stage_info stage_waiting_for_semi_sync_slave_lag;
struct TranxNode {
char log_name_[FN_REFLEN];
- my_off_t log_pos_;
+ my_off_t log_pos_;
+ ulonglong tranx_commit_time_us; /* commit timestamp of this transaction; presumably microseconds (from the _us suffix) - TODO confirm */
struct TranxNode *next_; /* the next node in the sorted list */
struct TranxNode *hash_next_; /* the next node during hash collision */
};
+struct LogPos;
+
+/* This represents a log position.
+ *
+ * The file name is held as a plain pointer: the constructors and Assign()
+ * store the pointer they are given without copying the string. A NULL
+ * file_name marks the position as uninitialized. */
+struct LogPosPtr {
+  LogPosPtr() { Uninit(); }
+  LogPosPtr(const char *name, my_off_t pos) : file_name(name), file_pos(pos) {}
+  explicit LogPosPtr(const LogPos& pos) { Assign(&pos); }
+
+  const char *file_name;
+  my_off_t file_pos;
+
+  /* Copy both fields from another LogPosPtr. */
+  LogPosPtr& Assign(const LogPosPtr *src) {
+    file_name = src->file_name;
+    file_pos = src->file_pos;
+    return *this;
+  }
+
+  /* Point at the position held by a LogPos (defined once LogPos is complete). */
+  LogPosPtr& Assign(const LogPos *src);
+
+  /* Reset to the uninitialized state. file_pos is zeroed as well so that a
+   * default-constructed object never carries an indeterminate value when it
+   * is copied or assigned. */
+  void Uninit() { file_name = NULL; file_pos = 0; }
+  bool IsInited() const { return file_name != NULL; }
+};
+
+/* A log position that stores its own copy of the file name.
+ * An empty file_name marks the position as uninitialized. */
+struct LogPos {
+  char file_name[FN_REFLEN];
+  my_off_t file_pos;
+
+  LogPos() { Uninit(); }
+
+  /* Return a LogPosPtr that points into this object's file_name buffer;
+   * an uninitialized LogPos yields an uninitialized LogPosPtr. */
+  LogPosPtr ToLogPosPtr() const {
+    return IsInited() ? LogPosPtr(file_name, file_pos) : LogPosPtr();
+  }
+
+  /* Copy the position referenced by src; an uninitialized src resets
+   * this object. */
+  LogPos& Assign(const LogPosPtr *src) {
+    if (!src->IsInited()) {
+      Uninit();
+      return *this;
+    }
+    /* Bound the copy to the buffer: an over-long name is truncated instead
+     * of overrunning file_name. memmove is used because src->file_name can
+     * be our own buffer when Assign(const LogPos*) is called with
+     * src == this, and overlapping strcpy/memcpy is undefined. */
+    size_t len = strlen(src->file_name);
+    if (len >= sizeof(file_name))
+      len = sizeof(file_name) - 1;
+    memmove(file_name, src->file_name, len);
+    file_name[len] = 0;
+    file_pos = src->file_pos;
+    return *this;
+  }
+
+  /* Copy another LogPos by going through its LogPosPtr form. */
+  LogPos& Assign(const LogPos* src) {
+    LogPosPtr p = src->ToLogPosPtr();
+    return Assign(&p);
+  }
+
+  /* Reset to the uninitialized state; file_pos is zeroed too so it never
+   * holds an indeterminate value. */
+  void Uninit() { file_name[0] = 0; file_pos = 0; }
+  bool IsInited() const { return file_name[0] != 0; }
+};
+
+/* Make this LogPosPtr refer to the position held by a LogPos, by converting
+ * it with ToLogPosPtr() and copying the resulting fields. */
+inline LogPosPtr& LogPosPtr::Assign(const LogPos* src) {
+  const LogPosPtr converted = src->ToLogPosPtr();
+  return Assign(&converted);
+}
+
+/* Order two log positions by file name first (via strcmp), then by offset.
+ *
+ * Return:
+ *   the strcmp() result when the file names differ; otherwise
+ *   1 if pos1 is at a larger offset, -1 if smaller, 0 if equal.
+ */
+inline int CompareLogPos(const LogPosPtr *pos1, const LogPosPtr *pos2) {
+  const int name_order = strcmp(pos1->file_name, pos2->file_name);
+  if (name_order != 0)
+    return name_order;
+
+  if (pos1->file_pos == pos2->file_pos)
+    return 0;
+
+  return (pos1->file_pos > pos2->file_pos) ? 1 : -1;
+}
+
/**
@class TranxNodeAllocator
@@ -329,10 +413,14 @@ private:
node2->log_name_, node2->log_pos_);
}
+ void set_new_front(TranxNode* new_front);
+
public:
ActiveTranx(mysql_mutex_t *lock, unsigned long trace_level);
~ActiveTranx();
+ /* True when the list has no front node (trx_front_ is NULL). */
+ bool is_empty() const { return trx_front_ == NULL; }
+
/* Insert an active transaction node with the specified position.
*
* Return:
@@ -340,21 +428,42 @@ public:
*/
int insert_tranx_node(const char *log_file_name, my_off_t log_file_pos);
- /* Clear the active transaction nodes until(inclusive) the specified
- * position.
- * If log_file_name is NULL, everything will be cleared: the sorted
+ /* Clear all active transaction nodes.
+ * Everything will be cleared: the sorted
* list and the hash table will be reset to empty.
- *
+ *
* Return:
- * 0: success; non-zero: error
+ * 0: success; non-zero: error
+ */
+ int clear_active_tranx_nodes();
+
+ /* Prune the active transaction nodes until the specified
+ * position (inclusive).
+ *
+ * Return:
+ * true if any transaction was removed
+ * false if list was left unchanged
*/
- int clear_active_tranx_nodes(const char *log_file_name,
- my_off_t log_file_pos);
+ bool prune_active_tranx_nodes(LogPosPtr logpos,
+ ulonglong *oldest_tranx_commit_time_us);
- /* Given a position, check to see whether the position is an active
- * transaction's ending position by probing the hash table.
+ /* Lookup a transaction's ending position by probing the hash table.
+ *
+ * return entry if found or NULL otherwise
*/
- bool is_tranx_end_pos(const char *log_file_name, my_off_t log_file_pos);
+ TranxNode* lookup_tranx_end_pos(const char *log_file_name,
+ my_off_t log_file_pos);
+
+
+ /* Check if an entry is the rear (i.e. the last one) */
+ bool is_rear(TranxNode* entry) const { return entry == trx_rear_; }
+
+ /**
+ * return timestamp of oldest transaction in list
+ *
+ * trx_front_ is dereferenced unconditionally, so the list must not be
+ * empty (see is_empty()) when this is called.
+ */
+ ulonglong get_oldest_tranx_commit_time_us() const {
+ return trx_front_->tranx_commit_time_us;
+ }
/* Given two binlog positions, compare which one is bigger based on
* (file_name, file_position).
@@ -365,6 +474,24 @@ public:
};
/**
+ * State that semisync master keeps per slave
+ */
+struct ReplSemiSyncMasterPerSlaveState
+{
+  /**
+   * Number of events that have not yet been semi-sync acked
+   */
+  unsigned unacked_event_count_;
+
+  /**
+   * Position of the last event that was semisync'ed
+   */
+  LogPos sync_req_pos_;
+
+  /* Start with no unacked events; sync_req_pos_ is default-constructed. */
+  ReplSemiSyncMasterPerSlaveState() { unacked_event_count_ = 0; }
+};
+
+/**
The extension class for the master of semi-synchronous replication
*/
class ReplSemiSyncMaster
@@ -432,6 +559,16 @@ class ReplSemiSyncMaster
bool state_; /* whether semi-sync is switched */
+ /* This cond variable is signaled when slave lag has decreased */
+ mysql_cond_t COND_slave_lag_;
+
+ /* Mutex that protects oldest_unapplied_tranx_commit_time_us_ */
+ mysql_mutex_t LOCK_slave_lag_;
+
+ /* this is commit time of oldest transaction that has not been applied
+ * on any slave */
+ ulonglong oldest_unapplied_tranx_commit_time_us_;
+
void lock();
void unlock();
void cond_broadcast();
@@ -493,6 +630,9 @@ class ReplSemiSyncMaster
/* Did the slave served by this thread request semi-sync? */
bool is_semi_sync_slave();
+ /* Does this slave have slave lag reporting capabilities */
+ bool has_semi_sync_slave_lag();
+
/* In semi-sync replication, reports up to which binlog position we have
* received replies from the slave indicating that it has already received the events.
*
@@ -501,13 +641,15 @@ class ReplSemiSyncMaster
* log_file_name - (IN) binlog file name
* end_offset - (IN) the offset in the binlog file up to which we have
* the replies from the slave
+ * exec_position - (IN) position of SQL thread or NULL if not present
*
* Return:
* 0: success; non-zero: error
*/
int reportReplyBinlog(uint32 server_id,
const char* log_file_name,
- my_off_t end_offset);
+ my_off_t end_offset,
+ const LogPos *exec_position);
/* Commit a transaction in the final step. This function is called from
* InnoDB before returning from the low commit. If semi-sync is switched on,
@@ -540,6 +682,16 @@ class ReplSemiSyncMaster
*/
int reserveSyncHeader(unsigned char *header, unsigned long size);
+ /*
+ * check if an event should be semi synced and optionally
+ * if it should report back position of SQL thread on slave
+ *
+ * return 0 - no semi sync
+ * 1 - semi sync
+ * 2 - semi sync and report exec position
+ */
+ int checkSyncReq(const LogPosPtr *log_pos);
+
/* Update the sync bit in the packet header to indicate to the slave whether
* the master will wait for the reply of the event. If semi-sync is switched
* off and we detect that the slave is catching up, we switch semi-sync on.
@@ -592,6 +744,21 @@ class ReplSemiSyncMaster
* go off for that.
*/
int resetMaster();
+
+ /**
+ * wake potential slave-lag waiters
+ * called by binlog dump-thread(s)
+ */
+ void wake_slave_lag_waiters(ulonglong oldest_unapplied_tranx_commit_time_us);
+
+ /**
+ * wait for slave lag to get below threshold
+ * called by user-thread(s)
+ *
+ * return 0 - success
+ * 1 - timeout
+ */
+ int wait_slave_lag(ulong max_wait_time_sec);
};
enum rpl_semi_sync_master_wait_point_t {
@@ -621,6 +788,17 @@ extern unsigned long long rpl_semi_sync_master_trx_wait_num;
extern unsigned long long rpl_semi_sync_master_net_wait_time;
extern unsigned long long rpl_semi_sync_master_trx_wait_time;
+extern unsigned long rpl_semi_sync_master_max_unacked_event_count;
+extern unsigned long rpl_semi_sync_master_max_unacked_event_bytes;
+extern unsigned long rpl_semi_sync_master_max_slave_lag;
+extern unsigned long rpl_semi_sync_master_slave_lag_heartbeat_frequency_us;
+extern unsigned long rpl_semi_sync_master_slave_lag_wait_sessions;
+extern unsigned long long rpl_semi_sync_master_estimated_slave_lag;
+
+extern unsigned long rpl_semi_sync_master_avg_trx_slave_lag_wait_time;
+extern unsigned long long rpl_semi_sync_master_trx_slave_lag_wait_num;
+extern unsigned long long rpl_semi_sync_master_trx_slave_lag_wait_time;
+
/*
This indicates whether we should keep waiting if no semi-sync slave
is available.