summaryrefslogtreecommitdiff
path: root/sql/slave.cc
diff options
context:
space:
mode:
Diffstat (limited to 'sql/slave.cc')
-rw-r--r--sql/slave.cc272
1 files changed, 248 insertions, 24 deletions
diff --git a/sql/slave.cc b/sql/slave.cc
index 41eb7247b8c..d0723c331df 100644
--- a/sql/slave.cc
+++ b/sql/slave.cc
@@ -162,8 +162,10 @@ static int terminate_slave_thread(THD *thd,
volatile uint *slave_running,
bool skip_lock);
static bool check_io_slave_killed(THD *thd, Master_info *mi, const char *info);
-static bool send_show_master_info_header(THD *thd, bool full);
-static bool send_show_master_info_data(THD *thd, Master_info *mi, bool full);
+static bool send_show_master_info_header(THD *thd, bool full,
+ size_t gtid_pos_length);
+static bool send_show_master_info_data(THD *thd, Master_info *mi, bool full,
+ String *gtid_pos);
/*
Find out which replications threads are running
@@ -395,6 +397,7 @@ int init_recovery(Master_info* mi, const char** errmsg)
DBUG_RETURN(0);
}
+
/**
Convert slave skip errors bitmap into a printable string.
@@ -718,7 +721,7 @@ int start_slave_thread(
if (start_lock)
mysql_mutex_lock(start_lock);
- if (!server_id)
+ if (!global_system_variables.server_id)
{
if (start_cond)
mysql_cond_broadcast(start_cond);
@@ -796,6 +799,7 @@ int start_slave_threads(bool need_slave_mutex, bool wait_for_start,
mysql_mutex_t *lock_io=0, *lock_sql=0, *lock_cond_io=0, *lock_cond_sql=0;
mysql_cond_t* cond_io=0, *cond_sql=0;
int error=0;
+ const char *errmsg;
DBUG_ENTER("start_slave_threads");
if (need_slave_mutex)
@@ -811,6 +815,22 @@ int start_slave_threads(bool need_slave_mutex, bool wait_for_start,
lock_cond_sql = &mi->rli.run_lock;
}
+ /*
+ If we are using GTID and both SQL and IO threads are stopped, then get
+ rid of all relay logs.
+
+ Relay logs are not very useful when using GTID, except as a buffer
+ between the fetch in the IO thread and the apply in SQL thread. However
+ while one of the threads is running, they are in use and cannot be
+ removed.
+ */
+ if (mi->using_gtid && !mi->slave_running && !mi->rli.slave_running)
+ {
+ purge_relay_logs(&mi->rli, NULL, 0, &errmsg);
+ mi->master_log_name[0]= 0;
+ mi->master_log_pos= 0;
+ }
+
if (thread_mask & SLAVE_IO)
error= start_slave_thread(
#ifdef HAVE_PSI_INTERFACE
@@ -1421,7 +1441,8 @@ static int get_master_version_and_clock(MYSQL* mysql, Master_info* mi)
(master_res= mysql_store_result(mysql)) &&
(master_row= mysql_fetch_row(master_res)))
{
- if ((::server_id == (mi->master_id= strtoul(master_row[1], 0, 10))) &&
+ if ((global_system_variables.server_id ==
+ (mi->master_id= strtoul(master_row[1], 0, 10))) &&
!mi->rli.replicate_same_server_id)
{
errmsg= "The slave I/O thread stops because master and slave have equal \
@@ -1801,6 +1822,133 @@ past_checksum:
after_set_capability:
#endif
+ /*
+ Request dump start from slave replication GTID state.
+
+ Only request GTID position the first time we connect after CHANGE MASTER
+ or after starting both the IO and SQL threads.
+
+ Otherwise, if the IO thread was ahead of the SQL thread before the
+ restart or reconnect, we might end up re-fetching and hence re-applying
+ the same event(s) again.
+ */
+ if (mi->using_gtid && !mi->master_log_name[0])
+ {
+ int rc;
+ char str_buf[256];
+ String connect_state(str_buf, sizeof(str_buf), system_charset_info);
+ connect_state.length(0);
+
+ /*
+ Read the master @@GLOBAL.gtid_domain_id variable.
+ This is mostly to check that master is GTID aware, but we could later
+ perhaps use it to check that different multi-source masters are correctly
+ configured with distinct domain_id.
+ */
+ if (mysql_real_query(mysql,
+ STRING_WITH_LEN("SELECT @@GLOBAL.gtid_domain_id")) ||
+ !(master_res= mysql_store_result(mysql)) ||
+ !(master_row= mysql_fetch_row(master_res)))
+ {
+ err_code= mysql_errno(mysql);
+ errmsg= "The slave I/O thread stops because master does not support "
+ "MariaDB global transaction id. A fatal error is encountered when "
+ "it tries to SELECT @@GLOBAL.gtid_domain_id.";
+ sprintf(err_buff, "%s Error: %s", errmsg, mysql_error(mysql));
+ goto err;
+ }
+ mysql_free_result(master_res);
+ master_res= NULL;
+
+ connect_state.append(STRING_WITH_LEN("SET @slave_connect_state='"),
+ system_charset_info);
+ if (rpl_append_gtid_state(&connect_state, true))
+ {
+ err_code= ER_OUTOFMEMORY;
+ errmsg= "The slave I/O thread stops because a fatal out-of-memory "
+ "error is encountered when it tries to compute @slave_connect_state.";
+ sprintf(err_buff, "%s Error: Out of memory", errmsg);
+ goto err;
+ }
+ connect_state.append(STRING_WITH_LEN("'"), system_charset_info);
+
+ rc= mysql_real_query(mysql, connect_state.ptr(), connect_state.length());
+ if (rc)
+ {
+ err_code= mysql_errno(mysql);
+ if (is_network_error(err_code))
+ {
+ mi->report(ERROR_LEVEL, err_code,
+ "Setting @slave_connect_state failed with error: %s",
+ mysql_error(mysql));
+ goto network_err;
+ }
+ else
+ {
+ /* Fatal error */
+ errmsg= "The slave I/O thread stops because a fatal error is "
+ "encountered when it tries to set @slave_connect_state.";
+ sprintf(err_buff, "%s Error: %s", errmsg, mysql_error(mysql));
+ goto err;
+ }
+ }
+ }
+ if (!mi->using_gtid)
+ {
+ /*
+ If we are not using GTID to connect this time, then instead request
+ the corresponding GTID position from the master, so that the user
+ can reconnect the next time using MASTER_GTID_POS=AUTO.
+ */
+ char quote_buf[2*sizeof(mi->master_log_name)+1];
+ char str_buf[28+2*sizeof(mi->master_log_name)+10];
+ String query(str_buf, sizeof(str_buf), system_charset_info);
+ query.length(0);
+
+ query.append("SELECT binlog_gtid_pos('");
+ escape_quotes_for_mysql(&my_charset_bin, quote_buf, sizeof(quote_buf),
+ mi->master_log_name, strlen(mi->master_log_name));
+ query.append(quote_buf);
+ query.append("',");
+ query.append_ulonglong(mi->master_log_pos);
+ query.append(")");
+
+ if (!mysql_real_query(mysql, query.c_ptr_safe(), query.length()) &&
+ (master_res= mysql_store_result(mysql)) &&
+ (master_row= mysql_fetch_row(master_res)) &&
+ (master_row[0] != NULL))
+ {
+ rpl_global_gtid_slave_state.load(mi->io_thd, master_row[0],
+ strlen(master_row[0]), false);
+ }
+ else if (check_io_slave_killed(mi->io_thd, mi, NULL))
+ goto slave_killed_err;
+ else if (is_network_error(mysql_errno(mysql)))
+ {
+ mi->report(WARNING_LEVEL, mysql_errno(mysql),
+ "Get master GTID position failed with error: %s", mysql_error(mysql));
+ goto network_err;
+ }
+ else
+ {
+ /*
+ ToDo: If the master does not have the binlog_gtid_pos() function, it
+ just means that it is an old master with no GTID support, so we should
+ do nothing.
+
+ However, if binlog_gtid_pos() exists, but fails or returns NULL, then
+ it means that the requested position is not valid. We could use this
+ to catch attempts to replicate from within the middle of an event,
+ avoiding strange failures or possible corruption.
+ */
+ }
+ if (master_res)
+ {
+ mysql_free_result(master_res);
+ master_res= NULL;
+ }
+ }
+
err:
if (errmsg)
{
@@ -1994,7 +2142,7 @@ int register_slave_on_master(MYSQL* mysql, Master_info *mi,
DBUG_RETURN(0);
}
- int4store(pos, server_id); pos+= 4;
+ int4store(pos, global_system_variables.server_id); pos+= 4;
pos= net_store_data(pos, (uchar*) report_host, report_host_len);
pos= net_store_data(pos, (uchar*) report_user, report_user_len);
pos= net_store_data(pos, (uchar*) report_password, report_password_len);
@@ -2043,16 +2191,20 @@ int register_slave_on_master(MYSQL* mysql, Master_info *mi,
bool show_master_info(THD *thd, Master_info *mi, bool full)
{
DBUG_ENTER("show_master_info");
+ String gtid_pos;
- if (send_show_master_info_header(thd, full))
+ if (full && rpl_global_gtid_slave_state.tostring(&gtid_pos, NULL, 0))
+ DBUG_RETURN(TRUE);
+ if (send_show_master_info_header(thd, full, gtid_pos.length()))
DBUG_RETURN(TRUE);
- if (send_show_master_info_data(thd, mi, full))
+ if (send_show_master_info_data(thd, mi, full, &gtid_pos))
DBUG_RETURN(TRUE);
my_eof(thd);
DBUG_RETURN(FALSE);
}
-static bool send_show_master_info_header(THD *thd, bool full)
+static bool send_show_master_info_header(THD *thd, bool full,
+ size_t gtid_pos_length)
{
List<Item> field_list;
Protocol *protocol= thd->protocol;
@@ -2135,6 +2287,8 @@ static bool send_show_master_info_header(THD *thd, bool full)
sizeof(mi->ssl_crl)));
field_list.push_back(new Item_empty_string("Master_SSL_Crlpath",
sizeof(mi->ssl_crlpath)));
+ field_list.push_back(new Item_return_int("Using_Gtid", sizeof(ulong),
+ MYSQL_TYPE_LONG));
if (full)
{
field_list.push_back(new Item_return_int("Retried_transactions",
@@ -2147,6 +2301,7 @@ static bool send_show_master_info_header(THD *thd, bool full)
10, MYSQL_TYPE_LONG));
field_list.push_back(new Item_float("Slave_heartbeat_period",
0.0, 3, 10));
+ field_list.push_back(new Item_empty_string("Gtid_Pos", gtid_pos_length));
}
if (protocol->send_result_set_metadata(&field_list,
@@ -2156,7 +2311,8 @@ static bool send_show_master_info_header(THD *thd, bool full)
}
-static bool send_show_master_info_data(THD *thd, Master_info *mi, bool full)
+static bool send_show_master_info_data(THD *thd, Master_info *mi, bool full,
+ String *gtid_pos)
{
DBUG_ENTER("send_show_master_info_data");
@@ -2315,6 +2471,7 @@ static bool send_show_master_info_data(THD *thd, Master_info *mi, bool full)
protocol->store(mi->ssl_ca, &my_charset_bin);
// Master_Ssl_Crlpath
protocol->store(mi->ssl_capath, &my_charset_bin);
+ protocol->store((uint32) (mi->using_gtid != 0));
if (full)
{
protocol->store((uint32) mi->rli.retried_trans);
@@ -2322,6 +2479,7 @@ static bool send_show_master_info_data(THD *thd, Master_info *mi, bool full)
protocol->store((uint32) mi->rli.executed_entries);
protocol->store((uint32) mi->received_heartbeats);
protocol->store((double) mi->heartbeat_period, 3, &tmp);
+ protocol->store(gtid_pos->ptr(), gtid_pos->length(), &my_charset_bin);
}
mysql_mutex_unlock(&mi->rli.err_lock);
@@ -2364,11 +2522,19 @@ static int cmp_mi_by_name(const Master_info **arg1,
bool show_all_master_info(THD* thd)
{
uint i, elements;
+ String gtid_pos;
Master_info **tmp;
DBUG_ENTER("show_master_info");
mysql_mutex_assert_owner(&LOCK_active_mi);
- if (send_show_master_info_header(thd, 1))
+ gtid_pos.length(0);
+ if (rpl_append_gtid_state(&gtid_pos, true))
+ {
+ my_error(ER_OUT_OF_RESOURCES, MYF(0));
+ DBUG_RETURN(TRUE);
+ }
+
+ if (send_show_master_info_header(thd, 1, gtid_pos.length()))
DBUG_RETURN(TRUE);
if (!(elements= master_info_index->master_info_hash.records))
@@ -2390,7 +2556,7 @@ bool show_all_master_info(THD* thd)
for (i= 0; i < elements; i++)
{
- if (send_show_master_info_data(thd, tmp[i], 1))
+ if (send_show_master_info_data(thd, tmp[i], 1, &gtid_pos))
DBUG_RETURN(TRUE);
}
@@ -2555,7 +2721,7 @@ static int request_dump(THD *thd, MYSQL* mysql, Master_info* mi,
// TODO if big log files: Change next to int8store()
int4store(buf, (ulong) mi->master_log_pos);
int2store(buf + 4, binlog_flags);
- int4store(buf + 6, server_id);
+ int4store(buf + 6, global_system_variables.server_id);
len = (uint) strlen(logname);
memcpy(buf + 10, logname,len);
if (simple_command(mysql, COM_BINLOG_DUMP, buf, len + 10, 1))
@@ -2764,7 +2930,8 @@ int apply_event_and_update_pos(Log_event* ev, THD* thd, Relay_log_info* rli)
has a Rotate etc).
*/
- thd->server_id = ev->server_id; // use the original server id for logging
+ /* Use the original server id for logging. */
+ thd->variables.server_id = ev->server_id;
thd->set_time(); // time the query
thd->lex->current_select= 0;
if (!ev->when)
@@ -3548,17 +3715,34 @@ err_during_init:
/*
Check the temporary directory used by commands like
LOAD DATA INFILE.
+
+ As the directory never changes during a mysqld run, we only
+ test this once and cache the result. This also resolves a race condition
+ when this function is run by multiple threads at the same time.
*/
+
+static bool check_temp_dir_run= 0;
+static int check_temp_dir_result= 0;
+
static
int check_temp_dir(char* tmp_file)
{
- int fd;
+ File fd;
+ int result= 1; // Assume failure
MY_DIR *dirp;
char tmp_dir[FN_REFLEN];
size_t tmp_dir_size;
DBUG_ENTER("check_temp_dir");
+ mysql_mutex_lock(&LOCK_thread_count);
+ if (check_temp_dir_run)
+ {
+ result= check_temp_dir_result;
+ goto end;
+ }
+ check_temp_dir_run= 1;
+
/*
Get the directory from the temporary file.
*/
@@ -3568,27 +3752,33 @@ int check_temp_dir(char* tmp_file)
Check if the directory exists.
*/
if (!(dirp=my_dir(tmp_dir,MYF(MY_WME))))
- DBUG_RETURN(1);
+ goto end;
my_dirend(dirp);
/*
- Check permissions to create a file.
+ Check permissions to create a file. We use O_TRUNC to ensure that
+ things work even if we happen to have an old file lying around.
*/
if ((fd= mysql_file_create(key_file_misc,
tmp_file, CREATE_MODE,
- O_WRONLY | O_BINARY | O_EXCL | O_NOFOLLOW,
+ O_WRONLY | O_BINARY | O_TRUNC | O_NOFOLLOW,
MYF(MY_WME))) < 0)
- DBUG_RETURN(1);
+ goto end;
+ result= 0; // Directory name ok
/*
Clean up.
*/
mysql_file_close(fd, MYF(0));
mysql_file_delete(key_file_misc, tmp_file, MYF(0));
- DBUG_RETURN(0);
+end:
+ check_temp_dir_result= result;
+ mysql_mutex_unlock(&LOCK_thread_count);
+ DBUG_RETURN(result);
}
+
/**
Slave SQL thread entry point.
@@ -3750,6 +3940,15 @@ log '%s' at position %s, relay log '%s' position: %s", RPL_LOG_NAME,
goto err;
}
+ /* Load the set of seen GTIDs, if we did not already. */
+ if (rpl_load_gtid_slave_state(thd))
+ {
+ rli->report(ERROR_LEVEL, thd->stmt_da->sql_errno(),
+ "Unable to load replication GTID slave state from mysql.%s: %s",
+ rpl_gtid_slave_state_table_name.str, thd->stmt_da->message());
+ goto err;
+ }
+
/* execute init_slave variable */
if (opt_init_slave.length)
{
@@ -3975,7 +4174,7 @@ static int process_io_create_file(Master_info* mi, Create_file_log_event* cev)
}
DBUG_ASSERT(cev->inited_from_old);
thd->file_id = cev->file_id = mi->file_id++;
- thd->server_id = cev->server_id;
+ thd->variables.server_id = cev->server_id;
cev_not_written = 1;
if (unlikely(net_request_file(net,cev->fname)))
@@ -4587,16 +4786,18 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
Heartbeat is sent only after an event corresponding to the corrdinates
the heartbeat carries.
- Slave can not have a difference in coordinates except in the only
+ Slave can not have a higher coordinate except in the only
special case when mi->master_log_name, master_log_pos have never
been updated by Rotate event i.e when slave does not have any history
with the master (and thereafter mi->master_log_pos is NULL).
+ Slave can have lower coordinates, if some event from master was omitted.
+
TODO: handling `when' for SHOW SLAVE STATUS' snds behind
*/
if ((memcmp(mi->master_log_name, hb.get_log_ident(), hb.get_ident_len())
&& mi->master_log_name != NULL)
- || mi->master_log_pos != hb.log_pos)
+ || mi->master_log_pos > hb.log_pos)
{
/* missed events of heartbeat from the past */
error= ER_SLAVE_HEARTBEAT_FAILURE;
@@ -4648,7 +4849,8 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
mysql_mutex_lock(log_lock);
s_id= uint4korr(buf + SERVER_ID_OFFSET);
- if ((s_id == ::server_id && !mi->rli.replicate_same_server_id) ||
+ if ((s_id == global_system_variables.server_id &&
+ !mi->rli.replicate_same_server_id) ||
/*
the following conjunction deals with IGNORE_SERVER_IDS, if set
If the master is on the ignore list, execution of
@@ -4679,7 +4881,8 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
IGNORE_SERVER_IDS it increments mi->master_log_pos
as well as rli->group_relay_log_pos.
*/
- if (!(s_id == ::server_id && !mi->rli.replicate_same_server_id) ||
+ if (!(s_id == global_system_variables.server_id &&
+ !mi->rli.replicate_same_server_id) ||
(buf[EVENT_TYPE_OFFSET] != FORMAT_DESCRIPTION_EVENT &&
buf[EVENT_TYPE_OFFSET] != ROTATE_EVENT &&
buf[EVENT_TYPE_OFFSET] != STOP_EVENT))
@@ -5217,6 +5420,27 @@ static Log_event* next_event(Relay_log_info* rli)
inc_event_relay_log_pos()
*/
rli->future_event_relay_log_pos= my_b_tell(cur_log);
+ /*
+ For GTID, allocate a new sub_id for the given domain_id.
+ The sub_id must be allocated in increasing order of binlog order.
+ */
+ if (ev->get_type_code() == GTID_EVENT)
+ {
+ Gtid_log_event *gev= static_cast<Gtid_log_event *>(ev);
+ uint64 sub_id= rpl_global_gtid_slave_state.next_subid(gev->domain_id);
+ if (!sub_id)
+ {
+ errmsg = "slave SQL thread aborted because of out-of-memory error";
+ if (hot_log)
+ mysql_mutex_unlock(log_lock);
+ goto err;
+ }
+ rli->gtid_sub_id= sub_id;
+ rli->current_gtid.server_id= gev->server_id;
+ rli->current_gtid.domain_id= gev->domain_id;
+ rli->current_gtid.seq_no= gev->seq_no;
+ }
+
if (hot_log)
mysql_mutex_unlock(log_lock);
DBUG_RETURN(ev);