summaryrefslogtreecommitdiff
path: root/sql/slave.cc
diff options
context:
space:
mode:
Diffstat (limited to 'sql/slave.cc')
-rw-r--r--sql/slave.cc152
1 files changed, 134 insertions, 18 deletions
diff --git a/sql/slave.cc b/sql/slave.cc
index 407b4a73a72..c7f4dc08096 100644
--- a/sql/slave.cc
+++ b/sql/slave.cc
@@ -824,7 +824,8 @@ int start_slave_threads(bool need_slave_mutex, bool wait_for_start,
while one of the threads is running, they are in use and cannot be
removed.
*/
- if (mi->using_gtid && !mi->slave_running && !mi->rli.slave_running)
+ if (mi->using_gtid != Master_info::USE_GTID_NO &&
+ !mi->slave_running && !mi->rli.slave_running)
{
purge_relay_logs(&mi->rli, NULL, 0, &errmsg);
mi->master_log_name[0]= 0;
@@ -1833,12 +1834,12 @@ after_set_capability:
restart or reconnect, we might end up re-fetching and hence re-applying
the same event(s) again.
*/
- if (mi->using_gtid && !mi->master_log_name[0])
+ if (mi->using_gtid != Master_info::USE_GTID_NO && !mi->master_log_name[0])
{
int rc;
char str_buf[256];
- String connect_state(str_buf, sizeof(str_buf), system_charset_info);
- connect_state.length(0);
+ String query_str(str_buf, sizeof(str_buf), system_charset_info);
+ query_str.length(0);
/*
Read the master @@GLOBAL.gtid_domain_id variable.
@@ -1861,9 +1862,11 @@ after_set_capability:
mysql_free_result(master_res);
master_res= NULL;
- connect_state.append(STRING_WITH_LEN("SET @slave_connect_state='"),
- system_charset_info);
- if (rpl_append_gtid_state(&connect_state, true))
+ query_str.append(STRING_WITH_LEN("SET @slave_connect_state='"),
+ system_charset_info);
+ if (rpl_append_gtid_state(&query_str,
+ mi->using_gtid ==
+ Master_info::USE_GTID_CURRENT_POS))
{
err_code= ER_OUTOFMEMORY;
errmsg= "The slave I/O thread stops because a fatal out-of-memory "
@@ -1871,9 +1874,9 @@ after_set_capability:
sprintf(err_buff, "%s Error: Out of memory", errmsg);
goto err;
}
- connect_state.append(STRING_WITH_LEN("'"), system_charset_info);
+ query_str.append(STRING_WITH_LEN("'"), system_charset_info);
- rc= mysql_real_query(mysql, connect_state.ptr(), connect_state.length());
+ rc= mysql_real_query(mysql, query_str.ptr(), query_str.length());
if (rc)
{
err_code= mysql_errno(mysql);
@@ -1893,8 +1896,78 @@ after_set_capability:
goto err;
}
}
+
+ query_str.length(0);
+ if (query_str.append(STRING_WITH_LEN("SET @slave_gtid_strict_mode="),
+ system_charset_info) ||
+ query_str.append_ulonglong(opt_gtid_strict_mode != false))
+ {
+ err_code= ER_OUTOFMEMORY;
+ errmsg= "The slave I/O thread stops because a fatal out-of-memory "
+ "error is encountered when it tries to set @slave_gtid_strict_mode.";
+ sprintf(err_buff, "%s Error: Out of memory", errmsg);
+ goto err;
+ }
+
+ rc= mysql_real_query(mysql, query_str.ptr(), query_str.length());
+ if (rc)
+ {
+ err_code= mysql_errno(mysql);
+ if (is_network_error(err_code))
+ {
+ mi->report(ERROR_LEVEL, err_code,
+ "Setting @slave_gtid_strict_mode failed with error: %s",
+ mysql_error(mysql));
+ goto network_err;
+ }
+ else
+ {
+ /* Fatal error */
+ errmsg= "The slave I/O thread stops because a fatal error is "
+ "encountered when it tries to set @slave_gtid_strict_mode.";
+ sprintf(err_buff, "%s Error: %s", errmsg, mysql_error(mysql));
+ goto err;
+ }
+ }
+
+ if (mi->rli.until_condition == Relay_log_info::UNTIL_GTID)
+ {
+ query_str.length(0);
+ query_str.append(STRING_WITH_LEN("SET @slave_until_gtid='"),
+ system_charset_info);
+ if (mi->rli.until_gtid_pos.append_to_string(&query_str))
+ {
+ err_code= ER_OUTOFMEMORY;
+ errmsg= "The slave I/O thread stops because a fatal out-of-memory "
+ "error is encountered when it tries to compute @slave_until_gtid.";
+ sprintf(err_buff, "%s Error: Out of memory", errmsg);
+ goto err;
+ }
+ query_str.append(STRING_WITH_LEN("'"), system_charset_info);
+
+ rc= mysql_real_query(mysql, query_str.ptr(), query_str.length());
+ if (rc)
+ {
+ err_code= mysql_errno(mysql);
+ if (is_network_error(err_code))
+ {
+ mi->report(ERROR_LEVEL, err_code,
+ "Setting @slave_until_gtid failed with error: %s",
+ mysql_error(mysql));
+ goto network_err;
+ }
+ else
+ {
+ /* Fatal error */
+ errmsg= "The slave I/O thread stops because a fatal error is "
+ "encountered when it tries to set @slave_until_gtid.";
+ sprintf(err_buff, "%s Error: %s", errmsg, mysql_error(mysql));
+ goto err;
+ }
+ }
+ }
}
- if (!mi->using_gtid)
+ if (mi->using_gtid == Master_info::USE_GTID_NO)
{
/*
If we are not using GTID to connect this time, then instead request
@@ -1920,7 +1993,7 @@ after_set_capability:
(master_row[0] != NULL))
{
rpl_global_gtid_slave_state.load(mi->io_thd, master_row[0],
- strlen(master_row[0]), false);
+ strlen(master_row[0]), false, false);
}
else if (check_io_slave_killed(mi->io_thd, mi, NULL))
goto slave_killed_err;
@@ -2288,8 +2361,8 @@ static bool send_show_master_info_header(THD *thd, bool full,
sizeof(mi->ssl_crl)));
field_list.push_back(new Item_empty_string("Master_SSL_Crlpath",
sizeof(mi->ssl_crlpath)));
- field_list.push_back(new Item_return_int("Using_Gtid", sizeof(ulong),
- MYSQL_TYPE_LONG));
+ field_list.push_back(new Item_empty_string("Using_Gtid",
+ sizeof("Current_Pos")-1));
if (full)
{
field_list.push_back(new Item_return_int("Retried_transactions",
@@ -2302,7 +2375,8 @@ static bool send_show_master_info_header(THD *thd, bool full,
10, MYSQL_TYPE_LONG));
field_list.push_back(new Item_float("Slave_heartbeat_period",
0.0, 3, 10));
- field_list.push_back(new Item_empty_string("Gtid_Pos", gtid_pos_length));
+ field_list.push_back(new Item_empty_string("Gtid_Slave_Pos",
+ gtid_pos_length));
}
if (protocol->send_result_set_metadata(&field_list,
@@ -2382,7 +2456,8 @@ static bool send_show_master_info_data(THD *thd, Master_info *mi, bool full,
protocol->store(
mi->rli.until_condition==Relay_log_info::UNTIL_NONE ? "None":
( mi->rli.until_condition==Relay_log_info::UNTIL_MASTER_POS? "Master":
- "Relay"), &my_charset_bin);
+ ( mi->rli.until_condition==Relay_log_info::UNTIL_RELAY_POS? "Relay":
+ "Gtid")), &my_charset_bin);
protocol->store(mi->rli.until_log_name, &my_charset_bin);
protocol->store((ulonglong) mi->rli.until_log_pos);
@@ -2473,7 +2548,10 @@ static bool send_show_master_info_data(THD *thd, Master_info *mi, bool full,
protocol->store(mi->ssl_ca, &my_charset_bin);
// Master_Ssl_Crlpath
protocol->store(mi->ssl_capath, &my_charset_bin);
- protocol->store((uint32) (mi->using_gtid != 0));
+ protocol->store((mi->using_gtid==Master_info::USE_GTID_NO ? "No" :
+ (mi->using_gtid==Master_info::USE_GTID_SLAVE_POS ?
+ "Slave_Pos" : "Current_Pos")),
+ &my_charset_bin);
if (full)
{
protocol->store((uint32) mi->rli.retried_trans);
@@ -3079,7 +3157,8 @@ static int exec_relay_log_event(THD* thd, Relay_log_info* rli)
This tests if the position of the beginning of the current event
hits the UNTIL barrier.
*/
- if (rli->until_condition != Relay_log_info::UNTIL_NONE &&
+ if ((rli->until_condition == Relay_log_info::UNTIL_MASTER_POS ||
+ rli->until_condition == Relay_log_info::UNTIL_RELAY_POS) &&
rli->is_until_satisfied(thd, ev))
{
char buf[22];
@@ -3976,7 +4055,8 @@ log '%s' at position %s, relay log '%s' position: %s", RPL_LOG_NAME,
saved_master_log_pos= rli->group_master_log_pos;
saved_skip= rli->slave_skip_counter;
}
- if (rli->until_condition != Relay_log_info::UNTIL_NONE &&
+ if ((rli->until_condition == Relay_log_info::UNTIL_MASTER_POS ||
+ rli->until_condition == Relay_log_info::UNTIL_RELAY_POS) &&
rli->is_until_satisfied(thd, NULL))
{
char buf[22];
@@ -4815,7 +4895,43 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
}
break;
+ case GTID_LIST_EVENT:
+ {
+ const char *errmsg;
+ Gtid_list_log_event *glev;
+ Log_event *tmp;
+
+ if (mi->rli.until_condition != Relay_log_info::UNTIL_GTID)
+ goto default_action;
+ if (!(tmp= Log_event::read_log_event(buf, event_len, &errmsg,
+ mi->rli.relay_log.description_event_for_queue,
+ opt_slave_sql_verify_checksum)))
+ {
+ error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE;
+ goto err;
+ }
+ glev= static_cast<Gtid_list_log_event *>(tmp);
+ if (glev->gl_flags & Gtid_list_log_event::FLAG_UNTIL_REACHED)
+ {
+ char str_buf[128];
+ String str(str_buf, sizeof(str_buf), system_charset_info);
+ mi->rli.until_gtid_pos.to_string(&str);
+ sql_print_information("Slave IO thread stops because it reached its"
+ " UNTIL master_gtid_pos %s", str.c_ptr_safe());
+ mi->abort_slave= true;
+ }
+ delete glev;
+
+ /*
+ Do not update position for fake Gtid_list event (which has a zero
+ end_log_pos).
+ */
+ inc_pos= uint4korr(buf+LOG_POS_OFFSET) ? event_len : 0;
+ }
+ break;
+
default:
+ default_action:
inc_pos= event_len;
break;
}