diff options
Diffstat (limited to 'sql')
-rw-r--r-- | sql/log_event.cc | 83 | ||||
-rw-r--r-- | sql/log_event.h | 5 | ||||
-rw-r--r-- | sql/rpl_gtid.cc | 28 | ||||
-rw-r--r-- | sql/rpl_gtid.h | 2 | ||||
-rw-r--r-- | sql/rpl_rli.h | 2 | ||||
-rw-r--r-- | sql/slave.cc | 145 |
6 files changed, 220 insertions, 45 deletions
diff --git a/sql/log_event.cc b/sql/log_event.cc index 791afeb4e1d..5e30918106a 100644 --- a/sql/log_event.cc +++ b/sql/log_event.cc @@ -6334,7 +6334,7 @@ Gtid_log_event::print(FILE *file, PRINT_EVENT_INFO *print_event_info) Gtid_list_log_event::Gtid_list_log_event(const char *buf, uint event_len, const Format_description_log_event *description_event) - : Log_event(buf, description_event), count(0), list(0) + : Log_event(buf, description_event), count(0), list(0), sub_id_list(0) { uint32 i; uint32 val; @@ -6363,6 +6363,31 @@ Gtid_list_log_event::Gtid_list_log_event(const char *buf, uint event_len, list[i].seq_no= uint8korr(buf); buf+= 8; } + +#ifdef MYSQL_SERVER + if ((gl_flags & FLAG_IGN_GTIDS)) + { + uint32 i; + if (!(sub_id_list= (uint64 *)my_malloc(count*sizeof(uint64), MYF(MY_WME)))) + { + my_free(list); + list= NULL; + return; + } + for (i= 0; i < count; ++i) + { + if (!(sub_id_list[i]= + rpl_global_gtid_slave_state.next_sub_id(list[i].domain_id))) + { + my_free(list); + my_free(sub_id_list); + list= NULL; + sub_id_list= NULL; + return; + } + } + } +#endif } @@ -6370,7 +6395,7 @@ Gtid_list_log_event::Gtid_list_log_event(const char *buf, uint event_len, Gtid_list_log_event::Gtid_list_log_event(rpl_binlog_state *gtid_set, uint32 gl_flags_) - : count(gtid_set->count()), gl_flags(gl_flags_), list(0) + : count(gtid_set->count()), gl_flags(gl_flags_), list(0), sub_id_list(0) { cache_type= EVENT_NO_CACHE; /* Failure to allocate memory will be caught by is_valid() returning false. */ @@ -6381,6 +6406,45 @@ Gtid_list_log_event::Gtid_list_log_event(rpl_binlog_state *gtid_set, } +Gtid_list_log_event::Gtid_list_log_event(slave_connection_state *gtid_set, + uint32 gl_flags_) + : count(gtid_set->count()), gl_flags(gl_flags_), list(0), sub_id_list(0) +{ + cache_type= EVENT_NO_CACHE; + /* Failure to allocate memory will be caught by is_valid() returning false. */ + if (count < (1<<28) && + (list = (rpl_gtid *)my_malloc(count * sizeof(*list) + (count == 0), + MYF(MY_WME)))) + { + gtid_set->get_gtid_list(list, count); + if (gl_flags & FLAG_IGN_GTIDS) + { + uint32 i; + + if (!(sub_id_list= (uint64 *)my_malloc(count * sizeof(uint64), + MYF(MY_WME)))) + { + my_free(list); + list= NULL; + return; + } + for (i= 0; i < count; ++i) + { + if (!(sub_id_list[i]= + rpl_global_gtid_slave_state.next_sub_id(list[i].domain_id))) + { + my_free(list); + my_free(sub_id_list); + list= NULL; + sub_id_list= NULL; + return; + } + } + } + } +} + + #if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT) bool Gtid_list_log_event::to_packet(String *packet) @@ -6432,7 +6496,20 @@ Gtid_list_log_event::write(IO_CACHE *file) int Gtid_list_log_event::do_apply_event(Relay_log_info const *rli) { - int ret= Log_event::do_apply_event(rli); + int ret; + if (gl_flags & FLAG_IGN_GTIDS) + { + uint32 i; + for (i= 0; i < count; ++i) + { + if ((ret= rpl_global_gtid_slave_state.record_gtid(thd, &list[i], + sub_id_list[i], + false, false))) + return ret; + rpl_global_gtid_slave_state.update_state_hash(sub_id_list[i], &list[i]); + } + } + ret= Log_event::do_apply_event(rli); if (rli->until_condition == Relay_log_info::UNTIL_GTID && (gl_flags & FLAG_UNTIL_REACHED)) { diff --git a/sql/log_event.h b/sql/log_event.h index b73c0e71f77..abb3b96bac4 100644 --- a/sql/log_event.h +++ b/sql/log_event.h @@ -3197,12 +3197,15 @@ public: uint32 count; uint32 gl_flags; struct rpl_gtid *list; + uint64 *sub_id_list; static const uint element_size= 4+4+8; static const uint32 FLAG_UNTIL_REACHED= (1<<28); + static const uint32 FLAG_IGN_GTIDS= (1<<29); #ifdef MYSQL_SERVER Gtid_list_log_event(rpl_binlog_state *gtid_set, uint32 gl_flags); + Gtid_list_log_event(slave_connection_state *gtid_set, uint32 gl_flags); #ifdef HAVE_REPLICATION void pack_info(THD *thd, Protocol *protocol); #endif @@ -3211,7 +3214,7 @@ public: #endif Gtid_list_log_event(const char *buf, uint event_len, const Format_description_log_event *description_event); - ~Gtid_list_log_event() { my_free(list); } + ~Gtid_list_log_event() { my_free(list); my_free(sub_id_list); } Log_event_type get_type_code() { return GTID_LIST_EVENT; } int get_data_size() { /* diff --git a/sql/rpl_gtid.cc b/sql/rpl_gtid.cc index c8a00cd78ae..96dade4d390 100644 --- a/sql/rpl_gtid.cc +++ b/sql/rpl_gtid.cc @@ -1419,6 +1419,15 @@ slave_connection_state::remove(const rpl_gtid *in_gtid) } +void +slave_connection_state::remove_if_present(const rpl_gtid *in_gtid) +{ + uchar *rec= my_hash_search(&hash, (const uchar *)(&in_gtid->domain_id), 0); + if (rec) + my_hash_delete(&hash, rec); +} + + int slave_connection_state::to_string(String *out_str) { @@ -1442,3 +1451,22 @@ slave_connection_state::append_to_string(String *out_str) } return 0; } + + +int +slave_connection_state::get_gtid_list(rpl_gtid *gtid_list, uint32 list_size) +{ + uint32 i, pos; + + pos= 0; + for (i= 0; i < hash.records; ++i) + { + entry *e; + if (pos >= list_size) + return 1; + e= (entry *)my_hash_element(&hash, i); + memcpy(>id_list[pos++], &e->gtid, sizeof(e->gtid)); + } + + return 0; +} diff --git a/sql/rpl_gtid.h b/sql/rpl_gtid.h index fcb9f08795a..8d2c98d54d3 100644 --- a/sql/rpl_gtid.h +++ b/sql/rpl_gtid.h @@ -195,9 +195,11 @@ struct slave_connection_state entry *find_entry(uint32 domain_id); int update(const rpl_gtid *in_gtid); void remove(const rpl_gtid *gtid); + void remove_if_present(const rpl_gtid *in_gtid); ulong count() const { return hash.records; } int to_string(String *out_str); int append_to_string(String *out_str); + int get_gtid_list(rpl_gtid *gtid_list, uint32 list_size); }; extern bool rpl_slave_state_tostring_helper(String *dest, const rpl_gtid *gtid, diff --git a/sql/rpl_rli.h b/sql/rpl_rli.h index 6dd757343fd..9ab5dcb30a5 100644 --- a/sql/rpl_rli.h +++ b/sql/rpl_rli.h @@ -303,6 +303,8 @@ public: */ char ign_master_log_name_end[FN_REFLEN]; ulonglong ign_master_log_pos_end; + /* Similar for ignored GTID events. */ + slave_connection_state ign_gtids; /* Indentifies where the SQL Thread should create temporary files for the diff --git a/sql/slave.cc b/sql/slave.cc index b10d1a17c23..29eee1cf135 100644 --- a/sql/slave.cc +++ b/sql/slave.cc @@ -2222,34 +2222,66 @@ static void write_ignored_events_info_to_relay_log(THD *thd, Master_info *mi) DBUG_ASSERT(thd == mi->io_thd); mysql_mutex_lock(log_lock); - if (rli->ign_master_log_name_end[0]) - { - DBUG_PRINT("info",("writing a Rotate event to track down ignored events")); - Rotate_log_event *ev= new Rotate_log_event(rli->ign_master_log_name_end, - 0, rli->ign_master_log_pos_end, - Rotate_log_event::DUP_NAME); - rli->ign_master_log_name_end[0]= 0; - /* can unlock before writing as slave SQL thd will soon see our Rotate */ + if (rli->ign_master_log_name_end[0] || rli->ign_gtids.count()) + { + Rotate_log_event *rev; + Gtid_list_log_event *glev; + if (rli->ign_master_log_name_end[0]) + { + rev= new Rotate_log_event(rli->ign_master_log_name_end, + 0, rli->ign_master_log_pos_end, + Rotate_log_event::DUP_NAME); + rli->ign_master_log_name_end[0]= 0; + if (unlikely(!(bool)rev)) + mi->report(ERROR_LEVEL, ER_SLAVE_CREATE_EVENT_FAILURE, + ER(ER_SLAVE_CREATE_EVENT_FAILURE), + "Rotate_event (out of memory?)," + " SHOW SLAVE STATUS may be inaccurate"); + } + if (rli->ign_gtids.count()) + { + glev= new Gtid_list_log_event(&rli->ign_gtids, + Gtid_list_log_event::FLAG_IGN_GTIDS); + rli->ign_gtids.reset(); + if (unlikely(!(bool)glev)) + mi->report(ERROR_LEVEL, ER_SLAVE_CREATE_EVENT_FAILURE, + ER(ER_SLAVE_CREATE_EVENT_FAILURE), + "Gtid_list_event (out of memory?)," + " gtid_slave_pos may be inaccurate"); + } + + /* Can unlock before writing as slave SQL thd will soon see our event. */ mysql_mutex_unlock(log_lock); - if (likely((bool)ev)) + if (rev) { - ev->server_id= 0; // don't be ignored by slave SQL thread - if (unlikely(rli->relay_log.append(ev))) + DBUG_PRINT("info",("writing a Rotate event to track down ignored events")); + rev->server_id= 0; // don't be ignored by slave SQL thread + if (unlikely(rli->relay_log.append(rev))) mi->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_WRITE_FAILURE, ER(ER_SLAVE_RELAY_LOG_WRITE_FAILURE), "failed to write a Rotate event" " to the relay log, SHOW SLAVE STATUS may be" " inaccurate"); + delete rev; + } + if (glev) + { + DBUG_PRINT("info",("writing a Gtid_list event to track down ignored events")); + glev->server_id= 0; // don't be ignored by slave SQL thread + glev->set_artificial_event(); // Don't mess up Exec_Master_Log_Pos + if (unlikely(rli->relay_log.append(glev))) + mi->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_WRITE_FAILURE, + ER(ER_SLAVE_RELAY_LOG_WRITE_FAILURE), + "failed to write a Gtid_list event to the relay log, " + "gtid_slave_pos may be inaccurate"); + delete glev; + } + if (likely (rev || glev)) + { rli->relay_log.harvest_bytes_written(&rli->log_space_total); if (flush_master_info(mi, TRUE, TRUE)) sql_print_error("Failed to flush master info file"); - delete ev; } - else - mi->report(ERROR_LEVEL, ER_SLAVE_CREATE_EVENT_FAILURE, - ER(ER_SLAVE_CREATE_EVENT_FAILURE), - "Rotate_event (out of memory?)," - " SHOW SLAVE STATUS may be inaccurate"); } else mysql_mutex_unlock(log_lock); @@ -3097,6 +3129,12 @@ int apply_event_and_update_pos(Log_event* ev, THD* thd, Relay_log_info* rli) rli->slave_skip_counter--; } mysql_mutex_unlock(&rli->data_lock); + DBUG_EXECUTE_IF("inject_slave_sql_before_apply_event", + { + DBUG_ASSERT(!debug_sync_set_action + (thd, STRING_WITH_LEN("now WAIT_FOR continue"))); + DBUG_SET_INITIAL("-d,inject_slave_sql_before_apply_event"); + };); if (reason == Log_event::EVENT_SKIP_NOT) exec_res= ev->apply_event(rli); @@ -4822,6 +4860,8 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) ulong s_id; bool unlock_data_lock= TRUE; bool gtid_skip_enqueue= false; + bool got_gtid_event= false; + rpl_gtid event_gtid; /* FD_q must have been prepared for the first R_a event @@ -5140,6 +5180,14 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) { uchar dummy_flag; + if (Gtid_log_event::peek(buf, event_len, checksum_alg, + &event_gtid.domain_id, &event_gtid.server_id, + &event_gtid.seq_no, &dummy_flag)) + { + error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE; + goto err; + } + got_gtid_event= true; if (mi->using_gtid == Master_info::USE_GTID_NO) goto default_action; if (unlikely(!mi->gtid_event_seen)) @@ -5147,8 +5195,6 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) mi->gtid_event_seen= true; if (mi->gtid_reconnect_event_skip_count) { - rpl_gtid gtid; - /* If we are reconnecting, and we need to skip a partial event group already queued to the relay log before the reconnect, then we check @@ -5157,21 +5203,14 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) The only way we should be able to receive a different GTID than what we expect is if the binlog on the master (or more likely the whole - master server) was replaced with a different one, one the same IP + master server) was replaced with a different one, on the same IP address, _and_ the new master happens to have domains in a different order so we get the GTID from a different domain first. Still, it is best to protect against this case. */ - if (Gtid_log_event::peek(buf, event_len, checksum_alg, - >id.domain_id, >id.server_id, - >id.seq_no, &dummy_flag)) - { - error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE; - goto err; - } - if (gtid.domain_id != mi->last_queued_gtid.domain_id || - gtid.server_id != mi->last_queued_gtid.server_id || - gtid.seq_no != mi->last_queued_gtid.seq_no) + if (event_gtid.domain_id != mi->last_queued_gtid.domain_id || + event_gtid.server_id != mi->last_queued_gtid.server_id || + event_gtid.seq_no != mi->last_queued_gtid.seq_no) { bool first; error= ER_SLAVE_UNEXPECTED_MASTER_SWITCH; @@ -5181,7 +5220,7 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) &first); error_msg.append(STRING_WITH_LEN(", received: ")); first= true; - rpl_slave_state_tostring_helper(&error_msg, >id, &first); + rpl_slave_state_tostring_helper(&error_msg, &event_gtid, &first); goto err; } } @@ -5261,6 +5300,16 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) mysql_mutex_lock(log_lock); s_id= uint4korr(buf + SERVER_ID_OFFSET); + /* + Write the event to the relay log, unless we reconnected in the middle + of an event group and now need to skip the initial part of the group that + we already wrote before reconnecting. + */ + if (unlikely(gtid_skip_enqueue)) + { + mi->master_log_pos+= inc_pos; + } + else if ((s_id == global_system_variables.server_id && !mi->rli.replicate_same_server_id) || /* @@ -5303,6 +5352,8 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) memcpy(rli->ign_master_log_name_end, mi->master_log_name, FN_REFLEN); DBUG_ASSERT(rli->ign_master_log_name_end[0]); rli->ign_master_log_pos_end= mi->master_log_pos; + if (got_gtid_event) + rli->ign_gtids.update(&event_gtid); } rli->relay_log.signal_update(); // the slave SQL thread needs to re-check DBUG_PRINT("info", ("master_log_pos: %lu, event originating from %u server, ignored", @@ -5310,16 +5361,7 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) } else { - /* - Write the event to the relay log, unless we reconnected in the middle - of an event group and now need to skip the initial part of the group that - we already wrote before reconnecting. - */ - if (unlikely(gtid_skip_enqueue)) - { - mi->master_log_pos+= inc_pos; - } - else if (likely(!(rli->relay_log.appendv(buf,event_len,0)))) + if (likely(!(rli->relay_log.appendv(buf,event_len,0)))) { mi->master_log_pos+= inc_pos; DBUG_PRINT("info", ("master_log_pos: %lu", (ulong) mi->master_log_pos)); @@ -5330,6 +5372,8 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE; } rli->ign_master_log_name_end[0]= 0; // last event is not ignored + if (got_gtid_event) + rli->ign_gtids.remove_if_present(&event_gtid); if (save_buf != NULL) buf= save_buf; } @@ -5932,6 +5976,25 @@ static Log_event* next_event(Relay_log_info* rli) DBUG_RETURN(ev); } + if (rli->ign_gtids.count()) + { + /* We generate and return a Gtid_list, to update gtid_slave_pos. */ + DBUG_PRINT("info",("seeing ignored end gtids")); + ev= new Gtid_list_log_event(&rli->ign_gtids, + Gtid_list_log_event::FLAG_IGN_GTIDS); + rli->ign_gtids.reset(); + mysql_mutex_unlock(log_lock); + if (unlikely(!ev)) + { + errmsg= "Slave SQL thread failed to create a Gtid_list event " + "(out of memory?), gtid_slave_pos may be inaccurate"; + goto err; + } + ev->server_id= 0; // don't be ignored by slave SQL thread + ev->set_artificial_event(); // Don't mess up Exec_Master_Log_Pos + DBUG_RETURN(ev); + } + /* We can, and should release data_lock while we are waiting for update. If we do not, show slave status will block |