summaryrefslogtreecommitdiff
path: root/sql/slave.cc
diff options
context:
space:
mode:
authorSergei Golubchik <serg@mariadb.org>2022-05-18 15:04:50 +0200
committerSergei Golubchik <serg@mariadb.org>2022-05-19 14:07:55 +0200
commitbf2bdd1a1a112c3bbdf53da7a663a59fafa62c7d (patch)
tree615e56754c44190551cea0381494b675108b6ae9 /sql/slave.cc
parent5dba54bfef31d91c082362065cd091086e20ee9a (diff)
parentb7ffccf49b5563d3078359bddf438c9d20674513 (diff)
downloadmariadb-git-bf2bdd1a1a112c3bbdf53da7a663a59fafa62c7d.tar.gz
Merge branch '10.8' into 10.9mariadb-10.9.1
Diffstat (limited to 'sql/slave.cc')
-rw-r--r--sql/slave.cc143
1 files changed, 123 insertions, 20 deletions
diff --git a/sql/slave.cc b/sql/slave.cc
index f6f243d7f03..19037525422 100644
--- a/sql/slave.cc
+++ b/sql/slave.cc
@@ -3307,9 +3307,9 @@ static bool send_show_master_info_data(THD *thd, Master_info *mi, bool full,
protocol->store((uint32) mi->master_id);
// SQL_Delay
// Master_Ssl_Crl
- protocol->store_string_or_null(mi->ssl_ca, &my_charset_bin);
+ protocol->store_string_or_null(mi->ssl_crl, &my_charset_bin);
// Master_Ssl_Crlpath
- protocol->store_string_or_null(mi->ssl_capath, &my_charset_bin);
+ protocol->store_string_or_null(mi->ssl_crlpath, &my_charset_bin);
// Using_Gtid
protocol->store_string_or_null(mi->using_gtid_astext(mi->using_gtid),
&my_charset_bin);
@@ -3402,7 +3402,7 @@ bool show_all_master_info(THD* thd)
String gtid_pos;
Master_info **tmp;
List<Item> field_list;
- DBUG_ENTER("show_master_info");
+ DBUG_ENTER("show_all_master_info");
mysql_mutex_assert_owner(&LOCK_active_mi);
gtid_pos.length(0);
@@ -4991,6 +4991,7 @@ Stopping slave I/O thread due to out-of-memory error from master");
not cause the slave IO thread to stop, and the error messages are
already reported.
*/
+ DBUG_EXECUTE_IF("simulate_delay_semisync_slave_reply", my_sleep(800000););
(void)repl_semisync_slave.slave_reply(mi);
}
@@ -6777,17 +6778,75 @@ static int queue_event(Master_info* mi, const uchar *buf, ulong event_len)
mi->gtid_event_seen= true;
/*
- We have successfully queued to relay log everything before this GTID, so
+ Unless the previous group is malformed,
+ we have successfully queued to relay log everything before this GTID, so
in case of reconnect we can start from after any previous GTID.
- (Normally we would have updated gtid_current_pos earlier at the end of
- the previous event group, but better leave an extra check here for
- safety).
+ (We must have updated gtid_current_pos earlier at the end of
+ the previous event group. Unless ...)
*/
- if (mi->events_queued_since_last_gtid)
+ if (unlikely(mi->events_queued_since_last_gtid > 0))
{
- mi->gtid_current_pos.update(&mi->last_queued_gtid);
- mi->events_queued_since_last_gtid= 0;
+ /*
+ ...unless the last group has not been completed. An assert below
+ can be satisfied only with the strict mode that ensures
+ against "genuine" gtid duplicates.
+ */
+ rpl_gtid *gtid_in_slave_state=
+ mi->gtid_current_pos.find(mi->last_queued_gtid.domain_id);
+
+ // Slave gtid state must not have updated yet to the last received gtid.
+ DBUG_ASSERT((mi->using_gtid == Master_info::USE_GTID_NO ||
+ !opt_gtid_strict_mode) ||
+ (!gtid_in_slave_state ||
+ !(*gtid_in_slave_state == mi->last_queued_gtid)));
+
+ DBUG_EXECUTE_IF("slave_discard_xid_for_gtid_0_x_1000",
+ {
+ /* Inject an event group that is missing its XID commit event. */
+ if ((mi->last_queued_gtid.domain_id == 0 &&
+ mi->last_queued_gtid.seq_no == 1000) ||
+ (mi->last_queued_gtid.domain_id == 1 &&
+ mi->last_queued_gtid.seq_no == 32))
+ {
+ sql_print_warning(
+ "Unexpected break of being relay-logged GTID %u-%u-%llu "
+ "event group by the current GTID event %u-%u-%llu",
+ PARAM_GTID(mi->last_queued_gtid),PARAM_GTID(event_gtid));
+ DBUG_SET("-d,slave_discard_xid_for_gtid_0_x_1000");
+ goto dbug_gtid_accept;
+ }
+ });
+ error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE;
+ sql_print_error("Unexpected break of being relay-logged GTID %u-%u-%llu "
+ "event group by the current GTID event %u-%u-%llu",
+ PARAM_GTID(mi->last_queued_gtid),PARAM_GTID(event_gtid));
+ goto err;
}
+ else if (unlikely(mi->gtid_reconnect_event_skip_count > 0))
+ {
+ if (mi->gtid_reconnect_event_skip_count ==
+ mi->events_queued_since_last_gtid)
+ {
+ DBUG_ASSERT(event_gtid == mi->last_queued_gtid);
+
+ goto default_action;
+ }
+
+ DBUG_ASSERT(0);
+ }
+ // else_likely{...
+#ifndef DBUG_OFF
+dbug_gtid_accept:
+ DBUG_EXECUTE_IF("slave_discard_gtid_0_x_1002",
+ {
+ if (mi->last_queued_gtid.server_id == 27697 &&
+ mi->last_queued_gtid.seq_no == 1002)
+ {
+ DBUG_SET("-d,slave_discard_gtid_0_x_1002");
+ goto skip_relay_logging;
+ }
+ });
+#endif
mi->last_queued_gtid= event_gtid;
mi->last_queued_gtid_standalone=
(gtid_flag & Gtid_log_event::FL_STANDALONE) != 0;
@@ -6797,7 +6856,7 @@ static int queue_event(Master_info* mi, const uchar *buf, ulong event_len)
++mi->events_queued_since_last_gtid;
inc_pos= event_len;
-
+ // ...} eof else_likely
}
break;
/*
@@ -6877,6 +6936,12 @@ static int queue_event(Master_info* mi, const uchar *buf, ulong event_len)
case XID_EVENT:
DBUG_EXECUTE_IF("slave_discard_xid_for_gtid_0_x_1000",
{
+ if (mi->last_queued_gtid.server_id == 27697 &&
+ mi->last_queued_gtid.seq_no == 1000)
+ {
+ DBUG_SET("-d,slave_discard_xid_for_gtid_0_x_1000");
+ goto skip_relay_logging;
+ }
/* Inject an event group that is missing its XID commit event. */
if (mi->last_queued_gtid.domain_id == 0 &&
mi->last_queued_gtid.seq_no == 1000)
@@ -6923,15 +6988,48 @@ static int queue_event(Master_info* mi, const uchar *buf, ulong event_len)
}
};);
- if (mi->using_gtid != Master_info::USE_GTID_NO && mi->gtid_event_seen)
+ if (mi->using_gtid != Master_info::USE_GTID_NO)
{
- if (unlikely(mi->gtid_reconnect_event_skip_count))
+ if (likely(mi->gtid_event_seen))
{
- --mi->gtid_reconnect_event_skip_count;
- gtid_skip_enqueue= true;
+ if (unlikely(mi->gtid_reconnect_event_skip_count))
+ {
+ if (!got_gtid_event &&
+ mi->gtid_reconnect_event_skip_count ==
+ mi->events_queued_since_last_gtid)
+ goto gtid_not_start; // the 1st re-sent must be gtid
+
+ --mi->gtid_reconnect_event_skip_count;
+ gtid_skip_enqueue= true;
+ }
+ else if (likely(mi->events_queued_since_last_gtid))
+ {
+ DBUG_ASSERT(!got_gtid_event);
+
+ ++mi->events_queued_since_last_gtid;
+ }
+ else if (Log_event::is_group_event((Log_event_type) (uchar)
+ buf[EVENT_TYPE_OFFSET]))
+ {
+ goto gtid_not_start; // no first gtid event in this group
+ }
+ }
+ else if (Log_event::is_group_event((Log_event_type) (uchar)
+ buf[EVENT_TYPE_OFFSET]))
+ {
+ gtid_not_start:
+
+ DBUG_ASSERT(!got_gtid_event);
+
+ error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE;
+ sql_print_error("The current group of events starts with "
+ "a non-GTID %s event; "
+ "the last seen GTID is %u-%u-%llu",
+ Log_event::get_type_str((Log_event_type) (uchar)
+ buf[EVENT_TYPE_OFFSET]),
+ mi->last_queued_gtid);
+ goto err;
}
- else if (mi->events_queued_since_last_gtid)
- ++mi->events_queued_since_last_gtid;
}
if (!is_compress_event)
@@ -7129,8 +7227,9 @@ static int queue_event(Master_info* mi, const uchar *buf, ulong event_len)
mi->using_gtid != Master_info::USE_GTID_NO &&
mi->events_queued_since_last_gtid > 0 &&
( (mi->last_queued_gtid_standalone &&
- !Log_event::is_part_of_group((Log_event_type)(uchar)
- buf[EVENT_TYPE_OFFSET])) ||
+ (LOG_EVENT_IS_QUERY((Log_event_type)(uchar)
+ buf[EVENT_TYPE_OFFSET]) ||
+ (uchar)buf[EVENT_TYPE_OFFSET] == INCIDENT_EVENT)) ||
(!mi->last_queued_gtid_standalone &&
((uchar)buf[EVENT_TYPE_OFFSET] == XID_EVENT ||
(uchar)buf[EVENT_TYPE_OFFSET] == XA_PREPARE_LOG_EVENT ||
@@ -7142,11 +7241,13 @@ static int queue_event(Master_info* mi, const uchar *buf, ulong event_len)
The whole of the current event group is queued. So in case of
reconnect we can start from after the current GTID.
*/
- if (mi->gtid_reconnect_event_skip_count)
+ if (gtid_skip_enqueue)
{
bool first= true;
StringBuffer<1024> gtid_text;
+ DBUG_ASSERT(mi->events_queued_since_last_gtid > 1);
+
rpl_slave_state_tostring_helper(&gtid_text, &mi->last_queued_gtid,
&first);
sql_print_error("Slave IO thread received a terminal event from "
@@ -7156,6 +7257,8 @@ static int queue_event(Master_info* mi, const uchar *buf, ulong event_len)
gtid_text.ptr(),
mi->gtid_reconnect_event_skip_count,
mi->events_queued_since_last_gtid);
+ error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE;
+ goto err;
}
mi->gtid_current_pos.update(&mi->last_queued_gtid);
mi->events_queued_since_last_gtid= 0;