summaryrefslogtreecommitdiff
path: root/sql/slave.cc
diff options
context:
space:
mode:
Diffstat (limited to 'sql/slave.cc')
-rw-r--r--sql/slave.cc668
1 files changed, 581 insertions, 87 deletions
diff --git a/sql/slave.cc b/sql/slave.cc
index f6660e5a5c8..8be17860c61 100644
--- a/sql/slave.cc
+++ b/sql/slave.cc
@@ -40,6 +40,7 @@
#include <errmsg.h>
#include <mysqld_error.h>
#include <mysys_err.h>
+#include "rpl_handler.h"
#ifdef HAVE_REPLICATION
@@ -48,6 +49,10 @@
#define FLAGSTR(V,F) ((V)&(F)?#F" ":"")
#define MAX_SLAVE_RETRY_PAUSE 5
+/*
+ a parameter of sql_slave_killed() to defer the killed status
+*/
+#define SLAVE_WAIT_GROUP_DONE 60
bool use_slave_mask = 0;
MY_BITMAP slave_error_mask;
char slave_skip_error_names[SHOW_VAR_FUNC_BUFF_SIZE];
@@ -69,6 +74,8 @@ ulonglong relay_log_space_limit = 0;
int disconnect_slave_event_count = 0, abort_slave_event_count = 0;
int events_till_abort = -1;
+static pthread_key(Master_info*, RPL_MASTER_INFO);
+
enum enum_slave_reconnect_actions
{
SLAVE_RECON_ACT_REG= 0,
@@ -220,6 +227,7 @@ void unlock_slave_threads(Master_info* mi)
int init_slave()
{
DBUG_ENTER("init_slave");
+ int error= 0;
/*
This is called when mysqld starts. Before client connections are
@@ -231,7 +239,10 @@ int init_slave()
TODO: re-write this to interate through the list of files
for multi-master
*/
- active_mi= new Master_info;
+ active_mi= new Master_info(relay_log_recovery);
+
+ if (pthread_key_create(&RPL_MASTER_INFO, NULL))
+ goto err;
/*
If --slave-skip-errors=... was not used, the string value for the
@@ -250,6 +261,7 @@ int init_slave()
if (!active_mi)
{
sql_print_error("Failed to allocate memory for the master info structure");
+ error= 1;
goto err;
}
@@ -257,6 +269,7 @@ int init_slave()
!master_host, (SLAVE_IO | SLAVE_SQL)))
{
sql_print_error("Failed to initialize the master info structure");
+ error= 1;
goto err;
}
@@ -275,18 +288,69 @@ int init_slave()
SLAVE_IO | SLAVE_SQL))
{
sql_print_error("Failed to create slave threads");
+ error= 1;
goto err;
}
}
- pthread_mutex_unlock(&LOCK_active_mi);
- DBUG_RETURN(0);
err:
pthread_mutex_unlock(&LOCK_active_mi);
- DBUG_RETURN(1);
+ DBUG_RETURN(error);
}
+/*
+ Updates the master info based on the information stored in the
+ relay info and ignores relay logs previously retrieved by the IO
+ thread, which thus starts fetching again based on to the
+ group_master_log_pos and group_master_log_name. Eventually, the old
+ relay logs will be purged by the normal purge mechanism.
+
+ In the feature, we should improve this routine in order to avoid throwing
+ away logs that are safely stored in the disk. Note also that this recovery
+ routine relies on the correctness of the relay-log.info and only tolerates
+ coordinate problems in master.info.
+
+ In this function, there is no need for a mutex as the caller
+ (i.e. init_slave) already has one acquired.
+
+ Specifically, the following structures are updated:
+
+ 1 - mi->master_log_pos <-- rli->group_master_log_pos
+ 2 - mi->master_log_name <-- rli->group_master_log_name
+ 3 - It moves the relay log to the new relay log file, by
+ rli->group_relay_log_pos <-- BIN_LOG_HEADER_SIZE;
+ rli->event_relay_log_pos <-- BIN_LOG_HEADER_SIZE;
+ rli->group_relay_log_name <-- rli->relay_log.get_log_fname();
+ rli->event_relay_log_name <-- rli->relay_log.get_log_fname();
+
+ If there is an error, it returns (1), otherwise returns (0).
+ */
+int init_recovery(Master_info* mi, const char** errmsg)
+{
+ DBUG_ENTER("init_recovery");
+
+ Relay_log_info *rli= &mi->rli;
+ if (rli->group_master_log_name[0])
+ {
+ mi->master_log_pos= max(BIN_LOG_HEADER_SIZE,
+ rli->group_master_log_pos);
+ strmake(mi->master_log_name, rli->group_master_log_name,
+ sizeof(mi->master_log_name)-1);
+
+ sql_print_warning("Recovery from master pos %ld and file %s.",
+ (ulong) mi->master_log_pos, mi->master_log_name);
+
+ strmake(rli->group_relay_log_name, rli->relay_log.get_log_fname(),
+ sizeof(rli->group_relay_log_name)-1);
+ strmake(rli->event_relay_log_name, rli->relay_log.get_log_fname(),
+ sizeof(mi->rli.event_relay_log_name)-1);
+
+ rli->group_relay_log_pos= rli->event_relay_log_pos= BIN_LOG_HEADER_SIZE;
+ }
+ DBUG_RETURN(0);
+}
+
/**
Convert slave skip errors bitmap into a printable string.
*/
@@ -519,7 +583,7 @@ terminate_slave_thread(THD *thd,
EINVAL: invalid signal number (can't happen)
ESRCH: thread already killed (can happen, should be ignored)
*/
- IF_DBUG(int err= ) pthread_kill(thd->real_id, thr_client_alarm);
+ int err __attribute__((unused))= pthread_kill(thd->real_id, thr_client_alarm);
DBUG_ASSERT(err != EINVAL);
#endif
thd->awake(THD::NOT_KILLED);
@@ -730,44 +794,92 @@ static bool io_slave_killed(THD* thd, Master_info* mi)
DBUG_RETURN(mi->abort_slave || abort_loop || thd->killed);
}
+/**
+ The function analyzes a possible killed status and makes
+ a decision whether to accept it or not.
+ Normally upon accepting the sql thread goes to shutdown.
+ In the event of deffering decision @rli->last_event_start_time waiting
+ timer is set to force the killed status be accepted upon its expiration.
+
+ @param thd pointer to a THD instance
+ @param rli pointer to Relay_log_info instance
+ @return TRUE the killed status is recognized, FALSE a possible killed
+ status is deferred.
+*/
static bool sql_slave_killed(THD* thd, Relay_log_info* rli)
{
+ bool ret= FALSE;
DBUG_ENTER("sql_slave_killed");
DBUG_ASSERT(rli->sql_thd == thd);
DBUG_ASSERT(rli->slave_running == 1);// tracking buffer overrun
if (abort_loop || thd->killed || rli->abort_slave)
{
- if (rli->abort_slave && rli->is_in_group() &&
- thd->transaction.all.modified_non_trans_table)
- DBUG_RETURN(0);
- /*
- If we are in an unsafe situation (stopping could corrupt replication),
- we give one minute to the slave SQL thread of grace before really
- terminating, in the hope that it will be able to read more events and
- the unsafe situation will soon be left. Note that this one minute starts
- from the last time anything happened in the slave SQL thread. So it's
- really one minute of idleness, we don't timeout if the slave SQL thread
- is actively working.
- */
- if (rli->last_event_start_time == 0)
- DBUG_RETURN(1);
- DBUG_PRINT("info", ("Slave SQL thread is in an unsafe situation, giving "
- "it some grace period"));
- if (difftime(time(0), rli->last_event_start_time) > 60)
+ if (thd->transaction.all.modified_non_trans_table && rli->is_in_group())
{
- rli->report(ERROR_LEVEL, 0,
- "SQL thread had to stop in an unsafe situation, in "
- "the middle of applying updates to a "
- "non-transactional table without any primary key. "
- "There is a risk of duplicate updates when the slave "
- "SQL thread is restarted. Please check your tables' "
- "contents after restart.");
- DBUG_RETURN(1);
+ char msg_stopped[]=
+ "... The slave SQL is stopped, leaving the current group "
+ "of events unfinished with a non-transaction table changed. "
+ "If the group consists solely of Row-based events, you can try "
+ "restarting the slave with --slave-exec-mode=IDEMPOTENT, which "
+ "ignores duplicate key, key not found, and similar errors (see "
+ "documentation for details).";
+
+ if (rli->abort_slave)
+ {
+ DBUG_PRINT("info", ("Slave SQL thread is being stopped in the middle of"
+ " a group having updated a non-trans table, giving"
+ " it some grace period"));
+
+ /*
+ Slave sql thread shutdown in face of unfinished group modified
+ Non-trans table is handled via a timer. The slave may eventually
+ give out to complete the current group and in that case there
+ might be issues at consequent slave restart, see the error message.
+ WL#2975 offers a robust solution requiring to store the last exectuted
+ event's coordinates along with the group's coordianates
+ instead of waiting with @c last_event_start_time the timer.
+ */
+
+ if (rli->last_event_start_time == 0)
+ rli->last_event_start_time= my_time(0);
+ ret= difftime(my_time(0), rli->last_event_start_time) <=
+ SLAVE_WAIT_GROUP_DONE ? FALSE : TRUE;
+
+ DBUG_EXECUTE_IF("stop_slave_middle_group",
+ DBUG_EXECUTE_IF("incomplete_group_in_relay_log",
+ ret= TRUE;);); // time is over
+
+ if (ret == 0)
+ {
+ rli->report(WARNING_LEVEL, 0,
+ "slave SQL thread is being stopped in the middle "
+ "of applying of a group having updated a non-transaction "
+ "table; waiting for the group completion ... ");
+ }
+ else
+ {
+ rli->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR,
+ ER(ER_SLAVE_FATAL_ERROR), msg_stopped);
+ }
+ }
+ else
+ {
+ ret= TRUE;
+ rli->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR, ER(ER_SLAVE_FATAL_ERROR),
+ msg_stopped);
+ }
+ }
+ else
+ {
+ ret= TRUE;
}
}
- DBUG_RETURN(0);
+ if (ret)
+ rli->last_event_start_time= 0;
+
+ DBUG_RETURN(ret);
}
@@ -860,6 +972,126 @@ int init_intvar_from_file(int* var, IO_CACHE* f, int default_val)
DBUG_RETURN(1);
}
+int init_floatvar_from_file(float* var, IO_CACHE* f, float default_val)
+{
+ char buf[16];
+ DBUG_ENTER("init_floatvar_from_file");
+
+
+ if (my_b_gets(f, buf, sizeof(buf)))
+ {
+ if (sscanf(buf, "%f", var) != 1)
+ DBUG_RETURN(1);
+ else
+ DBUG_RETURN(0);
+ }
+ else if (default_val != 0.0)
+ {
+ *var = default_val;
+ DBUG_RETURN(0);
+ }
+ DBUG_RETURN(1);
+}
+
+
+/**
+ A master info read method
+
+ This function is called from @c init_master_info() along with
+ relatives to restore some of @c active_mi members.
+ Particularly, this function is responsible for restoring
+ IGNORE_SERVER_IDS list of servers whose events the slave is
+ going to ignore (to not log them in the relay log).
+ Items being read are supposed to be decimal output of values of a
+ type shorter or equal of @c long and separated by the single space.
+
+ @param arr @c DYNAMIC_ARRAY pointer to storage for servers id
+ @param f @c IO_CACHE pointer to the source file
+
+ @retval 0 All OK
+ @retval non-zero An error
+*/
+
+int init_dynarray_intvar_from_file(DYNAMIC_ARRAY* arr, IO_CACHE* f)
+{
+ int ret= 0;
+ char buf[16 * (sizeof(long)*4 + 1)]; // static buffer to use most of times
+ char *buf_act= buf; // actual buffer can be dynamic if static is short
+ char *token, *last;
+ uint num_items; // number of items of `arr'
+ size_t read_size;
+ DBUG_ENTER("init_dynarray_intvar_from_file");
+
+ if ((read_size= my_b_gets(f, buf_act, sizeof(buf))) == 0)
+ {
+ return 0; // no line in master.info
+ }
+ if (read_size + 1 == sizeof(buf) && buf[sizeof(buf) - 2] != '\n')
+ {
+ /*
+ short read happend; allocate sufficient memory and make the 2nd read
+ */
+ char buf_work[(sizeof(long)*3 + 1)*16];
+ memcpy(buf_work, buf, sizeof(buf_work));
+ num_items= atoi(strtok_r(buf_work, " ", &last));
+ size_t snd_size;
+ /*
+ max size lower bound approximate estimation bases on the formula:
+ (the items number + items themselves) *
+ (decimal size + space) - 1 + `\n' + '\0'
+ */
+ size_t max_size= (1 + num_items) * (sizeof(long)*3 + 1) + 1;
+ buf_act= (char*) my_malloc(max_size, MYF(MY_WME));
+ memcpy(buf_act, buf, read_size);
+ snd_size= my_b_gets(f, buf_act + read_size, max_size - read_size);
+ if (snd_size == 0 ||
+ (snd_size + 1 == max_size - read_size) && buf[max_size - 2] != '\n')
+ {
+ /*
+ failure to make the 2nd read or short read again
+ */
+ ret= 1;
+ goto err;
+ }
+ }
+ token= strtok_r(buf_act, " ", &last);
+ if (token == NULL)
+ {
+ ret= 1;
+ goto err;
+ }
+ num_items= atoi(token);
+ for (uint i=0; i < num_items; i++)
+ {
+ token= strtok_r(NULL, " ", &last);
+ if (token == NULL)
+ {
+ ret= 1;
+ goto err;
+ }
+ else
+ {
+ ulong val= atol(token);
+ insert_dynamic(arr, (uchar *) &val);
+ }
+ }
+err:
+ if (buf_act != buf)
+ my_free(buf_act, MYF(0));
+ DBUG_RETURN(ret);
+}
+
+
+static bool check_io_slave_killed(THD *thd, Master_info *mi, const char *info)
+{
+ if (io_slave_killed(thd, mi))
+ {
+ if (info && global_system_variables.log_warnings)
+ sql_print_information("%s", info);
+ return TRUE;
+ }
+ return FALSE;
+}
/*
Check if the error is caused by network.
@@ -1028,7 +1260,7 @@ static int get_master_version_and_clock(MYSQL* mysql, Master_info* mi)
(master_res= mysql_store_result(mysql)) &&
(master_row= mysql_fetch_row(master_res)))
{
- if ((::server_id == strtoul(master_row[1], 0, 10)) &&
+ if ((::server_id == (mi->master_id= strtoul(master_row[1], 0, 10))) &&
!mi->rli.replicate_same_server_id)
{
errmsg= "The slave I/O thread stops because master and slave have equal \
@@ -1066,6 +1298,13 @@ maybe it is a *VERY OLD MASTER*.");
mysql_free_result(master_res);
master_res= NULL;
}
+ if (mi->master_id == 0 && mi->ignore_server_ids.elements > 0)
+ {
+ errmsg= "Slave configured with server id filtering could not detect the master server id.";
+ err_code= ER_SLAVE_FATAL_ERROR;
+ sprintf(err_buff, ER(err_code), errmsg);
+ goto err;
+ }
/*
Check that the master's global character_set_server and ours are the same.
@@ -1189,6 +1428,31 @@ when it try to get the value of TIME_ZONE global variable from master.";
}
}
+ if (mi->heartbeat_period != 0.0)
+ {
+ char llbuf[22];
+ const char query_format[]= "SET @master_heartbeat_period= %s";
+ char query[sizeof(query_format) - 2 + sizeof(llbuf)];
+ /*
+ the period is an ulonglong of nano-secs.
+ */
+ llstr((ulonglong) (mi->heartbeat_period*1000000000UL), llbuf);
+ my_sprintf(query, (query, query_format, llbuf));
+
+ if (mysql_real_query(mysql, query, strlen(query))
+ && !check_io_slave_killed(mi->io_thd, mi, NULL))
+ {
+ errmsg= "The slave I/O thread stops because SET @master_heartbeat_period "
+ "on master failed.";
+ err_code= ER_SLAVE_FATAL_ERROR;
+ sprintf(err_buff, "%s Error: %s", errmsg, mysql_error(mysql));
+ mysql_free_result(mysql_store_result(mysql));
+ goto err;
+ }
+ mysql_free_result(mysql_store_result(mysql));
+ }
+
+
err:
if (errmsg)
{
@@ -1274,7 +1538,7 @@ static int create_table_from_dump(THD* thd, MYSQL *mysql, const char* db,
else
{
/* Clear the OK result of mysql_rm_table(). */
- thd->main_da.reset_diagnostics_area();
+ thd->stmt_da->reset_diagnostics_area();
}
}
@@ -1298,7 +1562,7 @@ static int create_table_from_dump(THD* thd, MYSQL *mysql, const char* db,
goto err; // mysql_parse took care of the error send
thd_proc_info(thd, "Opening master dump table");
- thd->main_da.reset_diagnostics_area(); /* cleanup from CREATE_TABLE */
+ thd->stmt_da->reset_diagnostics_area(); /* cleanup from CREATE_TABLE */
/*
Note: If this function starts to fail for MERGE tables,
change the next two lines to these:
@@ -1605,8 +1869,12 @@ bool show_master_info(THD* thd, Master_info* mi)
field_list.push_back(new Item_empty_string("Last_IO_Error", 20));
field_list.push_back(new Item_return_int("Last_SQL_Errno", 4, MYSQL_TYPE_LONG));
field_list.push_back(new Item_empty_string("Last_SQL_Error", 20));
+ field_list.push_back(new Item_empty_string("Replicate_Ignore_Server_Ids",
+ FN_REFLEN));
+ field_list.push_back(new Item_return_int("Master_Server_Id", sizeof(ulong),
+ MYSQL_TYPE_LONG));
- if (protocol->send_fields(&field_list,
+ if (protocol->send_result_set_metadata(&field_list,
Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF))
DBUG_RETURN(TRUE);
@@ -1640,7 +1908,8 @@ bool show_master_info(THD* thd, Master_info* mi)
protocol->store((ulonglong) mi->rli.group_relay_log_pos);
protocol->store(mi->rli.group_master_log_name, &my_charset_bin);
protocol->store(mi->slave_running == MYSQL_SLAVE_RUN_CONNECT ?
- "Yes" : "No", &my_charset_bin);
+ "Yes" : (mi->slave_running == MYSQL_SLAVE_RUN_NOT_CONNECT ?
+ "Connecting" : "No"), &my_charset_bin);
protocol->store(mi->rli.slave_running ? "Yes":"No", &my_charset_bin);
protocol->store(rpl_filter->get_do_db());
protocol->store(rpl_filter->get_ignore_db());
@@ -1726,6 +1995,32 @@ bool show_master_info(THD* thd, Master_info* mi)
protocol->store(mi->rli.last_error().number);
// Last_SQL_Error
protocol->store(mi->rli.last_error().message, &my_charset_bin);
+ // Replicate_Ignore_Server_Ids
+ {
+ char buff[FN_REFLEN];
+ ulong i, cur_len;
+ for (i= 0, buff[0]= 0, cur_len= 0;
+ i < mi->ignore_server_ids.elements; i++)
+ {
+ ulong s_id, slen;
+ char sbuff[FN_REFLEN];
+ get_dynamic(&mi->ignore_server_ids, (uchar*) &s_id, i);
+ slen= my_sprintf(sbuff, (sbuff, (i==0? "%lu" : ", %lu"), s_id));
+ if (cur_len + slen + 4 > FN_REFLEN)
+ {
+ /*
+ break the loop whenever remained space could not fit
+ ellipses on the next cycle
+ */
+ my_sprintf(buff + cur_len, (buff + cur_len, "..."));
+ break;
+ }
+ cur_len += my_sprintf(buff + cur_len, (buff + cur_len, "%s", sbuff));
+ }
+ protocol->store(buff, &my_charset_bin);
+ }
+ // Master_Server_id
+ protocol->store((uint32) mi->master_id);
pthread_mutex_unlock(&mi->rli.err_lock);
pthread_mutex_unlock(&mi->err_lock);
@@ -1869,17 +2164,22 @@ static int safe_sleep(THD* thd, int sec, CHECK_KILLED_FUNC thread_killed,
}
-static int request_dump(MYSQL* mysql, Master_info* mi,
- bool *suppress_warnings)
+static int request_dump(THD *thd, MYSQL* mysql, Master_info* mi,
+ bool *suppress_warnings)
{
uchar buf[FN_REFLEN + 10];
int len;
- int binlog_flags = 0; // for now
+ ushort binlog_flags = 0; // for now
char* logname = mi->master_log_name;
DBUG_ENTER("request_dump");
*suppress_warnings= FALSE;
+ if (RUN_HOOK(binlog_relay_io,
+ before_request_transmit,
+ (thd, mi, binlog_flags)))
+ DBUG_RETURN(1);
+
// TODO if big log files: Change next to int8store()
int4store(buf, (ulong) mi->master_log_pos);
int2store(buf + 4, binlog_flags);
@@ -2012,7 +2312,7 @@ static int has_temporary_error(THD *thd)
DBUG_ENTER("has_temporary_error");
DBUG_EXECUTE_IF("all_errors_are_temporary_errors",
- if (thd->main_da.is_error())
+ if (thd->stmt_da->is_error())
{
thd->clear_error();
my_error(ER_LOCK_DEADLOCK, MYF(0));
@@ -2031,20 +2331,21 @@ static int has_temporary_error(THD *thd)
currently, InnoDB deadlock detected by InnoDB or lock
wait timeout (innodb_lock_wait_timeout exceeded
*/
- if (thd->main_da.sql_errno() == ER_LOCK_DEADLOCK ||
- thd->main_da.sql_errno() == ER_LOCK_WAIT_TIMEOUT)
+ if (thd->stmt_da->sql_errno() == ER_LOCK_DEADLOCK ||
+ thd->stmt_da->sql_errno() == ER_LOCK_WAIT_TIMEOUT)
DBUG_RETURN(1);
#ifdef HAVE_NDB_BINLOG
/*
currently temporary error set in ndbcluster
*/
- List_iterator_fast<MYSQL_ERROR> it(thd->warn_list);
+ List_iterator_fast<MYSQL_ERROR> it(thd->warning_info->warn_list());
MYSQL_ERROR *err;
while ((err= it++))
{
- DBUG_PRINT("info", ("has warning %d %s", err->code, err->msg));
- switch (err->code)
+ DBUG_PRINT("info", ("has condition %d %s", err->get_sql_errno(),
+ err->get_message_text()));
+ switch (err->get_sql_errno())
{
case ER_GET_TEMPORARY_ERRMSG:
DBUG_RETURN(1);
@@ -2274,6 +2575,27 @@ static int exec_relay_log_event(THD* thd, Relay_log_info* rli)
delete ev;
DBUG_RETURN(1);
}
+
+ { /**
+ The following failure injecion works in cooperation with tests
+ setting @@global.debug= 'd,incomplete_group_in_relay_log'.
+ Xid or Commit events are not executed to force the slave sql
+ read hanging if the realy log does not have any more events.
+ */
+ DBUG_EXECUTE_IF("incomplete_group_in_relay_log",
+ if ((ev->get_type_code() == XID_EVENT) ||
+ ((ev->get_type_code() == QUERY_EVENT) &&
+ strcmp("COMMIT", ((Query_log_event *) ev)->query) == 0))
+ {
+ DBUG_ASSERT(thd->transaction.all.modified_non_trans_table);
+ rli->abort_slave= 1;
+ pthread_mutex_unlock(&rli->data_lock);
+ delete ev;
+ rli->inc_event_relay_log_pos();
+ DBUG_RETURN(0);
+ };);
+ }
+
exec_res= apply_event_and_update_pos(ev, thd, rli);
/*
@@ -2377,18 +2699,6 @@ on this slave.\
}
-static bool check_io_slave_killed(THD *thd, Master_info *mi, const char *info)
-{
- if (io_slave_killed(thd, mi))
- {
- if (info && global_system_variables.log_warnings)
- sql_print_information("%s", info);
- return TRUE;
- }
- return FALSE;
-}
-
-
/**
@brief Try to reconnect slave IO thread.
@@ -2528,6 +2838,16 @@ pthread_handler_t handle_slave_io(void *arg)
mi->master_log_name,
llstr(mi->master_log_pos,llbuff)));
+ /* This must be called before run any binlog_relay_io hooks */
+ my_pthread_setspecific_ptr(RPL_MASTER_INFO, mi);
+
+ if (RUN_HOOK(binlog_relay_io, thread_start, (thd, mi)))
+ {
+ mi->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR,
+ ER(ER_SLAVE_FATAL_ERROR), "Failed to run 'thread_start' hook");
+ goto err;
+ }
+
if (!(mi->mysql = mysql = mysql_init(NULL)))
{
mi->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR,
@@ -2617,7 +2937,7 @@ connected:
while (!io_slave_killed(thd,mi))
{
thd_proc_info(thd, "Requesting binlog dump");
- if (request_dump(mysql, mi, &suppress_warnings))
+ if (request_dump(thd, mysql, mi, &suppress_warnings))
{
sql_print_error("Failed on request_dump()");
if (check_io_slave_killed(thd, mi, "Slave I/O thread killed while \
@@ -2637,6 +2957,7 @@ requesting master dump") ||
goto err;
goto connected;
});
+ const char *event_buf;
DBUG_ASSERT(mi->last_error().number == 0);
while (!io_slave_killed(thd,mi))
@@ -2697,14 +3018,37 @@ Stopping slave I/O thread due to out-of-memory error from master");
retry_count=0; // ok event, reset retry counter
thd_proc_info(thd, "Queueing master event to the relay log");
- if (queue_event(mi,(const char*)mysql->net.read_pos + 1,
- event_len))
+ event_buf= (const char*)mysql->net.read_pos + 1;
+ if (RUN_HOOK(binlog_relay_io, after_read_event,
+ (thd, mi,(const char*)mysql->net.read_pos + 1,
+ event_len, &event_buf, &event_len)))
+ {
+ mi->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR,
+ ER(ER_SLAVE_FATAL_ERROR),
+ "Failed to run 'after_read_event' hook");
+ goto err;
+ }
+
+ /* XXX: 'synced' should be updated by queue_event to indicate
+ whether event has been synced to disk */
+ bool synced= 0;
+ if (queue_event(mi, event_buf, event_len))
{
mi->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_WRITE_FAILURE,
ER(ER_SLAVE_RELAY_LOG_WRITE_FAILURE),
"could not queue event from master");
goto err;
}
+
+ if (RUN_HOOK(binlog_relay_io, after_queue_event,
+ (thd, mi, event_buf, event_len, synced)))
+ {
+ mi->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR,
+ ER(ER_SLAVE_FATAL_ERROR),
+ "Failed to run 'after_queue_event' hook");
+ goto err;
+ }
+
if (flush_master_info(mi, 1))
{
sql_print_error("Failed to flush master info file");
@@ -2750,6 +3094,7 @@ err:
// print the current replication position
sql_print_information("Slave I/O thread exiting, read up to log '%s', position %s",
IO_RPL_LOG_NAME, llstr(mi->master_log_pos,llbuff));
+ RUN_HOOK(binlog_relay_io, thread_stop, (thd, mi));
thd->set_query(NULL, 0);
thd->reset_db(NULL, 0);
if (mysql)
@@ -2979,9 +3324,9 @@ log '%s' at position %s, relay log '%s' position: %s", RPL_LOG_NAME,
if (check_temp_dir(rli->slave_patternload_file))
{
- rli->report(ERROR_LEVEL, thd->main_da.sql_errno(),
+ rli->report(ERROR_LEVEL, thd->stmt_da->sql_errno(),
"Unable to use slave's temporary directory %s - %s",
- slave_load_tmpdir, thd->main_da.message());
+ slave_load_tmpdir, thd->stmt_da->message());
goto err;
}
@@ -2991,7 +3336,7 @@ log '%s' at position %s, relay log '%s' position: %s", RPL_LOG_NAME,
execute_init_command(thd, &sys_init_slave, &LOCK_sys_init_slave);
if (thd->is_slave_error)
{
- rli->report(ERROR_LEVEL, thd->main_da.sql_errno(),
+ rli->report(ERROR_LEVEL, thd->stmt_da->sql_errno(),
"Slave SQL thread aborted. Can't execute init_slave query");
goto err;
}
@@ -3035,20 +3380,20 @@ log '%s' at position %s, relay log '%s' position: %s", RPL_LOG_NAME,
if (thd->is_error())
{
- char const *const errmsg= thd->main_da.message();
+ char const *const errmsg= thd->stmt_da->message();
DBUG_PRINT("info",
- ("thd->main_da.sql_errno()=%d; rli->last_error.number=%d",
- thd->main_da.sql_errno(), last_errno));
+ ("thd->stmt_da->sql_errno()=%d; rli->last_error.number=%d",
+ thd->stmt_da->sql_errno(), last_errno));
if (last_errno == 0)
{
/*
This function is reporting an error which was not reported
while executing exec_relay_log_event().
*/
- rli->report(ERROR_LEVEL, thd->main_da.sql_errno(), "%s", errmsg);
+ rli->report(ERROR_LEVEL, thd->stmt_da->sql_errno(), "%s", errmsg);
}
- else if (last_errno != thd->main_da.sql_errno())
+ else if (last_errno != thd->stmt_da->sql_errno())
{
/*
* An error was reported while executing exec_relay_log_event()
@@ -3057,12 +3402,12 @@ log '%s' at position %s, relay log '%s' position: %s", RPL_LOG_NAME,
* what caused the problem.
*/
sql_print_error("Slave (additional info): %s Error_code: %d",
- errmsg, thd->main_da.sql_errno());
+ errmsg, thd->stmt_da->sql_errno());
}
}
/* Print any warnings issued */
- List_iterator_fast<MYSQL_ERROR> it(thd->warn_list);
+ List_iterator_fast<MYSQL_ERROR> it(thd->warning_info->warn_list());
MYSQL_ERROR *err;
/*
Added controlled slave thread cancel for replication
@@ -3071,9 +3416,9 @@ log '%s' at position %s, relay log '%s' position: %s", RPL_LOG_NAME,
bool udf_error = false;
while ((err= it++))
{
- if (err->code == ER_CANT_OPEN_LIBRARY)
+ if (err->get_sql_errno() == ER_CANT_OPEN_LIBRARY)
udf_error = true;
- sql_print_warning("Slave: %s Error_code: %d",err->msg, err->code);
+ sql_print_warning("Slave: %s Error_code: %d", err->get_message_text(), err->get_sql_errno());
}
if (udf_error)
sql_print_error("Error loading user-defined library, slave SQL "
@@ -3555,9 +3900,11 @@ static int queue_old_event(Master_info *mi, const char *buf,
static int queue_event(Master_info* mi,const char* buf, ulong event_len)
{
int error= 0;
+ String error_msg;
ulong inc_pos;
Relay_log_info *rli= &mi->rli;
pthread_mutex_t *log_lock= rli->relay_log.get_log_lock();
+ ulong s_id;
DBUG_ENTER("queue_event");
LINT_INIT(inc_pos);
@@ -3589,7 +3936,7 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
Rotate_log_event rev(buf,event_len,mi->rli.relay_log.description_event_for_queue);
if (unlikely(process_io_rotate(mi,&rev)))
{
- error= 1;
+ error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE;
goto err;
}
/*
@@ -3616,7 +3963,7 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
Log_event::read_log_event(buf, event_len, &errmsg,
mi->rli.relay_log.description_event_for_queue)))
{
- error= 2;
+ error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE;
goto err;
}
delete mi->rli.relay_log.description_event_for_queue;
@@ -3635,6 +3982,56 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
}
break;
+
+ case HEARTBEAT_LOG_EVENT:
+ {
+ /*
+ HB (heartbeat) cannot come before RL (Relay)
+ */
+ char llbuf[22];
+ Heartbeat_log_event hb(buf, event_len, mi->rli.relay_log.description_event_for_queue);
+ if (!hb.is_valid())
+ {
+ error= ER_SLAVE_HEARTBEAT_FAILURE;
+ error_msg.append(STRING_WITH_LEN("inconsistent heartbeat event content;"));
+ error_msg.append(STRING_WITH_LEN("the event's data: log_file_name "));
+ error_msg.append(hb.get_log_ident(), (uint) strlen(hb.get_log_ident()));
+ error_msg.append(STRING_WITH_LEN(" log_pos "));
+ llstr(hb.log_pos, llbuf);
+ error_msg.append(llbuf, strlen(llbuf));
+ goto err;
+ }
+ mi->received_heartbeats++;
+ /*
+ compare local and event's versions of log_file, log_pos.
+
+ Heartbeat is sent only after an event corresponding to the corrdinates
+ the heartbeat carries.
+ Slave can not have a difference in coordinates except in the only
+ special case when mi->master_log_name, master_log_pos have never
+ been updated by Rotate event i.e when slave does not have any history
+ with the master (and thereafter mi->master_log_pos is NULL).
+
+ TODO: handling `when' for SHOW SLAVE STATUS' snds behind
+ */
+ if ((memcmp(mi->master_log_name, hb.get_log_ident(), hb.get_ident_len())
+ && mi->master_log_name != NULL)
+ || mi->master_log_pos != hb.log_pos)
+ {
+ /* missed events of heartbeat from the past */
+ error= ER_SLAVE_HEARTBEAT_FAILURE;
+ error_msg.append(STRING_WITH_LEN("heartbeat is not compatible with local info;"));
+ error_msg.append(STRING_WITH_LEN("the event's data: log_file_name "));
+ error_msg.append(hb.get_log_ident(), (uint) strlen(hb.get_log_ident()));
+ error_msg.append(STRING_WITH_LEN(" log_pos "));
+ llstr(hb.log_pos, llbuf);
+ error_msg.append(llbuf, strlen(llbuf));
+ goto err;
+ }
+ goto skip_relay_logging;
+ }
+ break;
+
default:
inc_pos= event_len;
break;
@@ -3654,9 +4051,20 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
*/
pthread_mutex_lock(log_lock);
-
- if ((uint4korr(buf + SERVER_ID_OFFSET) == ::server_id) &&
- !mi->rli.replicate_same_server_id)
+ s_id= uint4korr(buf + SERVER_ID_OFFSET);
+ if ((s_id == ::server_id && !mi->rli.replicate_same_server_id) ||
+ /*
+ the following conjunction deals with IGNORE_SERVER_IDS, if set
+ If the master is on the ignore list, execution of
+ format description log events and rotate events is necessary.
+ */
+ (mi->ignore_server_ids.elements > 0 &&
+ mi->shall_ignore_server_id(s_id) &&
+ /* everything is filtered out from non-master */
+ (s_id != mi->master_id ||
+ /* for the master meta information is necessary */
+ buf[EVENT_TYPE_OFFSET] != FORMAT_DESCRIPTION_EVENT &&
+ buf[EVENT_TYPE_OFFSET] != ROTATE_EVENT)))
{
/*
Do not write it to the relay log.
@@ -3671,10 +4079,14 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
But events which were generated by this slave and which do not exist in
the master's binlog (i.e. Format_desc, Rotate & Stop) should not increment
mi->master_log_pos.
+ If the event is originated remotely and is being filtered out by
+ IGNORE_SERVER_IDS it increments mi->master_log_pos
+ as well as rli->group_relay_log_pos.
*/
- if (buf[EVENT_TYPE_OFFSET]!=FORMAT_DESCRIPTION_EVENT &&
- buf[EVENT_TYPE_OFFSET]!=ROTATE_EVENT &&
- buf[EVENT_TYPE_OFFSET]!=STOP_EVENT)
+ if (!(s_id == ::server_id && !mi->rli.replicate_same_server_id) ||
+ buf[EVENT_TYPE_OFFSET] != FORMAT_DESCRIPTION_EVENT &&
+ buf[EVENT_TYPE_OFFSET] != ROTATE_EVENT &&
+ buf[EVENT_TYPE_OFFSET] != STOP_EVENT)
{
mi->master_log_pos+= inc_pos;
memcpy(rli->ign_master_log_name_end, mi->master_log_name, FN_REFLEN);
@@ -3682,8 +4094,8 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
rli->ign_master_log_pos_end= mi->master_log_pos;
}
rli->relay_log.signal_update(); // the slave SQL thread needs to re-check
- DBUG_PRINT("info", ("master_log_pos: %lu, event originating from the same server, ignored",
- (ulong) mi->master_log_pos));
+ DBUG_PRINT("info", ("master_log_pos: %lu, event originating from %u server, ignored",
+ (ulong) mi->master_log_pos, uint4korr(buf + SERVER_ID_OFFSET)));
}
else
{
@@ -3695,15 +4107,23 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
rli->relay_log.harvest_bytes_written(&rli->log_space_total);
}
else
- error= 3;
+ {
+ error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE;
+ }
rli->ign_master_log_name_end[0]= 0; // last event is not ignored
}
pthread_mutex_unlock(log_lock);
-
+skip_relay_logging:
+
err:
pthread_mutex_unlock(&mi->data_lock);
DBUG_PRINT("info", ("error: %d", error));
+ if (error)
+ mi->report(ERROR_LEVEL, error, ER(error),
+ (error == ER_SLAVE_RELAY_LOG_WRITE_FAILURE)?
+ "could not queue event from master" :
+ error_msg.ptr());
DBUG_RETURN(error);
}
@@ -3909,6 +4329,71 @@ static int safe_reconnect(THD* thd, MYSQL* mysql, Master_info* mi,
}
+MYSQL *rpl_connect_master(MYSQL *mysql)
+{
+ THD *thd= current_thd;
+ Master_info *mi= my_pthread_getspecific_ptr(Master_info*, RPL_MASTER_INFO);
+ if (!mi)
+ {
+ sql_print_error("'rpl_connect_master' must be called in slave I/O thread context.");
+ return NULL;
+ }
+
+ bool allocated= false;
+
+ if (!mysql)
+ {
+ if(!(mysql= mysql_init(NULL)))
+ {
+ sql_print_error("rpl_connect_master: failed in mysql_init()");
+ return NULL;
+ }
+ allocated= true;
+ }
+
+ /*
+ XXX: copied from connect_to_master, this function should not
+ change the slave status, so we cannot use connect_to_master
+ directly
+
+ TODO: make this part a seperate function to eliminate duplication
+ */
+ mysql_options(mysql, MYSQL_OPT_CONNECT_TIMEOUT, (char *) &slave_net_timeout);
+ mysql_options(mysql, MYSQL_OPT_READ_TIMEOUT, (char *) &slave_net_timeout);
+
+#ifdef HAVE_OPENSSL
+ if (mi->ssl)
+ {
+ mysql_ssl_set(mysql,
+ mi->ssl_key[0]?mi->ssl_key:0,
+ mi->ssl_cert[0]?mi->ssl_cert:0,
+ mi->ssl_ca[0]?mi->ssl_ca:0,
+ mi->ssl_capath[0]?mi->ssl_capath:0,
+ mi->ssl_cipher[0]?mi->ssl_cipher:0);
+ mysql_options(mysql, MYSQL_OPT_SSL_VERIFY_SERVER_CERT,
+ &mi->ssl_verify_server_cert);
+ }
+#endif
+
+ mysql_options(mysql, MYSQL_SET_CHARSET_NAME, default_charset_info->csname);
+ /* This one is not strictly needed but we have it here for completeness */
+ mysql_options(mysql, MYSQL_SET_CHARSET_DIR, (char *) charsets_dir);
+
+ if (io_slave_killed(thd, mi)
+ || !mysql_real_connect(mysql, mi->host, mi->user, mi->password, 0,
+ mi->port, 0, 0))
+ {
+ if (!io_slave_killed(thd, mi))
+ sql_print_error("rpl_connect_master: error connecting to master: %s (server_error: %d)",
+ mysql_error(mysql), mysql_errno(mysql));
+
+ if (allocated)
+ mysql_close(mysql); // this will free the object
+ return NULL;
+ }
+ return mysql;
+}
+
/*
Store the file and position where the execute-slave thread are in the
relay log.
@@ -3962,7 +4447,14 @@ bool flush_relay_log_info(Relay_log_info* rli)
error=1;
if (flush_io_cache(file))
error=1;
-
+ if (sync_relayloginfo_period &&
+ !error &&
+ ++(rli->sync_counter) >= sync_relayloginfo_period)
+ {
+ if (my_sync(rli->info_fd, MYF(MY_WME)))
+ error=1;
+ rli->sync_counter= 0;
+ }
/* Flushing the relay log is done by the slave I/O thread */
DBUG_RETURN(error);
}
@@ -4211,8 +4703,8 @@ static Log_event* next_event(Relay_log_info* rli)
*/
pthread_mutex_unlock(&rli->log_space_lock);
pthread_cond_broadcast(&rli->log_space_cond);
- // Note that wait_for_update unlocks lock_log !
- rli->relay_log.wait_for_update(rli->sql_thd, 1);
+ // Note that wait_for_update_relay_log unlocks lock_log !
+ rli->relay_log.wait_for_update_relay_log(rli->sql_thd);
// re-acquire data lock since we released it earlier
pthread_mutex_lock(&rli->data_lock);
rli->last_master_timestamp= save_timestamp;
@@ -4369,6 +4861,8 @@ void rotate_relay_log(Master_info* mi)
DBUG_ENTER("rotate_relay_log");
Relay_log_info* rli= &mi->rli;
+ DBUG_EXECUTE_IF("crash_before_rotate_relaylog", abort(););
+
/* We don't lock rli->run_lock. This would lead to deadlocks. */
pthread_mutex_lock(&mi->run_lock);