summaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
Diffstat (limited to 'sql')
-rw-r--r--sql/log_event.cc83
-rw-r--r--sql/log_event.h5
-rw-r--r--sql/rpl_gtid.cc28
-rw-r--r--sql/rpl_gtid.h2
-rw-r--r--sql/rpl_rli.h2
-rw-r--r--sql/slave.cc145
6 files changed, 220 insertions, 45 deletions
diff --git a/sql/log_event.cc b/sql/log_event.cc
index 791afeb4e1d..5e30918106a 100644
--- a/sql/log_event.cc
+++ b/sql/log_event.cc
@@ -6334,7 +6334,7 @@ Gtid_log_event::print(FILE *file, PRINT_EVENT_INFO *print_event_info)
Gtid_list_log_event::Gtid_list_log_event(const char *buf, uint event_len,
const Format_description_log_event *description_event)
- : Log_event(buf, description_event), count(0), list(0)
+ : Log_event(buf, description_event), count(0), list(0), sub_id_list(0)
{
uint32 i;
uint32 val;
@@ -6363,6 +6363,31 @@ Gtid_list_log_event::Gtid_list_log_event(const char *buf, uint event_len,
list[i].seq_no= uint8korr(buf);
buf+= 8;
}
+
+#ifdef MYSQL_SERVER
+ if ((gl_flags & FLAG_IGN_GTIDS))
+ {
+ uint32 i;
+ if (!(sub_id_list= (uint64 *)my_malloc(count*sizeof(uint64), MYF(MY_WME))))
+ {
+ my_free(list);
+ list= NULL;
+ return;
+ }
+ for (i= 0; i < count; ++i)
+ {
+ if (!(sub_id_list[i]=
+ rpl_global_gtid_slave_state.next_sub_id(list[i].domain_id)))
+ {
+ my_free(list);
+ my_free(sub_id_list);
+ list= NULL;
+ sub_id_list= NULL;
+ return;
+ }
+ }
+ }
+#endif
}
@@ -6370,7 +6395,7 @@ Gtid_list_log_event::Gtid_list_log_event(const char *buf, uint event_len,
Gtid_list_log_event::Gtid_list_log_event(rpl_binlog_state *gtid_set,
uint32 gl_flags_)
- : count(gtid_set->count()), gl_flags(gl_flags_), list(0)
+ : count(gtid_set->count()), gl_flags(gl_flags_), list(0), sub_id_list(0)
{
cache_type= EVENT_NO_CACHE;
/* Failure to allocate memory will be caught by is_valid() returning false. */
@@ -6381,6 +6406,45 @@ Gtid_list_log_event::Gtid_list_log_event(rpl_binlog_state *gtid_set,
}
+Gtid_list_log_event::Gtid_list_log_event(slave_connection_state *gtid_set,
+ uint32 gl_flags_)
+ : count(gtid_set->count()), gl_flags(gl_flags_), list(0), sub_id_list(0)
+{
+ cache_type= EVENT_NO_CACHE;
+ /* Failure to allocate memory will be caught by is_valid() returning false. */
+ if (count < (1<<28) &&
+ (list = (rpl_gtid *)my_malloc(count * sizeof(*list) + (count == 0),
+ MYF(MY_WME))))
+ {
+ gtid_set->get_gtid_list(list, count);
+ if (gl_flags & FLAG_IGN_GTIDS)
+ {
+ uint32 i;
+
+ if (!(sub_id_list= (uint64 *)my_malloc(count * sizeof(uint64),
+ MYF(MY_WME))))
+ {
+ my_free(list);
+ list= NULL;
+ return;
+ }
+ for (i= 0; i < count; ++i)
+ {
+ if (!(sub_id_list[i]=
+ rpl_global_gtid_slave_state.next_sub_id(list[i].domain_id)))
+ {
+ my_free(list);
+ my_free(sub_id_list);
+ list= NULL;
+ sub_id_list= NULL;
+ return;
+ }
+ }
+ }
+ }
+}
+
+
#if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
bool
Gtid_list_log_event::to_packet(String *packet)
@@ -6432,7 +6496,20 @@ Gtid_list_log_event::write(IO_CACHE *file)
int
Gtid_list_log_event::do_apply_event(Relay_log_info const *rli)
{
- int ret= Log_event::do_apply_event(rli);
+ int ret;
+ if (gl_flags & FLAG_IGN_GTIDS)
+ {
+ uint32 i;
+ for (i= 0; i < count; ++i)
+ {
+ if ((ret= rpl_global_gtid_slave_state.record_gtid(thd, &list[i],
+ sub_id_list[i],
+ false, false)))
+ return ret;
+ rpl_global_gtid_slave_state.update_state_hash(sub_id_list[i], &list[i]);
+ }
+ }
+ ret= Log_event::do_apply_event(rli);
if (rli->until_condition == Relay_log_info::UNTIL_GTID &&
(gl_flags & FLAG_UNTIL_REACHED))
{
diff --git a/sql/log_event.h b/sql/log_event.h
index b73c0e71f77..abb3b96bac4 100644
--- a/sql/log_event.h
+++ b/sql/log_event.h
@@ -3197,12 +3197,15 @@ public:
uint32 count;
uint32 gl_flags;
struct rpl_gtid *list;
+ uint64 *sub_id_list;
static const uint element_size= 4+4+8;
static const uint32 FLAG_UNTIL_REACHED= (1<<28);
+ static const uint32 FLAG_IGN_GTIDS= (1<<29);
#ifdef MYSQL_SERVER
Gtid_list_log_event(rpl_binlog_state *gtid_set, uint32 gl_flags);
+ Gtid_list_log_event(slave_connection_state *gtid_set, uint32 gl_flags);
#ifdef HAVE_REPLICATION
void pack_info(THD *thd, Protocol *protocol);
#endif
@@ -3211,7 +3214,7 @@ public:
#endif
Gtid_list_log_event(const char *buf, uint event_len,
const Format_description_log_event *description_event);
- ~Gtid_list_log_event() { my_free(list); }
+ ~Gtid_list_log_event() { my_free(list); my_free(sub_id_list); }
Log_event_type get_type_code() { return GTID_LIST_EVENT; }
int get_data_size() {
/*
diff --git a/sql/rpl_gtid.cc b/sql/rpl_gtid.cc
index c8a00cd78ae..96dade4d390 100644
--- a/sql/rpl_gtid.cc
+++ b/sql/rpl_gtid.cc
@@ -1419,6 +1419,15 @@ slave_connection_state::remove(const rpl_gtid *in_gtid)
}
+void
+slave_connection_state::remove_if_present(const rpl_gtid *in_gtid)
+{
+ uchar *rec= my_hash_search(&hash, (const uchar *)(&in_gtid->domain_id), 0);
+ if (rec)
+ my_hash_delete(&hash, rec);
+}
+
+
int
slave_connection_state::to_string(String *out_str)
{
@@ -1442,3 +1451,22 @@ slave_connection_state::append_to_string(String *out_str)
}
return 0;
}
+
+
+int
+slave_connection_state::get_gtid_list(rpl_gtid *gtid_list, uint32 list_size)
+{
+ uint32 i, pos;
+
+ pos= 0;
+ for (i= 0; i < hash.records; ++i)
+ {
+ entry *e;
+ if (pos >= list_size)
+ return 1;
+ e= (entry *)my_hash_element(&hash, i);
+ memcpy(&gtid_list[pos++], &e->gtid, sizeof(e->gtid));
+ }
+
+ return 0;
+}
diff --git a/sql/rpl_gtid.h b/sql/rpl_gtid.h
index fcb9f08795a..8d2c98d54d3 100644
--- a/sql/rpl_gtid.h
+++ b/sql/rpl_gtid.h
@@ -195,9 +195,11 @@ struct slave_connection_state
entry *find_entry(uint32 domain_id);
int update(const rpl_gtid *in_gtid);
void remove(const rpl_gtid *gtid);
+ void remove_if_present(const rpl_gtid *in_gtid);
ulong count() const { return hash.records; }
int to_string(String *out_str);
int append_to_string(String *out_str);
+ int get_gtid_list(rpl_gtid *gtid_list, uint32 list_size);
};
extern bool rpl_slave_state_tostring_helper(String *dest, const rpl_gtid *gtid,
diff --git a/sql/rpl_rli.h b/sql/rpl_rli.h
index 6dd757343fd..9ab5dcb30a5 100644
--- a/sql/rpl_rli.h
+++ b/sql/rpl_rli.h
@@ -303,6 +303,8 @@ public:
*/
char ign_master_log_name_end[FN_REFLEN];
ulonglong ign_master_log_pos_end;
+ /* Similar for ignored GTID events. */
+ slave_connection_state ign_gtids;
/*
Indentifies where the SQL Thread should create temporary files for the
diff --git a/sql/slave.cc b/sql/slave.cc
index b10d1a17c23..29eee1cf135 100644
--- a/sql/slave.cc
+++ b/sql/slave.cc
@@ -2222,34 +2222,66 @@ static void write_ignored_events_info_to_relay_log(THD *thd, Master_info *mi)
DBUG_ASSERT(thd == mi->io_thd);
mysql_mutex_lock(log_lock);
- if (rli->ign_master_log_name_end[0])
- {
- DBUG_PRINT("info",("writing a Rotate event to track down ignored events"));
- Rotate_log_event *ev= new Rotate_log_event(rli->ign_master_log_name_end,
- 0, rli->ign_master_log_pos_end,
- Rotate_log_event::DUP_NAME);
- rli->ign_master_log_name_end[0]= 0;
- /* can unlock before writing as slave SQL thd will soon see our Rotate */
+ if (rli->ign_master_log_name_end[0] || rli->ign_gtids.count())
+ {
+ Rotate_log_event *rev;
+ Gtid_list_log_event *glev;
+ if (rli->ign_master_log_name_end[0])
+ {
+ rev= new Rotate_log_event(rli->ign_master_log_name_end,
+ 0, rli->ign_master_log_pos_end,
+ Rotate_log_event::DUP_NAME);
+ rli->ign_master_log_name_end[0]= 0;
+ if (unlikely(!(bool)rev))
+ mi->report(ERROR_LEVEL, ER_SLAVE_CREATE_EVENT_FAILURE,
+ ER(ER_SLAVE_CREATE_EVENT_FAILURE),
+ "Rotate_event (out of memory?),"
+ " SHOW SLAVE STATUS may be inaccurate");
+ }
+ if (rli->ign_gtids.count())
+ {
+ glev= new Gtid_list_log_event(&rli->ign_gtids,
+ Gtid_list_log_event::FLAG_IGN_GTIDS);
+ rli->ign_gtids.reset();
+ if (unlikely(!(bool)glev))
+ mi->report(ERROR_LEVEL, ER_SLAVE_CREATE_EVENT_FAILURE,
+ ER(ER_SLAVE_CREATE_EVENT_FAILURE),
+ "Gtid_list_event (out of memory?),"
+ " gtid_slave_pos may be inaccurate");
+ }
+
+ /* Can unlock before writing as slave SQL thd will soon see our event. */
mysql_mutex_unlock(log_lock);
- if (likely((bool)ev))
+ if (rev)
{
- ev->server_id= 0; // don't be ignored by slave SQL thread
- if (unlikely(rli->relay_log.append(ev)))
+ DBUG_PRINT("info",("writing a Rotate event to track down ignored events"));
+ rev->server_id= 0; // don't be ignored by slave SQL thread
+ if (unlikely(rli->relay_log.append(rev)))
mi->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_WRITE_FAILURE,
ER(ER_SLAVE_RELAY_LOG_WRITE_FAILURE),
"failed to write a Rotate event"
" to the relay log, SHOW SLAVE STATUS may be"
" inaccurate");
+ delete rev;
+ }
+ if (glev)
+ {
+ DBUG_PRINT("info",("writing a Gtid_list event to track down ignored events"));
+ glev->server_id= 0; // don't be ignored by slave SQL thread
+ glev->set_artificial_event(); // Don't mess up Exec_Master_Log_Pos
+ if (unlikely(rli->relay_log.append(glev)))
+ mi->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_WRITE_FAILURE,
+ ER(ER_SLAVE_RELAY_LOG_WRITE_FAILURE),
+ "failed to write a Gtid_list event to the relay log, "
+ "gtid_slave_pos may be inaccurate");
+ delete glev;
+ }
+ if (likely (rev || glev))
+ {
rli->relay_log.harvest_bytes_written(&rli->log_space_total);
if (flush_master_info(mi, TRUE, TRUE))
sql_print_error("Failed to flush master info file");
- delete ev;
}
- else
- mi->report(ERROR_LEVEL, ER_SLAVE_CREATE_EVENT_FAILURE,
- ER(ER_SLAVE_CREATE_EVENT_FAILURE),
- "Rotate_event (out of memory?),"
- " SHOW SLAVE STATUS may be inaccurate");
}
else
mysql_mutex_unlock(log_lock);
@@ -3097,6 +3129,12 @@ int apply_event_and_update_pos(Log_event* ev, THD* thd, Relay_log_info* rli)
rli->slave_skip_counter--;
}
mysql_mutex_unlock(&rli->data_lock);
+ DBUG_EXECUTE_IF("inject_slave_sql_before_apply_event",
+ {
+ DBUG_ASSERT(!debug_sync_set_action
+ (thd, STRING_WITH_LEN("now WAIT_FOR continue")));
+ DBUG_SET_INITIAL("-d,inject_slave_sql_before_apply_event");
+ };);
if (reason == Log_event::EVENT_SKIP_NOT)
exec_res= ev->apply_event(rli);
@@ -4822,6 +4860,8 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
ulong s_id;
bool unlock_data_lock= TRUE;
bool gtid_skip_enqueue= false;
+ bool got_gtid_event= false;
+ rpl_gtid event_gtid;
/*
FD_q must have been prepared for the first R_a event
@@ -5140,6 +5180,14 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
{
uchar dummy_flag;
+ if (Gtid_log_event::peek(buf, event_len, checksum_alg,
+ &event_gtid.domain_id, &event_gtid.server_id,
+ &event_gtid.seq_no, &dummy_flag))
+ {
+ error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE;
+ goto err;
+ }
+ got_gtid_event= true;
if (mi->using_gtid == Master_info::USE_GTID_NO)
goto default_action;
if (unlikely(!mi->gtid_event_seen))
@@ -5147,8 +5195,6 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
mi->gtid_event_seen= true;
if (mi->gtid_reconnect_event_skip_count)
{
- rpl_gtid gtid;
-
/*
If we are reconnecting, and we need to skip a partial event group
already queued to the relay log before the reconnect, then we check
@@ -5157,21 +5203,14 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
The only way we should be able to receive a different GTID than what
we expect is if the binlog on the master (or more likely the whole
- master server) was replaced with a different one, one the same IP
+ master server) was replaced with a different one, on the same IP
address, _and_ the new master happens to have domains in a different
order so we get the GTID from a different domain first. Still, it is
best to protect against this case.
*/
- if (Gtid_log_event::peek(buf, event_len, checksum_alg,
- &gtid.domain_id, &gtid.server_id,
- &gtid.seq_no, &dummy_flag))
- {
- error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE;
- goto err;
- }
- if (gtid.domain_id != mi->last_queued_gtid.domain_id ||
- gtid.server_id != mi->last_queued_gtid.server_id ||
- gtid.seq_no != mi->last_queued_gtid.seq_no)
+ if (event_gtid.domain_id != mi->last_queued_gtid.domain_id ||
+ event_gtid.server_id != mi->last_queued_gtid.server_id ||
+ event_gtid.seq_no != mi->last_queued_gtid.seq_no)
{
bool first;
error= ER_SLAVE_UNEXPECTED_MASTER_SWITCH;
@@ -5181,7 +5220,7 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
&first);
error_msg.append(STRING_WITH_LEN(", received: "));
first= true;
- rpl_slave_state_tostring_helper(&error_msg, &gtid, &first);
+ rpl_slave_state_tostring_helper(&error_msg, &event_gtid, &first);
goto err;
}
}
@@ -5261,6 +5300,16 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
mysql_mutex_lock(log_lock);
s_id= uint4korr(buf + SERVER_ID_OFFSET);
+ /*
+ Write the event to the relay log, unless we reconnected in the middle
+ of an event group and now need to skip the initial part of the group that
+ we already wrote before reconnecting.
+ */
+ if (unlikely(gtid_skip_enqueue))
+ {
+ mi->master_log_pos+= inc_pos;
+ }
+ else
if ((s_id == global_system_variables.server_id &&
!mi->rli.replicate_same_server_id) ||
/*
@@ -5303,6 +5352,8 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
memcpy(rli->ign_master_log_name_end, mi->master_log_name, FN_REFLEN);
DBUG_ASSERT(rli->ign_master_log_name_end[0]);
rli->ign_master_log_pos_end= mi->master_log_pos;
+ if (got_gtid_event)
+ rli->ign_gtids.update(&event_gtid);
}
rli->relay_log.signal_update(); // the slave SQL thread needs to re-check
DBUG_PRINT("info", ("master_log_pos: %lu, event originating from %u server, ignored",
@@ -5310,16 +5361,7 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
}
else
{
- /*
- Write the event to the relay log, unless we reconnected in the middle
- of an event group and now need to skip the initial part of the group that
- we already wrote before reconnecting.
- */
- if (unlikely(gtid_skip_enqueue))
- {
- mi->master_log_pos+= inc_pos;
- }
- else if (likely(!(rli->relay_log.appendv(buf,event_len,0))))
+ if (likely(!(rli->relay_log.appendv(buf,event_len,0))))
{
mi->master_log_pos+= inc_pos;
DBUG_PRINT("info", ("master_log_pos: %lu", (ulong) mi->master_log_pos));
@@ -5330,6 +5372,8 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE;
}
rli->ign_master_log_name_end[0]= 0; // last event is not ignored
+ if (got_gtid_event)
+ rli->ign_gtids.remove_if_present(&event_gtid);
if (save_buf != NULL)
buf= save_buf;
}
@@ -5932,6 +5976,25 @@ static Log_event* next_event(Relay_log_info* rli)
DBUG_RETURN(ev);
}
+ if (rli->ign_gtids.count())
+ {
+ /* We generate and return a Gtid_list, to update gtid_slave_pos. */
+ DBUG_PRINT("info",("seeing ignored end gtids"));
+ ev= new Gtid_list_log_event(&rli->ign_gtids,
+ Gtid_list_log_event::FLAG_IGN_GTIDS);
+ rli->ign_gtids.reset();
+ mysql_mutex_unlock(log_lock);
+ if (unlikely(!ev))
+ {
+ errmsg= "Slave SQL thread failed to create a Gtid_list event "
+ "(out of memory?), gtid_slave_pos may be inaccurate";
+ goto err;
+ }
+ ev->server_id= 0; // don't be ignored by slave SQL thread
+ ev->set_artificial_event(); // Don't mess up Exec_Master_Log_Pos
+ DBUG_RETURN(ev);
+ }
+
/*
We can, and should release data_lock while we are waiting for
update. If we do not, show slave status will block