summaryrefslogtreecommitdiff
path: root/sql/slave.cc
diff options
context:
space:
mode:
Diffstat (limited to 'sql/slave.cc')
-rw-r--r--sql/slave.cc110
1 files changed, 80 insertions, 30 deletions
diff --git a/sql/slave.cc b/sql/slave.cc
index 98ad3da90c2..c4c37dd9566 100644
--- a/sql/slave.cc
+++ b/sql/slave.cc
@@ -1202,7 +1202,6 @@ int init_strvar_from_file(char *var, int max_size, IO_CACHE *f,
DBUG_RETURN(1);
}
-
/*
when moving these functions to mysys, don't forget to
remove slave.cc from libmysqld/CMakeLists.txt
@@ -1258,6 +1257,7 @@ int init_floatvar_from_file(float* var, IO_CACHE* f, float default_val)
going to ignore (to not log them in the relay log).
Items being read are supposed to be decimal output of values of a
type shorter or equal of @c long and separated by the single space.
+ It also used to restore DO_DOMAIN_IDS & IGNORE_DOMAIN_IDS lists.
@param arr @c DYNAMIC_ARRAY pointer to storage for servers id
@param f @c IO_CACHE pointer to the source file
@@ -1278,7 +1278,7 @@ int init_dynarray_intvar_from_file(DYNAMIC_ARRAY* arr, IO_CACHE* f)
if ((read_size= my_b_gets(f, buf_act, sizeof(buf))) == 0)
{
- return 0; // no line in master.info
+ DBUG_RETURN(0); // no line in master.info
}
if (read_size + 1 == sizeof(buf) && buf[sizeof(buf) - 2] != '\n')
{
@@ -2581,6 +2581,10 @@ static bool send_show_master_info_header(THD *thd, bool full,
field_list.push_back(new Item_empty_string("Using_Gtid",
sizeof("Current_Pos")-1));
field_list.push_back(new Item_empty_string("Gtid_IO_Pos", 30));
+ field_list.push_back(new Item_empty_string("Replicate_Do_Domain_Ids",
+ FN_REFLEN));
+ field_list.push_back(new Item_empty_string("Replicate_Ignore_Domain_Ids",
+ FN_REFLEN));
if (full)
{
field_list.push_back(new Item_return_int("Retried_transactions",
@@ -2762,29 +2766,7 @@ static bool send_show_master_info_data(THD *thd, Master_info *mi, bool full,
// Last_SQL_Error
protocol->store(mi->rli.last_error().message, &my_charset_bin);
// Replicate_Ignore_Server_Ids
- {
- char buff[FN_REFLEN];
- ulong i, cur_len;
- for (i= 0, buff[0]= 0, cur_len= 0;
- i < mi->ignore_server_ids.elements; i++)
- {
- ulong s_id, slen;
- char sbuff[FN_REFLEN];
- get_dynamic(&mi->ignore_server_ids, (uchar*) &s_id, i);
- slen= sprintf(sbuff, (i==0? "%lu" : ", %lu"), s_id);
- if (cur_len + slen + 4 > FN_REFLEN)
- {
- /*
- break the loop whenever remained space could not fit
- ellipses on the next cycle
- */
- sprintf(buff + cur_len, "...");
- break;
- }
- cur_len += sprintf(buff + cur_len, "%s", sbuff);
- }
- protocol->store(buff, &my_charset_bin);
- }
+ prot_store_ids(thd, &mi->ignore_server_ids);
// Master_Server_id
protocol->store((uint32) mi->master_id);
// Master_Ssl_Crl
@@ -2798,6 +2780,10 @@ static bool send_show_master_info_data(THD *thd, Master_info *mi, bool full,
mi->gtid_current_pos.to_string(&tmp);
protocol->store(tmp.ptr(), tmp.length(), &my_charset_bin);
}
+
+ // Replicate_Do_Domain_Ids & Replicate_Ignore_Domain_Ids
+ mi->domain_id_filter.store_ids(thd);
+
if (full)
{
protocol->store((uint32) mi->rli.retried_trans);
@@ -3784,6 +3770,7 @@ pthread_handler_t handle_slave_io(void *arg)
rpl_io_thread_info io_info;
#ifndef DBUG_OFF
uint retry_count_reg= 0, retry_count_dump= 0, retry_count_event= 0;
+ mi->dbug_do_disconnect= false;
#endif
// needs to call my_thread_init(), otherwise we get a coredump in DBUG_ stuff
my_thread_init();
@@ -5587,6 +5574,12 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
case GTID_EVENT:
{
+ DBUG_EXECUTE_IF("kill_slave_io_after_2_events",
+ {
+ mi->dbug_do_disconnect= true;
+ mi->dbug_event_counter= 2;
+ };);
+
uchar gtid_flag;
if (Gtid_log_event::peek(buf, event_len, checksum_alg,
@@ -5656,6 +5649,10 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
mi->last_queued_gtid= event_gtid;
mi->last_queued_gtid_standalone=
(gtid_flag & Gtid_log_event::FL_STANDALONE) != 0;
+
+ /* Should filter all the subsequent events in the current GTID group? */
+ mi->domain_id_filter.do_filter(event_gtid.domain_id);
+
++mi->events_queued_since_last_gtid;
inc_pos= event_len;
}
@@ -5663,6 +5660,47 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
default:
default_action:
+ DBUG_EXECUTE_IF("kill_slave_io_after_2_events",
+ {
+ if (mi->dbug_do_disconnect &&
+ (((uchar)buf[EVENT_TYPE_OFFSET] == QUERY_EVENT) ||
+ ((uchar)buf[EVENT_TYPE_OFFSET] == TABLE_MAP_EVENT))
+ && (--mi->dbug_event_counter == 0))
+ {
+ error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE;
+ mi->dbug_do_disconnect= false; /* Safety */
+ goto err;
+ }
+ };);
+
+ DBUG_EXECUTE_IF("kill_slave_io_before_commit",
+ {
+ if ((uchar)buf[EVENT_TYPE_OFFSET] == XID_EVENT ||
+ ((uchar)buf[EVENT_TYPE_OFFSET] == QUERY_EVENT &&
+ Query_log_event::peek_is_commit_rollback(buf, event_len,
+ checksum_alg)))
+ {
+ error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE;
+ goto err;
+ }
+ };);
+
+ if (mi->using_gtid != Master_info::USE_GTID_NO &&
+ mi->domain_id_filter.is_group_filtered() &&
+ mi->events_queued_since_last_gtid > 0 &&
+ ((mi->last_queued_gtid_standalone &&
+ !Log_event::is_part_of_group((Log_event_type)(uchar)
+ buf[EVENT_TYPE_OFFSET])) ||
+ (!mi->last_queued_gtid_standalone &&
+ ((uchar)buf[EVENT_TYPE_OFFSET] == XID_EVENT ||
+ ((uchar)buf[EVENT_TYPE_OFFSET] == QUERY_EVENT &&
+ Query_log_event::peek_is_commit_rollback(buf, event_len,
+ checksum_alg))))))
+ {
+ /* Reset the domain_id_filter flag. */
+ mi->domain_id_filter.reset_filter();
+ }
+
if (mi->using_gtid != Master_info::USE_GTID_NO && mi->gtid_event_seen)
{
if (unlikely(mi->gtid_reconnect_event_skip_count))
@@ -5765,7 +5803,15 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
(s_id != mi->master_id ||
/* for the master meta information is necessary */
(buf[EVENT_TYPE_OFFSET] != FORMAT_DESCRIPTION_EVENT &&
- buf[EVENT_TYPE_OFFSET] != ROTATE_EVENT))))
+ buf[EVENT_TYPE_OFFSET] != ROTATE_EVENT))) ||
+
+ /*
+ Check whether it needs to be filtered based on domain_id
+ (DO_DOMAIN_IDS/IGNORE_DOMAIN_IDS).
+ */
+ (mi->domain_id_filter.is_group_filtered() &&
+ Log_event::is_group_event((Log_event_type)(uchar)
+ buf[EVENT_TYPE_OFFSET])))
{
/*
Do not write it to the relay log.
@@ -5842,16 +5888,20 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
}
skip_relay_logging:
-
+
err:
if (unlock_data_lock)
mysql_mutex_unlock(&mi->data_lock);
DBUG_PRINT("info", ("error: %d", error));
- if (error)
+
+ /*
+ Do not print ER_SLAVE_RELAY_LOG_WRITE_FAILURE error here, as the caller
+ handle_slave_io() prints it on return.
+ */
+ if (error && error != ER_SLAVE_RELAY_LOG_WRITE_FAILURE)
mi->report(ERROR_LEVEL, error, NULL, ER(error),
- (error == ER_SLAVE_RELAY_LOG_WRITE_FAILURE)?
- "could not queue event from master" :
error_msg.ptr());
+
DBUG_RETURN(error);
}