diff options
Diffstat (limited to 'sql/slave.cc')
-rw-r--r-- | sql/slave.cc | 152 |
1 files changed, 134 insertions, 18 deletions
diff --git a/sql/slave.cc b/sql/slave.cc index 407b4a73a72..c7f4dc08096 100644 --- a/sql/slave.cc +++ b/sql/slave.cc @@ -824,7 +824,8 @@ int start_slave_threads(bool need_slave_mutex, bool wait_for_start, while one of the threads is running, they are in use and cannot be removed. */ - if (mi->using_gtid && !mi->slave_running && !mi->rli.slave_running) + if (mi->using_gtid != Master_info::USE_GTID_NO && + !mi->slave_running && !mi->rli.slave_running) { purge_relay_logs(&mi->rli, NULL, 0, &errmsg); mi->master_log_name[0]= 0; @@ -1833,12 +1834,12 @@ after_set_capability: restart or reconnect, we might end up re-fetching and hence re-applying the same event(s) again. */ - if (mi->using_gtid && !mi->master_log_name[0]) + if (mi->using_gtid != Master_info::USE_GTID_NO && !mi->master_log_name[0]) { int rc; char str_buf[256]; - String connect_state(str_buf, sizeof(str_buf), system_charset_info); - connect_state.length(0); + String query_str(str_buf, sizeof(str_buf), system_charset_info); + query_str.length(0); /* Read the master @@GLOBAL.gtid_domain_id variable. @@ -1861,9 +1862,11 @@ after_set_capability: mysql_free_result(master_res); master_res= NULL; - connect_state.append(STRING_WITH_LEN("SET @slave_connect_state='"), - system_charset_info); - if (rpl_append_gtid_state(&connect_state, true)) + query_str.append(STRING_WITH_LEN("SET @slave_connect_state='"), + system_charset_info); + if (rpl_append_gtid_state(&query_str, + mi->using_gtid == + Master_info::USE_GTID_CURRENT_POS)) { err_code= ER_OUTOFMEMORY; errmsg= "The slave I/O thread stops because a fatal out-of-memory " @@ -1871,9 +1874,9 @@ after_set_capability: sprintf(err_buff, "%s Error: Out of memory", errmsg); goto err; } - connect_state.append(STRING_WITH_LEN("'"), system_charset_info); + query_str.append(STRING_WITH_LEN("'"), system_charset_info); - rc= mysql_real_query(mysql, connect_state.ptr(), connect_state.length()); + rc= mysql_real_query(mysql, query_str.ptr(), query_str.length()); if (rc) { err_code= mysql_errno(mysql); @@ -1893,8 +1896,78 @@ after_set_capability: goto err; } } + + query_str.length(0); + if (query_str.append(STRING_WITH_LEN("SET @slave_gtid_strict_mode="), + system_charset_info) || + query_str.append_ulonglong(opt_gtid_strict_mode != false)) + { + err_code= ER_OUTOFMEMORY; + errmsg= "The slave I/O thread stops because a fatal out-of-memory " + "error is encountered when it tries to set @slave_gtid_strict_mode."; + sprintf(err_buff, "%s Error: Out of memory", errmsg); + goto err; + } + + rc= mysql_real_query(mysql, query_str.ptr(), query_str.length()); + if (rc) + { + err_code= mysql_errno(mysql); + if (is_network_error(err_code)) + { + mi->report(ERROR_LEVEL, err_code, + "Setting @slave_gtid_strict_mode failed with error: %s", + mysql_error(mysql)); + goto network_err; + } + else + { + /* Fatal error */ + errmsg= "The slave I/O thread stops because a fatal error is " + "encountered when it tries to set @slave_gtid_strict_mode."; + sprintf(err_buff, "%s Error: %s", errmsg, mysql_error(mysql)); + goto err; + } + } + + if (mi->rli.until_condition == Relay_log_info::UNTIL_GTID) + { + query_str.length(0); + query_str.append(STRING_WITH_LEN("SET @slave_until_gtid='"), + system_charset_info); + if (mi->rli.until_gtid_pos.append_to_string(&query_str)) + { + err_code= ER_OUTOFMEMORY; + errmsg= "The slave I/O thread stops because a fatal out-of-memory " + "error is encountered when it tries to compute @slave_until_gtid."; + sprintf(err_buff, "%s Error: Out of memory", errmsg); + goto err; + } + query_str.append(STRING_WITH_LEN("'"), system_charset_info); + + rc= mysql_real_query(mysql, query_str.ptr(), query_str.length()); + if (rc) + { + err_code= mysql_errno(mysql); + if (is_network_error(err_code)) + { + mi->report(ERROR_LEVEL, err_code, + "Setting @slave_until_gtid failed with error: %s", + mysql_error(mysql)); + goto network_err; + } + else + { + /* Fatal error */ + errmsg= "The slave I/O thread stops because a fatal error is " + "encountered when it tries to set @slave_until_gtid."; + sprintf(err_buff, "%s Error: %s", errmsg, mysql_error(mysql)); + goto err; + } + } + } } - if (!mi->using_gtid) + if (mi->using_gtid == Master_info::USE_GTID_NO) { /* If we are not using GTID to connect this time, then instead request @@ -1920,7 +1993,7 @@ after_set_capability: (master_row[0] != NULL)) { rpl_global_gtid_slave_state.load(mi->io_thd, master_row[0], - strlen(master_row[0]), false); + strlen(master_row[0]), false, false); } else if (check_io_slave_killed(mi->io_thd, mi, NULL)) goto slave_killed_err; @@ -2288,8 +2361,8 @@ static bool send_show_master_info_header(THD *thd, bool full, sizeof(mi->ssl_crl))); field_list.push_back(new Item_empty_string("Master_SSL_Crlpath", sizeof(mi->ssl_crlpath))); - field_list.push_back(new Item_return_int("Using_Gtid", sizeof(ulong), - MYSQL_TYPE_LONG)); + field_list.push_back(new Item_empty_string("Using_Gtid", + sizeof("Current_Pos")-1)); if (full) { field_list.push_back(new Item_return_int("Retried_transactions", @@ -2302,7 +2375,8 @@ static bool send_show_master_info_header(THD *thd, bool full, 10, MYSQL_TYPE_LONG)); field_list.push_back(new Item_float("Slave_heartbeat_period", 0.0, 3, 10)); - field_list.push_back(new Item_empty_string("Gtid_Pos", gtid_pos_length)); + field_list.push_back(new Item_empty_string("Gtid_Slave_Pos", + gtid_pos_length)); } if (protocol->send_result_set_metadata(&field_list, @@ -2382,7 +2456,8 @@ static bool send_show_master_info_data(THD *thd, Master_info *mi, bool full, protocol->store( mi->rli.until_condition==Relay_log_info::UNTIL_NONE ? "None": ( mi->rli.until_condition==Relay_log_info::UNTIL_MASTER_POS? "Master": - "Relay"), &my_charset_bin); + ( mi->rli.until_condition==Relay_log_info::UNTIL_RELAY_POS? "Relay": + "Gtid")), &my_charset_bin); protocol->store(mi->rli.until_log_name, &my_charset_bin); protocol->store((ulonglong) mi->rli.until_log_pos); @@ -2473,7 +2548,10 @@ static bool send_show_master_info_data(THD *thd, Master_info *mi, bool full, protocol->store(mi->ssl_ca, &my_charset_bin); // Master_Ssl_Crlpath protocol->store(mi->ssl_capath, &my_charset_bin); - protocol->store((uint32) (mi->using_gtid != 0)); + protocol->store((mi->using_gtid==Master_info::USE_GTID_NO ? "No" : + (mi->using_gtid==Master_info::USE_GTID_SLAVE_POS ? + "Slave_Pos" : "Current_Pos")), + &my_charset_bin); if (full) { protocol->store((uint32) mi->rli.retried_trans); @@ -3079,7 +3157,8 @@ static int exec_relay_log_event(THD* thd, Relay_log_info* rli) This tests if the position of the beginning of the current event hits the UNTIL barrier. */ - if (rli->until_condition != Relay_log_info::UNTIL_NONE && + if ((rli->until_condition == Relay_log_info::UNTIL_MASTER_POS || + rli->until_condition == Relay_log_info::UNTIL_RELAY_POS) && rli->is_until_satisfied(thd, ev)) { char buf[22]; @@ -3976,7 +4055,8 @@ log '%s' at position %s, relay log '%s' position: %s", RPL_LOG_NAME, saved_master_log_pos= rli->group_master_log_pos; saved_skip= rli->slave_skip_counter; } - if (rli->until_condition != Relay_log_info::UNTIL_NONE && + if ((rli->until_condition == Relay_log_info::UNTIL_MASTER_POS || + rli->until_condition == Relay_log_info::UNTIL_RELAY_POS) && rli->is_until_satisfied(thd, NULL)) { char buf[22]; @@ -4815,7 +4895,43 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) } break; + case GTID_LIST_EVENT: + { + const char *errmsg; + Gtid_list_log_event *glev; + Log_event *tmp; + + if (mi->rli.until_condition != Relay_log_info::UNTIL_GTID) + goto default_action; + if (!(tmp= Log_event::read_log_event(buf, event_len, &errmsg, + mi->rli.relay_log.description_event_for_queue, + opt_slave_sql_verify_checksum))) + { + error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE; + goto err; + } + glev= static_cast<Gtid_list_log_event *>(tmp); + if (glev->gl_flags & Gtid_list_log_event::FLAG_UNTIL_REACHED) + { + char str_buf[128]; + String str(str_buf, sizeof(str_buf), system_charset_info); + mi->rli.until_gtid_pos.to_string(&str); + sql_print_information("Slave IO thread stops because it reached its" + " UNTIL master_gtid_pos %s", str.c_ptr_safe()); + mi->abort_slave= true; + } + delete glev; + + /* + Do not update position for fake Gtid_list event (which has a zero + end_log_pos). + */ + inc_pos= uint4korr(buf+LOG_POS_OFFSET) ? event_len : 0; + } + break; + default: + default_action: inc_pos= event_len; break; } |