diff options
author | Sergei Golubchik <serg@mariadb.org> | 2022-05-11 12:21:36 +0200 |
---|---|---|
committer | Sergei Golubchik <serg@mariadb.org> | 2022-05-11 12:21:36 +0200 |
commit | 443c2a715d36968ab55be3b6f4a785cd238ce99c (patch) | |
tree | 2e5b36c7f92ad77777079643e1c4331be0561f85 /sql/slave.cc | |
parent | bde0b31dc59bd1f171f3e62efb5443382c94c141 (diff) | |
parent | fd132be117e44318de04973e8bc825651b220792 (diff) | |
download | mariadb-git-443c2a715d36968ab55be3b6f4a785cd238ce99c.tar.gz |
Merge branch '10.7' into 10.8
Diffstat (limited to 'sql/slave.cc')
-rw-r--r-- | sql/slave.cc | 167 |
1 files changed, 147 insertions, 20 deletions
diff --git a/sql/slave.cc b/sql/slave.cc index 23a35c5805f..18262c71e87 100644 --- a/sql/slave.cc +++ b/sql/slave.cc @@ -3307,9 +3307,9 @@ static bool send_show_master_info_data(THD *thd, Master_info *mi, bool full, protocol->store((uint32) mi->master_id); // SQL_Delay // Master_Ssl_Crl - protocol->store_string_or_null(mi->ssl_ca, &my_charset_bin); + protocol->store_string_or_null(mi->ssl_crl, &my_charset_bin); // Master_Ssl_Crlpath - protocol->store_string_or_null(mi->ssl_capath, &my_charset_bin); + protocol->store_string_or_null(mi->ssl_crlpath, &my_charset_bin); // Using_Gtid protocol->store_string_or_null(mi->using_gtid_astext(mi->using_gtid), &my_charset_bin); @@ -3402,7 +3402,7 @@ bool show_all_master_info(THD* thd) String gtid_pos; Master_info **tmp; List<Item> field_list; - DBUG_ENTER("show_master_info"); + DBUG_ENTER("show_all_master_info"); mysql_mutex_assert_owner(&LOCK_active_mi); gtid_pos.length(0); @@ -4991,6 +4991,7 @@ Stopping slave I/O thread due to out-of-memory error from master"); not cause the slave IO thread to stop, and the error messages are already reported. */ + DBUG_EXECUTE_IF("simulate_delay_semisync_slave_reply", my_sleep(800000);); (void)repl_semisync_slave.slave_reply(mi); } @@ -6777,17 +6778,75 @@ static int queue_event(Master_info* mi, const uchar *buf, ulong event_len) mi->gtid_event_seen= true; /* - We have successfully queued to relay log everything before this GTID, so + Unless the previous group is malformed, + we have successfully queued to relay log everything before this GTID, so in case of reconnect we can start from after any previous GTID. - (Normally we would have updated gtid_current_pos earlier at the end of - the previous event group, but better leave an extra check here for - safety). + (We must have updated gtid_current_pos earlier at the end of + the previous event group. Unless ...) */ - if (mi->events_queued_since_last_gtid) + if (unlikely(mi->events_queued_since_last_gtid > 0)) { - mi->gtid_current_pos.update(&mi->last_queued_gtid); - mi->events_queued_since_last_gtid= 0; + /* + ...unless the last group has not been completed. An assert below + can be satisfied only with the strict mode that ensures + against "genuine" gtid duplicates. + */ + rpl_gtid *gtid_in_slave_state= + mi->gtid_current_pos.find(mi->last_queued_gtid.domain_id); + + // Slave gtid state must not have updated yet to the last received gtid. + DBUG_ASSERT((mi->using_gtid == Master_info::USE_GTID_NO || + !opt_gtid_strict_mode) || + (!gtid_in_slave_state || + !(*gtid_in_slave_state == mi->last_queued_gtid))); + + DBUG_EXECUTE_IF("slave_discard_xid_for_gtid_0_x_1000", + { + /* Inject an event group that is missing its XID commit event. */ + if ((mi->last_queued_gtid.domain_id == 0 && + mi->last_queued_gtid.seq_no == 1000) || + (mi->last_queued_gtid.domain_id == 1 && + mi->last_queued_gtid.seq_no == 32)) + { + sql_print_warning( + "Unexpected break of being relay-logged GTID %u-%u-%llu " + "event group by the current GTID event %u-%u-%llu", + PARAM_GTID(mi->last_queued_gtid),PARAM_GTID(event_gtid)); + DBUG_SET("-d,slave_discard_xid_for_gtid_0_x_1000"); + goto dbug_gtid_accept; + } + }); + error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE; + sql_print_error("Unexpected break of being relay-logged GTID %u-%u-%llu " + "event group by the current GTID event %u-%u-%llu", + PARAM_GTID(mi->last_queued_gtid),PARAM_GTID(event_gtid)); + goto err; + } + else if (unlikely(mi->gtid_reconnect_event_skip_count > 0)) + { + if (mi->gtid_reconnect_event_skip_count == + mi->events_queued_since_last_gtid) + { + DBUG_ASSERT(event_gtid == mi->last_queued_gtid); + + goto default_action; + } + + DBUG_ASSERT(0); } + // else_likely{... +#ifndef DBUG_OFF +dbug_gtid_accept: + DBUG_EXECUTE_IF("slave_discard_gtid_0_x_1002", + { + if (mi->last_queued_gtid.server_id == 27697 && + mi->last_queued_gtid.seq_no == 1002) + { + DBUG_SET("-d,slave_discard_gtid_0_x_1002"); + goto skip_relay_logging; + } + }); +#endif mi->last_queued_gtid= event_gtid; mi->last_queued_gtid_standalone= (gtid_flag & Gtid_log_event::FL_STANDALONE) != 0; @@ -6797,7 +6856,7 @@ static int queue_event(Master_info* mi, const uchar *buf, ulong event_len) ++mi->events_queued_since_last_gtid; inc_pos= event_len; - + // ...} eof else_likely } break; /* @@ -6877,6 +6936,12 @@ static int queue_event(Master_info* mi, const uchar *buf, ulong event_len) case XID_EVENT: DBUG_EXECUTE_IF("slave_discard_xid_for_gtid_0_x_1000", { + if (mi->last_queued_gtid.server_id == 27697 && + mi->last_queued_gtid.seq_no == 1000) + { + DBUG_SET("-d,slave_discard_xid_for_gtid_0_x_1000"); + goto skip_relay_logging; + } /* Inject an event group that is missing its XID commit event. */ if (mi->last_queued_gtid.domain_id == 0 && mi->last_queued_gtid.seq_no == 1000) @@ -6923,15 +6988,48 @@ static int queue_event(Master_info* mi, const uchar *buf, ulong event_len) } };); - if (mi->using_gtid != Master_info::USE_GTID_NO && mi->gtid_event_seen) + if (mi->using_gtid != Master_info::USE_GTID_NO) { - if (unlikely(mi->gtid_reconnect_event_skip_count)) + if (likely(mi->gtid_event_seen)) { - --mi->gtid_reconnect_event_skip_count; - gtid_skip_enqueue= true; + if (unlikely(mi->gtid_reconnect_event_skip_count)) + { + if (!got_gtid_event && + mi->gtid_reconnect_event_skip_count == + mi->events_queued_since_last_gtid) + goto gtid_not_start; // the 1st re-sent must be gtid + + --mi->gtid_reconnect_event_skip_count; + gtid_skip_enqueue= true; + } + else if (likely(mi->events_queued_since_last_gtid)) + { + DBUG_ASSERT(!got_gtid_event); + + ++mi->events_queued_since_last_gtid; + } + else if (Log_event::is_group_event((Log_event_type) (uchar) + buf[EVENT_TYPE_OFFSET])) + { + goto gtid_not_start; // no first gtid event in this group + } + } + else if (Log_event::is_group_event((Log_event_type) (uchar) + buf[EVENT_TYPE_OFFSET])) + { + gtid_not_start: + + DBUG_ASSERT(!got_gtid_event); + + error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE; + sql_print_error("The current group of events starts with " + "a non-GTID %s event; " + "the last seen GTID is %u-%u-%llu", + Log_event::get_type_str((Log_event_type) (uchar) + buf[EVENT_TYPE_OFFSET]), + mi->last_queued_gtid); + goto err; } - else if (mi->events_queued_since_last_gtid) - ++mi->events_queued_since_last_gtid; } if (!is_compress_event) @@ -7142,11 +7240,13 @@ static int queue_event(Master_info* mi, const uchar *buf, ulong event_len) The whole of the current event group is queued. So in case of reconnect we can start from after the current GTID. */ - if (mi->gtid_reconnect_event_skip_count) + if (gtid_skip_enqueue) { bool first= true; StringBuffer<1024> gtid_text; + DBUG_ASSERT(mi->events_queued_since_last_gtid > 1); + rpl_slave_state_tostring_helper(>id_text, &mi->last_queued_gtid, &first); sql_print_error("Slave IO thread received a terminal event from " @@ -7156,12 +7256,39 @@ static int queue_event(Master_info* mi, const uchar *buf, ulong event_len) gtid_text.ptr(), mi->gtid_reconnect_event_skip_count, mi->events_queued_since_last_gtid); + error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE; + goto err; } mi->gtid_current_pos.update(&mi->last_queued_gtid); mi->events_queued_since_last_gtid= 0; - /* Reset the domain_id_filter flag. */ - mi->domain_id_filter.reset_filter(); + if (unlikely(gtid_skip_enqueue)) + { + error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE; + sql_print_error("Recieved a group closing %s event " + "at %llu position in the group while there are " + "still %llu events to skip upon reconnecting; " + "the last seen GTID is %u-%u-%llu", + Log_event::get_type_str((Log_event_type) (uchar) + buf[EVENT_TYPE_OFFSET]), + (mi->events_queued_since_last_gtid - + mi->gtid_reconnect_event_skip_count), + mi->events_queued_since_last_gtid, + mi->last_queued_gtid); + goto err; + } + else + { + /* + The whole of the current event group is queued. So in case of + reconnect we can start from after the current GTID. + */ + mi->gtid_current_pos.update(&mi->last_queued_gtid); + mi->events_queued_since_last_gtid= 0; + + /* Reset the domain_id_filter flag. */ + mi->domain_id_filter.reset_filter(); + } } skip_relay_logging: |