diff options
Diffstat (limited to 'sql/slave.cc')
-rw-r--r-- | sql/slave.cc | 110 |
1 files changed, 80 insertions, 30 deletions
diff --git a/sql/slave.cc b/sql/slave.cc index 98ad3da90c2..c4c37dd9566 100644 --- a/sql/slave.cc +++ b/sql/slave.cc @@ -1202,7 +1202,6 @@ int init_strvar_from_file(char *var, int max_size, IO_CACHE *f, DBUG_RETURN(1); } - /* when moving these functions to mysys, don't forget to remove slave.cc from libmysqld/CMakeLists.txt @@ -1258,6 +1257,7 @@ int init_floatvar_from_file(float* var, IO_CACHE* f, float default_val) going to ignore (to not log them in the relay log). Items being read are supposed to be decimal output of values of a type shorter or equal of @c long and separated by the single space. + It also used to restore DO_DOMAIN_IDS & IGNORE_DOMAIN_IDS lists. @param arr @c DYNAMIC_ARRAY pointer to storage for servers id @param f @c IO_CACHE pointer to the source file @@ -1278,7 +1278,7 @@ int init_dynarray_intvar_from_file(DYNAMIC_ARRAY* arr, IO_CACHE* f) if ((read_size= my_b_gets(f, buf_act, sizeof(buf))) == 0) { - return 0; // no line in master.info + DBUG_RETURN(0); // no line in master.info } if (read_size + 1 == sizeof(buf) && buf[sizeof(buf) - 2] != '\n') { @@ -2581,6 +2581,10 @@ static bool send_show_master_info_header(THD *thd, bool full, field_list.push_back(new Item_empty_string("Using_Gtid", sizeof("Current_Pos")-1)); field_list.push_back(new Item_empty_string("Gtid_IO_Pos", 30)); + field_list.push_back(new Item_empty_string("Replicate_Do_Domain_Ids", + FN_REFLEN)); + field_list.push_back(new Item_empty_string("Replicate_Ignore_Domain_Ids", + FN_REFLEN)); if (full) { field_list.push_back(new Item_return_int("Retried_transactions", @@ -2762,29 +2766,7 @@ static bool send_show_master_info_data(THD *thd, Master_info *mi, bool full, // Last_SQL_Error protocol->store(mi->rli.last_error().message, &my_charset_bin); // Replicate_Ignore_Server_Ids - { - char buff[FN_REFLEN]; - ulong i, cur_len; - for (i= 0, buff[0]= 0, cur_len= 0; - i < mi->ignore_server_ids.elements; i++) - { - ulong s_id, slen; - char sbuff[FN_REFLEN]; - get_dynamic(&mi->ignore_server_ids, (uchar*) &s_id, i); - slen= sprintf(sbuff, (i==0? "%lu" : ", %lu"), s_id); - if (cur_len + slen + 4 > FN_REFLEN) - { - /* - break the loop whenever remained space could not fit - ellipses on the next cycle - */ - sprintf(buff + cur_len, "..."); - break; - } - cur_len += sprintf(buff + cur_len, "%s", sbuff); - } - protocol->store(buff, &my_charset_bin); - } + prot_store_ids(thd, &mi->ignore_server_ids); // Master_Server_id protocol->store((uint32) mi->master_id); // Master_Ssl_Crl @@ -2798,6 +2780,10 @@ static bool send_show_master_info_data(THD *thd, Master_info *mi, bool full, mi->gtid_current_pos.to_string(&tmp); protocol->store(tmp.ptr(), tmp.length(), &my_charset_bin); } + + // Replicate_Do_Domain_Ids & Replicate_Ignore_Domain_Ids + mi->domain_id_filter.store_ids(thd); + if (full) { protocol->store((uint32) mi->rli.retried_trans); @@ -3784,6 +3770,7 @@ pthread_handler_t handle_slave_io(void *arg) rpl_io_thread_info io_info; #ifndef DBUG_OFF uint retry_count_reg= 0, retry_count_dump= 0, retry_count_event= 0; + mi->dbug_do_disconnect= false; #endif // needs to call my_thread_init(), otherwise we get a coredump in DBUG_ stuff my_thread_init(); @@ -5587,6 +5574,12 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) case GTID_EVENT: { + DBUG_EXECUTE_IF("kill_slave_io_after_2_events", + { + mi->dbug_do_disconnect= true; + mi->dbug_event_counter= 2; + };); + uchar gtid_flag; if (Gtid_log_event::peek(buf, event_len, checksum_alg, @@ -5656,6 +5649,10 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) mi->last_queued_gtid= event_gtid; mi->last_queued_gtid_standalone= (gtid_flag & Gtid_log_event::FL_STANDALONE) != 0; + + /* Should filter all the subsequent events in the current GTID group? */ + mi->domain_id_filter.do_filter(event_gtid.domain_id); + ++mi->events_queued_since_last_gtid; inc_pos= event_len; } @@ -5663,6 +5660,47 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) default: default_action: + DBUG_EXECUTE_IF("kill_slave_io_after_2_events", + { + if (mi->dbug_do_disconnect && + (((uchar)buf[EVENT_TYPE_OFFSET] == QUERY_EVENT) || + ((uchar)buf[EVENT_TYPE_OFFSET] == TABLE_MAP_EVENT)) + && (--mi->dbug_event_counter == 0)) + { + error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE; + mi->dbug_do_disconnect= false; /* Safety */ + goto err; + } + };); + + DBUG_EXECUTE_IF("kill_slave_io_before_commit", + { + if ((uchar)buf[EVENT_TYPE_OFFSET] == XID_EVENT || + ((uchar)buf[EVENT_TYPE_OFFSET] == QUERY_EVENT && + Query_log_event::peek_is_commit_rollback(buf, event_len, + checksum_alg))) + { + error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE; + goto err; + } + };); + + if (mi->using_gtid != Master_info::USE_GTID_NO && + mi->domain_id_filter.is_group_filtered() && + mi->events_queued_since_last_gtid > 0 && + ((mi->last_queued_gtid_standalone && + !Log_event::is_part_of_group((Log_event_type)(uchar) + buf[EVENT_TYPE_OFFSET])) || + (!mi->last_queued_gtid_standalone && + ((uchar)buf[EVENT_TYPE_OFFSET] == XID_EVENT || + ((uchar)buf[EVENT_TYPE_OFFSET] == QUERY_EVENT && + Query_log_event::peek_is_commit_rollback(buf, event_len, + checksum_alg)))))) + { + /* Reset the domain_id_filter flag. */ + mi->domain_id_filter.reset_filter(); + } + if (mi->using_gtid != Master_info::USE_GTID_NO && mi->gtid_event_seen) { if (unlikely(mi->gtid_reconnect_event_skip_count)) @@ -5765,7 +5803,15 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) (s_id != mi->master_id || /* for the master meta information is necessary */ (buf[EVENT_TYPE_OFFSET] != FORMAT_DESCRIPTION_EVENT && - buf[EVENT_TYPE_OFFSET] != ROTATE_EVENT)))) + buf[EVENT_TYPE_OFFSET] != ROTATE_EVENT))) || + + /* + Check whether it needs to be filtered based on domain_id + (DO_DOMAIN_IDS/IGNORE_DOMAIN_IDS). + */ + (mi->domain_id_filter.is_group_filtered() && + Log_event::is_group_event((Log_event_type)(uchar) + buf[EVENT_TYPE_OFFSET]))) { /* Do not write it to the relay log. @@ -5842,16 +5888,20 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) } skip_relay_logging: - + err: if (unlock_data_lock) mysql_mutex_unlock(&mi->data_lock); DBUG_PRINT("info", ("error: %d", error)); - if (error) + + /* + Do not print ER_SLAVE_RELAY_LOG_WRITE_FAILURE error here, as the caller + handle_slave_io() prints it on return. + */ + if (error && error != ER_SLAVE_RELAY_LOG_WRITE_FAILURE) mi->report(ERROR_LEVEL, error, NULL, ER(error), - (error == ER_SLAVE_RELAY_LOG_WRITE_FAILURE)? - "could not queue event from master" : error_msg.ptr()); + DBUG_RETURN(error); } |