summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--mysql-test/suite/rpl/r/rpl_gtid_ignored.result70
-rw-r--r--mysql-test/suite/rpl/r/rpl_gtid_mdev4484.result3
-rw-r--r--mysql-test/suite/rpl/t/rpl_gtid_ignored.test136
-rw-r--r--mysql-test/suite/rpl/t/rpl_gtid_mdev4484.test6
-rw-r--r--sql/log_event.cc83
-rw-r--r--sql/log_event.h5
-rw-r--r--sql/rpl_gtid.cc28
-rw-r--r--sql/rpl_gtid.h2
-rw-r--r--sql/rpl_rli.h2
-rw-r--r--sql/slave.cc145
10 files changed, 433 insertions, 47 deletions
diff --git a/mysql-test/suite/rpl/r/rpl_gtid_ignored.result b/mysql-test/suite/rpl/r/rpl_gtid_ignored.result
new file mode 100644
index 00000000000..7d6e65bcb6f
--- /dev/null
+++ b/mysql-test/suite/rpl/r/rpl_gtid_ignored.result
@@ -0,0 +1,70 @@
+include/rpl_init.inc [topology=1->2]
+ALTER TABLE mysql.gtid_slave_pos ENGINE=InnoDB;
+SET @old_gtid_strict_mode= @@GLOBAL.gtid_strict_mode;
+SET GLOBAL gtid_strict_mode= 1;
+include/stop_slave.inc
+SET @old_gtid_strict_mode= @@GLOBAL.gtid_strict_mode;
+SET GLOBAL gtid_strict_mode=1;
+CHANGE MASTER TO master_use_gtid=slave_pos;
+include/start_slave.inc
+CREATE TABLE t1 (a INT PRIMARY KEY) ENGINE=InnoDB;
+INSERT INTO t1 VALUES (1);
+**** MDEV-4488: GTID position should be updated for events that are ignored due to server id ***
+include/stop_slave.inc
+CHANGE MASTER TO ignore_server_ids=(1);
+include/start_slave.inc
+INSERT INTO t1 VALUES (2);
+INSERT INTO t1 VALUES (3);
+RESULT
+OK
+SELECT * FROM t1 ORDER BY a;
+a
+1
+include/stop_slave.inc
+CHANGE MASTER TO ignore_server_ids=();
+include/start_slave.inc
+RESULT
+OK
+SELECT * FROM t1 ORDER BY a;
+a
+1
+INSERT INTO t1 VALUES (4);
+INSERT INTO t1 VALUES (5);
+RESULT
+OK
+a
+1
+4
+5
+*** Test the same thing when IO thread exits before SQL thread reaches end of log. ***
+include/stop_slave.inc
+SET @old_dbug= @@GLOBAL.debug_dbug;
+SET GLOBAL debug_dbug= "+d,inject_slave_sql_before_apply_event";
+CHANGE MASTER TO ignore_server_ids=(1);
+include/start_slave.inc
+INSERT INTO t1 VALUES (6);
+INSERT INTO t1 VALUES (7);
+include/wait_for_slave_param.inc [Read_Master_Log_Pos]
+STOP SLAVE IO_THREAD;
+SET debug_sync = "now SIGNAL continue";
+RESULT
+OK
+RESULT
+OK
+include/stop_slave.inc
+CHANGE MASTER TO ignore_server_ids=();
+SET GLOBAL debug_dbug= @old_dbug;
+include/start_slave.inc
+INSERT INTO t1 VALUES (8);
+INSERT INTO t1 VALUES (9);
+SELECT * FROM t1 ORDER BY a;
+a
+1
+4
+5
+8
+9
+DROP TABLE t1;
+SET GLOBAL gtid_strict_mode= @old_gtid_strict_mode;
+SET GLOBAL gtid_strict_mode= @old_gtid_strict_mode;
+include/rpl_end.inc
diff --git a/mysql-test/suite/rpl/r/rpl_gtid_mdev4484.result b/mysql-test/suite/rpl/r/rpl_gtid_mdev4484.result
index 1acce9e65ff..4d223452151 100644
--- a/mysql-test/suite/rpl/r/rpl_gtid_mdev4484.result
+++ b/mysql-test/suite/rpl/r/rpl_gtid_mdev4484.result
@@ -7,10 +7,11 @@ INSERT INTO t1 VALUES (1);
include/stop_slave.inc
SET @old_dbug= @@GLOBAL.debug_dbug;
SET GLOBAL debug_dbug="+d,gtid_slave_pos_simulate_failed_delete";
-include/start_slave.inc
SET sql_log_bin= 0;
CALL mtr.add_suppression("Can't find file");
+ALTER TABLE mysql.gtid_slave_pos ENGINE=MyISAM;
SET sql_log_bin= 1;
+include/start_slave.inc
INSERT INTO t1 VALUES (2);
include/wait_for_slave_sql_error.inc [errno=1942]
STOP SLAVE IO_THREAD;
diff --git a/mysql-test/suite/rpl/t/rpl_gtid_ignored.test b/mysql-test/suite/rpl/t/rpl_gtid_ignored.test
new file mode 100644
index 00000000000..ee5ca92b55e
--- /dev/null
+++ b/mysql-test/suite/rpl/t/rpl_gtid_ignored.test
@@ -0,0 +1,136 @@
+--source include/have_innodb.inc
+--source include/have_debug.inc
+--source include/have_debug_sync.inc
+
+--let $rpl_topology=1->2
+--source include/rpl_init.inc
+
+--connection server_1
+ALTER TABLE mysql.gtid_slave_pos ENGINE=InnoDB;
+
+SET @old_gtid_strict_mode= @@GLOBAL.gtid_strict_mode;
+SET GLOBAL gtid_strict_mode= 1;
+
+--connection server_2
+--source include/stop_slave.inc
+SET @old_gtid_strict_mode= @@GLOBAL.gtid_strict_mode;
+SET GLOBAL gtid_strict_mode=1;
+CHANGE MASTER TO master_use_gtid=slave_pos;
+--source include/start_slave.inc
+
+--connection server_1
+CREATE TABLE t1 (a INT PRIMARY KEY) ENGINE=InnoDB;
+INSERT INTO t1 VALUES (1);
+--save_master_pos
+
+--connection server_2
+--sync_with_master
+
+--echo **** MDEV-4488: GTID position should be updated for events that are ignored due to server id ***
+--source include/stop_slave.inc
+CHANGE MASTER TO ignore_server_ids=(1);
+--source include/start_slave.inc
+
+--connection server_1
+# These inserts should be ignored (not applied) on the slave, but the
+# gtid_slave_pos should still be updated.
+INSERT INTO t1 VALUES (2);
+INSERT INTO t1 VALUES (3);
+--save_master_pos
+--let gtid_pos=`SELECT @@GLOBAL.gtid_binlog_pos`
+
+--connection server_2
+--sync_with_master
+--let $wait_condition= SELECT @@GLOBAL.gtid_slave_pos = '$gtid_pos'
+--source include/wait_condition.inc
+--disable_query_log
+eval SELECT IF(@@GLOBAL.gtid_slave_pos = '$gtid_pos', 'OK', CONCAT("ERROR: Expected $gtid_pos got ", @@GLOBAL.gtid_slave_pos)) AS RESULT;
+--enable_query_log
+
+SELECT * FROM t1 ORDER BY a;
+
+--source include/stop_slave.inc
+CHANGE MASTER TO ignore_server_ids=();
+--source include/start_slave.inc
+--sync_with_master
+--disable_query_log
+eval SELECT IF(@@GLOBAL.gtid_slave_pos = '$gtid_pos', 'OK', CONCAT("ERROR: Expected $gtid_pos got ", @@GLOBAL.gtid_slave_pos)) AS RESULT;
+--enable_query_log
+
+SELECT * FROM t1 ORDER BY a;
+
+--connection server_1
+INSERT INTO t1 VALUES (4);
+INSERT INTO t1 VALUES (5);
+--let gtid_pos=`SELECT @@GLOBAL.gtid_binlog_pos`
+--save_master_pos
+
+--connection server_2
+--sync_with_master
+--disable_query_log
+eval SELECT IF(@@GLOBAL.gtid_slave_pos = '$gtid_pos', 'OK', CONCAT("ERROR: Expected $gtid_pos got ", @@GLOBAL.gtid_slave_pos)) AS RESULT;
+SELECT * FROM t1 ORDER BY a;
+--enable_query_log
+
+
+--echo *** Test the same thing when IO thread exits before SQL thread reaches end of log. ***
+--connection server_2
+--source include/stop_slave.inc
+SET @old_dbug= @@GLOBAL.debug_dbug;
+SET GLOBAL debug_dbug= "+d,inject_slave_sql_before_apply_event";
+CHANGE MASTER TO ignore_server_ids=(1);
+--source include/start_slave.inc
+
+--connection server_1
+INSERT INTO t1 VALUES (6);
+INSERT INTO t1 VALUES (7);
+--let $master_pos= query_get_value(SHOW MASTER STATUS, Position, 1)
+--let gtid_pos=`SELECT @@GLOBAL.gtid_binlog_pos`
+--save_master_pos
+
+--connection server_2
+# Wait for IO thread to have read all events from master, and for SQL thread to
+# sit in the debug_sync point.
+
+--let $slave_param= Read_Master_Log_Pos
+--let $slave_param_value= $master_pos
+--source include/wait_for_slave_param.inc
+
+# Now stop the IO thread, and let the SQL thread continue. The IO thread
+# should write a Gtid_list event that the SQL thread can use to update the
+# gtid_slave_pos with the GTIDs of the skipped events.
+STOP SLAVE IO_THREAD;
+SET debug_sync = "now SIGNAL continue";
+
+--sync_with_master
+--let $wait_condition= SELECT @@GLOBAL.gtid_slave_pos = '$gtid_pos'
+--source include/wait_condition.inc
+--disable_query_log
+eval SELECT IF(@@GLOBAL.gtid_slave_pos = '$gtid_pos', 'OK', CONCAT("ERROR: Expected $gtid_pos got ", @@GLOBAL.gtid_slave_pos)) AS RESULT;
+--let $slave_pos= query_get_value(SHOW SLAVE STATUS, Exec_Master_Log_Pos, 1)
+eval SELECT IF('$slave_pos' = '$master_pos', 'OK', "ERROR: Expected $master_pos got $slave_pos") AS RESULT;
+--enable_query_log
+
+
+--source include/stop_slave.inc
+CHANGE MASTER TO ignore_server_ids=();
+SET GLOBAL debug_dbug= @old_dbug;
+--source include/start_slave.inc
+
+--connection server_1
+INSERT INTO t1 VALUES (8);
+INSERT INTO t1 VALUES (9);
+--save_master_pos
+
+--connection server_2
+--sync_with_master
+SELECT * FROM t1 ORDER BY a;
+
+# Clean up.
+--connection server_1
+DROP TABLE t1;
+SET GLOBAL gtid_strict_mode= @old_gtid_strict_mode;
+--connection server_2
+SET GLOBAL gtid_strict_mode= @old_gtid_strict_mode;
+
+--source include/rpl_end.inc
diff --git a/mysql-test/suite/rpl/t/rpl_gtid_mdev4484.test b/mysql-test/suite/rpl/t/rpl_gtid_mdev4484.test
index b3ff76c2afe..43634ec1528 100644
--- a/mysql-test/suite/rpl/t/rpl_gtid_mdev4484.test
+++ b/mysql-test/suite/rpl/t/rpl_gtid_mdev4484.test
@@ -18,10 +18,14 @@ INSERT INTO t1 VALUES (1);
--source include/stop_slave.inc
SET @old_dbug= @@GLOBAL.debug_dbug;
SET GLOBAL debug_dbug="+d,gtid_slave_pos_simulate_failed_delete";
---source include/start_slave.inc
SET sql_log_bin= 0;
CALL mtr.add_suppression("Can't find file");
+# Since we inject an error updating mysql.gtid_slave_pos, we will get different
+# output depending on whether it is InnoDB or MyISAM (roll back or no roll
+# back). So fix it to make sure we are consistent.
+ALTER TABLE mysql.gtid_slave_pos ENGINE=MyISAM;
SET sql_log_bin= 1;
+--source include/start_slave.inc
--connection master
INSERT INTO t1 VALUES (2);
diff --git a/sql/log_event.cc b/sql/log_event.cc
index 791afeb4e1d..5e30918106a 100644
--- a/sql/log_event.cc
+++ b/sql/log_event.cc
@@ -6334,7 +6334,7 @@ Gtid_log_event::print(FILE *file, PRINT_EVENT_INFO *print_event_info)
Gtid_list_log_event::Gtid_list_log_event(const char *buf, uint event_len,
const Format_description_log_event *description_event)
- : Log_event(buf, description_event), count(0), list(0)
+ : Log_event(buf, description_event), count(0), list(0), sub_id_list(0)
{
uint32 i;
uint32 val;
@@ -6363,6 +6363,31 @@ Gtid_list_log_event::Gtid_list_log_event(const char *buf, uint event_len,
list[i].seq_no= uint8korr(buf);
buf+= 8;
}
+
+#ifdef MYSQL_SERVER
+ if ((gl_flags & FLAG_IGN_GTIDS))
+ {
+ uint32 i;
+ if (!(sub_id_list= (uint64 *)my_malloc(count*sizeof(uint64), MYF(MY_WME))))
+ {
+ my_free(list);
+ list= NULL;
+ return;
+ }
+ for (i= 0; i < count; ++i)
+ {
+ if (!(sub_id_list[i]=
+ rpl_global_gtid_slave_state.next_sub_id(list[i].domain_id)))
+ {
+ my_free(list);
+ my_free(sub_id_list);
+ list= NULL;
+ sub_id_list= NULL;
+ return;
+ }
+ }
+ }
+#endif
}
@@ -6370,7 +6395,7 @@ Gtid_list_log_event::Gtid_list_log_event(const char *buf, uint event_len,
Gtid_list_log_event::Gtid_list_log_event(rpl_binlog_state *gtid_set,
uint32 gl_flags_)
- : count(gtid_set->count()), gl_flags(gl_flags_), list(0)
+ : count(gtid_set->count()), gl_flags(gl_flags_), list(0), sub_id_list(0)
{
cache_type= EVENT_NO_CACHE;
/* Failure to allocate memory will be caught by is_valid() returning false. */
@@ -6381,6 +6406,45 @@ Gtid_list_log_event::Gtid_list_log_event(rpl_binlog_state *gtid_set,
}
+Gtid_list_log_event::Gtid_list_log_event(slave_connection_state *gtid_set,
+ uint32 gl_flags_)
+ : count(gtid_set->count()), gl_flags(gl_flags_), list(0), sub_id_list(0)
+{
+ cache_type= EVENT_NO_CACHE;
+ /* Failure to allocate memory will be caught by is_valid() returning false. */
+ if (count < (1<<28) &&
+ (list = (rpl_gtid *)my_malloc(count * sizeof(*list) + (count == 0),
+ MYF(MY_WME))))
+ {
+ gtid_set->get_gtid_list(list, count);
+ if (gl_flags & FLAG_IGN_GTIDS)
+ {
+ uint32 i;
+
+ if (!(sub_id_list= (uint64 *)my_malloc(count * sizeof(uint64),
+ MYF(MY_WME))))
+ {
+ my_free(list);
+ list= NULL;
+ return;
+ }
+ for (i= 0; i < count; ++i)
+ {
+ if (!(sub_id_list[i]=
+ rpl_global_gtid_slave_state.next_sub_id(list[i].domain_id)))
+ {
+ my_free(list);
+ my_free(sub_id_list);
+ list= NULL;
+ sub_id_list= NULL;
+ return;
+ }
+ }
+ }
+ }
+}
+
+
#if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
bool
Gtid_list_log_event::to_packet(String *packet)
@@ -6432,7 +6496,20 @@ Gtid_list_log_event::write(IO_CACHE *file)
int
Gtid_list_log_event::do_apply_event(Relay_log_info const *rli)
{
- int ret= Log_event::do_apply_event(rli);
+ int ret;
+ if (gl_flags & FLAG_IGN_GTIDS)
+ {
+ uint32 i;
+ for (i= 0; i < count; ++i)
+ {
+ if ((ret= rpl_global_gtid_slave_state.record_gtid(thd, &list[i],
+ sub_id_list[i],
+ false, false)))
+ return ret;
+ rpl_global_gtid_slave_state.update_state_hash(sub_id_list[i], &list[i]);
+ }
+ }
+ ret= Log_event::do_apply_event(rli);
if (rli->until_condition == Relay_log_info::UNTIL_GTID &&
(gl_flags & FLAG_UNTIL_REACHED))
{
diff --git a/sql/log_event.h b/sql/log_event.h
index b73c0e71f77..abb3b96bac4 100644
--- a/sql/log_event.h
+++ b/sql/log_event.h
@@ -3197,12 +3197,15 @@ public:
uint32 count;
uint32 gl_flags;
struct rpl_gtid *list;
+ uint64 *sub_id_list;
static const uint element_size= 4+4+8;
static const uint32 FLAG_UNTIL_REACHED= (1<<28);
+ static const uint32 FLAG_IGN_GTIDS= (1<<29);
#ifdef MYSQL_SERVER
Gtid_list_log_event(rpl_binlog_state *gtid_set, uint32 gl_flags);
+ Gtid_list_log_event(slave_connection_state *gtid_set, uint32 gl_flags);
#ifdef HAVE_REPLICATION
void pack_info(THD *thd, Protocol *protocol);
#endif
@@ -3211,7 +3214,7 @@ public:
#endif
Gtid_list_log_event(const char *buf, uint event_len,
const Format_description_log_event *description_event);
- ~Gtid_list_log_event() { my_free(list); }
+ ~Gtid_list_log_event() { my_free(list); my_free(sub_id_list); }
Log_event_type get_type_code() { return GTID_LIST_EVENT; }
int get_data_size() {
/*
diff --git a/sql/rpl_gtid.cc b/sql/rpl_gtid.cc
index c8a00cd78ae..96dade4d390 100644
--- a/sql/rpl_gtid.cc
+++ b/sql/rpl_gtid.cc
@@ -1419,6 +1419,15 @@ slave_connection_state::remove(const rpl_gtid *in_gtid)
}
+void
+slave_connection_state::remove_if_present(const rpl_gtid *in_gtid)
+{
+ uchar *rec= my_hash_search(&hash, (const uchar *)(&in_gtid->domain_id), 0);
+ if (rec)
+ my_hash_delete(&hash, rec);
+}
+
+
int
slave_connection_state::to_string(String *out_str)
{
@@ -1442,3 +1451,22 @@ slave_connection_state::append_to_string(String *out_str)
}
return 0;
}
+
+
+int
+slave_connection_state::get_gtid_list(rpl_gtid *gtid_list, uint32 list_size)
+{
+ uint32 i, pos;
+
+ pos= 0;
+ for (i= 0; i < hash.records; ++i)
+ {
+ entry *e;
+ if (pos >= list_size)
+ return 1;
+ e= (entry *)my_hash_element(&hash, i);
+ memcpy(&gtid_list[pos++], &e->gtid, sizeof(e->gtid));
+ }
+
+ return 0;
+}
diff --git a/sql/rpl_gtid.h b/sql/rpl_gtid.h
index fcb9f08795a..8d2c98d54d3 100644
--- a/sql/rpl_gtid.h
+++ b/sql/rpl_gtid.h
@@ -195,9 +195,11 @@ struct slave_connection_state
entry *find_entry(uint32 domain_id);
int update(const rpl_gtid *in_gtid);
void remove(const rpl_gtid *gtid);
+ void remove_if_present(const rpl_gtid *in_gtid);
ulong count() const { return hash.records; }
int to_string(String *out_str);
int append_to_string(String *out_str);
+ int get_gtid_list(rpl_gtid *gtid_list, uint32 list_size);
};
extern bool rpl_slave_state_tostring_helper(String *dest, const rpl_gtid *gtid,
diff --git a/sql/rpl_rli.h b/sql/rpl_rli.h
index 6dd757343fd..9ab5dcb30a5 100644
--- a/sql/rpl_rli.h
+++ b/sql/rpl_rli.h
@@ -303,6 +303,8 @@ public:
*/
char ign_master_log_name_end[FN_REFLEN];
ulonglong ign_master_log_pos_end;
+ /* Similar for ignored GTID events. */
+ slave_connection_state ign_gtids;
/*
Indentifies where the SQL Thread should create temporary files for the
diff --git a/sql/slave.cc b/sql/slave.cc
index b10d1a17c23..29eee1cf135 100644
--- a/sql/slave.cc
+++ b/sql/slave.cc
@@ -2222,34 +2222,66 @@ static void write_ignored_events_info_to_relay_log(THD *thd, Master_info *mi)
DBUG_ASSERT(thd == mi->io_thd);
mysql_mutex_lock(log_lock);
- if (rli->ign_master_log_name_end[0])
- {
- DBUG_PRINT("info",("writing a Rotate event to track down ignored events"));
- Rotate_log_event *ev= new Rotate_log_event(rli->ign_master_log_name_end,
- 0, rli->ign_master_log_pos_end,
- Rotate_log_event::DUP_NAME);
- rli->ign_master_log_name_end[0]= 0;
- /* can unlock before writing as slave SQL thd will soon see our Rotate */
+ if (rli->ign_master_log_name_end[0] || rli->ign_gtids.count())
+ {
+ Rotate_log_event *rev;
+ Gtid_list_log_event *glev;
+ if (rli->ign_master_log_name_end[0])
+ {
+ rev= new Rotate_log_event(rli->ign_master_log_name_end,
+ 0, rli->ign_master_log_pos_end,
+ Rotate_log_event::DUP_NAME);
+ rli->ign_master_log_name_end[0]= 0;
+ if (unlikely(!(bool)rev))
+ mi->report(ERROR_LEVEL, ER_SLAVE_CREATE_EVENT_FAILURE,
+ ER(ER_SLAVE_CREATE_EVENT_FAILURE),
+ "Rotate_event (out of memory?),"
+ " SHOW SLAVE STATUS may be inaccurate");
+ }
+ if (rli->ign_gtids.count())
+ {
+ glev= new Gtid_list_log_event(&rli->ign_gtids,
+ Gtid_list_log_event::FLAG_IGN_GTIDS);
+ rli->ign_gtids.reset();
+ if (unlikely(!(bool)glev))
+ mi->report(ERROR_LEVEL, ER_SLAVE_CREATE_EVENT_FAILURE,
+ ER(ER_SLAVE_CREATE_EVENT_FAILURE),
+ "Gtid_list_event (out of memory?),"
+ " gtid_slave_pos may be inaccurate");
+ }
+
+ /* Can unlock before writing as slave SQL thd will soon see our event. */
mysql_mutex_unlock(log_lock);
- if (likely((bool)ev))
+ if (rev)
{
- ev->server_id= 0; // don't be ignored by slave SQL thread
- if (unlikely(rli->relay_log.append(ev)))
+ DBUG_PRINT("info",("writing a Rotate event to track down ignored events"));
+ rev->server_id= 0; // don't be ignored by slave SQL thread
+ if (unlikely(rli->relay_log.append(rev)))
mi->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_WRITE_FAILURE,
ER(ER_SLAVE_RELAY_LOG_WRITE_FAILURE),
"failed to write a Rotate event"
" to the relay log, SHOW SLAVE STATUS may be"
" inaccurate");
+ delete rev;
+ }
+ if (glev)
+ {
+ DBUG_PRINT("info",("writing a Gtid_list event to track down ignored events"));
+ glev->server_id= 0; // don't be ignored by slave SQL thread
+ glev->set_artificial_event(); // Don't mess up Exec_Master_Log_Pos
+ if (unlikely(rli->relay_log.append(glev)))
+ mi->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_WRITE_FAILURE,
+ ER(ER_SLAVE_RELAY_LOG_WRITE_FAILURE),
+ "failed to write a Gtid_list event to the relay log, "
+ "gtid_slave_pos may be inaccurate");
+ delete glev;
+ }
+ if (likely (rev || glev))
+ {
rli->relay_log.harvest_bytes_written(&rli->log_space_total);
if (flush_master_info(mi, TRUE, TRUE))
sql_print_error("Failed to flush master info file");
- delete ev;
}
- else
- mi->report(ERROR_LEVEL, ER_SLAVE_CREATE_EVENT_FAILURE,
- ER(ER_SLAVE_CREATE_EVENT_FAILURE),
- "Rotate_event (out of memory?),"
- " SHOW SLAVE STATUS may be inaccurate");
}
else
mysql_mutex_unlock(log_lock);
@@ -3097,6 +3129,12 @@ int apply_event_and_update_pos(Log_event* ev, THD* thd, Relay_log_info* rli)
rli->slave_skip_counter--;
}
mysql_mutex_unlock(&rli->data_lock);
+ DBUG_EXECUTE_IF("inject_slave_sql_before_apply_event",
+ {
+ DBUG_ASSERT(!debug_sync_set_action
+ (thd, STRING_WITH_LEN("now WAIT_FOR continue")));
+ DBUG_SET_INITIAL("-d,inject_slave_sql_before_apply_event");
+ };);
if (reason == Log_event::EVENT_SKIP_NOT)
exec_res= ev->apply_event(rli);
@@ -4822,6 +4860,8 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
ulong s_id;
bool unlock_data_lock= TRUE;
bool gtid_skip_enqueue= false;
+ bool got_gtid_event= false;
+ rpl_gtid event_gtid;
/*
FD_q must have been prepared for the first R_a event
@@ -5140,6 +5180,14 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
{
uchar dummy_flag;
+ if (Gtid_log_event::peek(buf, event_len, checksum_alg,
+ &event_gtid.domain_id, &event_gtid.server_id,
+ &event_gtid.seq_no, &dummy_flag))
+ {
+ error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE;
+ goto err;
+ }
+ got_gtid_event= true;
if (mi->using_gtid == Master_info::USE_GTID_NO)
goto default_action;
if (unlikely(!mi->gtid_event_seen))
@@ -5147,8 +5195,6 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
mi->gtid_event_seen= true;
if (mi->gtid_reconnect_event_skip_count)
{
- rpl_gtid gtid;
-
/*
If we are reconnecting, and we need to skip a partial event group
already queued to the relay log before the reconnect, then we check
@@ -5157,21 +5203,14 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
The only way we should be able to receive a different GTID than what
we expect is if the binlog on the master (or more likely the whole
- master server) was replaced with a different one, one the same IP
+ master server) was replaced with a different one, on the same IP
address, _and_ the new master happens to have domains in a different
order so we get the GTID from a different domain first. Still, it is
best to protect against this case.
*/
- if (Gtid_log_event::peek(buf, event_len, checksum_alg,
- &gtid.domain_id, &gtid.server_id,
- &gtid.seq_no, &dummy_flag))
- {
- error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE;
- goto err;
- }
- if (gtid.domain_id != mi->last_queued_gtid.domain_id ||
- gtid.server_id != mi->last_queued_gtid.server_id ||
- gtid.seq_no != mi->last_queued_gtid.seq_no)
+ if (event_gtid.domain_id != mi->last_queued_gtid.domain_id ||
+ event_gtid.server_id != mi->last_queued_gtid.server_id ||
+ event_gtid.seq_no != mi->last_queued_gtid.seq_no)
{
bool first;
error= ER_SLAVE_UNEXPECTED_MASTER_SWITCH;
@@ -5181,7 +5220,7 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
&first);
error_msg.append(STRING_WITH_LEN(", received: "));
first= true;
- rpl_slave_state_tostring_helper(&error_msg, &gtid, &first);
+ rpl_slave_state_tostring_helper(&error_msg, &event_gtid, &first);
goto err;
}
}
@@ -5261,6 +5300,16 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
mysql_mutex_lock(log_lock);
s_id= uint4korr(buf + SERVER_ID_OFFSET);
+ /*
+ Write the event to the relay log, unless we reconnected in the middle
+ of an event group and now need to skip the initial part of the group that
+ we already wrote before reconnecting.
+ */
+ if (unlikely(gtid_skip_enqueue))
+ {
+ mi->master_log_pos+= inc_pos;
+ }
+ else
if ((s_id == global_system_variables.server_id &&
!mi->rli.replicate_same_server_id) ||
/*
@@ -5303,6 +5352,8 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
memcpy(rli->ign_master_log_name_end, mi->master_log_name, FN_REFLEN);
DBUG_ASSERT(rli->ign_master_log_name_end[0]);
rli->ign_master_log_pos_end= mi->master_log_pos;
+ if (got_gtid_event)
+ rli->ign_gtids.update(&event_gtid);
}
rli->relay_log.signal_update(); // the slave SQL thread needs to re-check
DBUG_PRINT("info", ("master_log_pos: %lu, event originating from %u server, ignored",
@@ -5310,16 +5361,7 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
}
else
{
- /*
- Write the event to the relay log, unless we reconnected in the middle
- of an event group and now need to skip the initial part of the group that
- we already wrote before reconnecting.
- */
- if (unlikely(gtid_skip_enqueue))
- {
- mi->master_log_pos+= inc_pos;
- }
- else if (likely(!(rli->relay_log.appendv(buf,event_len,0))))
+ if (likely(!(rli->relay_log.appendv(buf,event_len,0))))
{
mi->master_log_pos+= inc_pos;
DBUG_PRINT("info", ("master_log_pos: %lu", (ulong) mi->master_log_pos));
@@ -5330,6 +5372,8 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE;
}
rli->ign_master_log_name_end[0]= 0; // last event is not ignored
+ if (got_gtid_event)
+ rli->ign_gtids.remove_if_present(&event_gtid);
if (save_buf != NULL)
buf= save_buf;
}
@@ -5932,6 +5976,25 @@ static Log_event* next_event(Relay_log_info* rli)
DBUG_RETURN(ev);
}
+ if (rli->ign_gtids.count())
+ {
+ /* We generate and return a Gtid_list, to update gtid_slave_pos. */
+ DBUG_PRINT("info",("seeing ignored end gtids"));
+ ev= new Gtid_list_log_event(&rli->ign_gtids,
+ Gtid_list_log_event::FLAG_IGN_GTIDS);
+ rli->ign_gtids.reset();
+ mysql_mutex_unlock(log_lock);
+ if (unlikely(!ev))
+ {
+ errmsg= "Slave SQL thread failed to create a Gtid_list event "
+ "(out of memory?), gtid_slave_pos may be inaccurate";
+ goto err;
+ }
+ ev->server_id= 0; // don't be ignored by slave SQL thread
+ ev->set_artificial_event(); // Don't mess up Exec_Master_Log_Pos
+ DBUG_RETURN(ev);
+ }
+
/*
We can, and should release data_lock while we are waiting for
update. If we do not, show slave status will block