summaryrefslogtreecommitdiff
path: root/sql/slave.cc
diff options
context:
space:
mode:
Diffstat (limited to 'sql/slave.cc')
-rw-r--r--sql/slave.cc1395
1 files changed, 891 insertions, 504 deletions
diff --git a/sql/slave.cc b/sql/slave.cc
index 6d266245460..a96ee505fe9 100644
--- a/sql/slave.cc
+++ b/sql/slave.cc
@@ -24,22 +24,33 @@
replication slave.
*/
-#include "mysql_priv.h"
-
-#include <mysql.h>
-#include <myisam.h>
+#include "sql_priv.h"
+#include "my_global.h"
#include "slave.h"
+#include "sql_parse.h" // execute_init_command
+#include "sql_table.h" // mysql_rm_table
#include "rpl_mi.h"
#include "rpl_rli.h"
#include "sql_repl.h"
#include "rpl_filter.h"
#include "repl_failsafe.h"
+#include "transaction.h"
#include <thr_alarm.h>
#include <my_dir.h>
#include <sql_common.h>
#include <errmsg.h>
#include <mysqld_error.h>
#include <mysys_err.h>
+#include "rpl_handler.h"
+#include <signal.h>
+#include <mysql.h>
+#include <myisam.h>
+
+#include "sql_base.h" // close_thread_tables
+#include "tztime.h" // struct Time_zone
+#include "log_event.h" // Rotate_log_event,
+ // Create_file_log_event,
+ // Format_description_log_event
#ifdef HAVE_REPLICATION
@@ -49,6 +60,10 @@
#define FLAGSTR(V,F) ((V)&(F)?#F" ":"")
#define MAX_SLAVE_RETRY_PAUSE 5
+/*
+ a parameter of sql_slave_killed() to defer the killed status
+*/
+#define SLAVE_WAIT_GROUP_DONE 60
bool use_slave_mask = 0;
MY_BITMAP slave_error_mask;
char slave_skip_error_names[SHOW_VAR_FUNC_BUFF_SIZE];
@@ -68,7 +83,8 @@ ulonglong relay_log_space_limit = 0;
*/
int disconnect_slave_event_count = 0, abort_slave_event_count = 0;
-int events_till_abort = -1;
+
+static pthread_key(Master_info*, RPL_MASTER_INFO);
enum enum_slave_reconnect_actions
{
@@ -138,15 +154,12 @@ static int connect_to_master(THD* thd, MYSQL* mysql, Master_info* mi,
bool reconnect, bool suppress_warnings);
static int safe_sleep(THD* thd, int sec, CHECK_KILLED_FUNC thread_killed,
void* thread_killed_arg);
-static int request_table_dump(MYSQL* mysql, const char* db, const char* table);
-static int create_table_from_dump(THD* thd, MYSQL *mysql, const char* db,
- const char* table_name, bool overwrite);
static int get_master_version_and_clock(MYSQL* mysql, Master_info* mi);
static Log_event* next_event(Relay_log_info* rli);
static int queue_event(Master_info* mi,const char* buf,ulong event_len);
static int terminate_slave_thread(THD *thd,
- pthread_mutex_t *term_lock,
- pthread_cond_t *term_cond,
+ mysql_mutex_t *term_lock,
+ mysql_cond_t *term_cond,
volatile uint *slave_running,
bool skip_lock);
static bool check_io_slave_killed(THD *thd, Master_info *mi, const char *info);
@@ -195,8 +208,8 @@ void lock_slave_threads(Master_info* mi)
DBUG_ENTER("lock_slave_threads");
//TODO: see if we can do this without dual mutex
- pthread_mutex_lock(&mi->run_lock);
- pthread_mutex_lock(&mi->rli.run_lock);
+ mysql_mutex_lock(&mi->run_lock);
+ mysql_mutex_lock(&mi->rli.run_lock);
DBUG_VOID_RETURN;
}
@@ -210,29 +223,58 @@ void unlock_slave_threads(Master_info* mi)
DBUG_ENTER("unlock_slave_threads");
//TODO: see if we can do this without dual mutex
- pthread_mutex_unlock(&mi->rli.run_lock);
- pthread_mutex_unlock(&mi->run_lock);
+ mysql_mutex_unlock(&mi->rli.run_lock);
+ mysql_mutex_unlock(&mi->run_lock);
DBUG_VOID_RETURN;
}
+#ifdef HAVE_PSI_INTERFACE
+static PSI_thread_key key_thread_slave_io, key_thread_slave_sql;
+
+static PSI_thread_info all_slave_threads[]=
+{
+ { &key_thread_slave_io, "slave_io", PSI_FLAG_GLOBAL},
+ { &key_thread_slave_sql, "slave_sql", PSI_FLAG_GLOBAL}
+};
+
+static void init_slave_psi_keys(void)
+{
+ const char* category= "sql";
+ int count;
+
+ if (PSI_server == NULL)
+ return;
+
+ count= array_elements(all_slave_threads);
+ PSI_server->register_thread(category, all_slave_threads, count);
+}
+#endif /* HAVE_PSI_INTERFACE */
/* Initialize slave structures */
int init_slave()
{
DBUG_ENTER("init_slave");
+ int error= 0;
+
+#ifdef HAVE_PSI_INTERFACE
+ init_slave_psi_keys();
+#endif
/*
This is called when mysqld starts. Before client connections are
accepted. However bootstrap may conflict with us if it does START SLAVE.
So it's safer to take the lock.
*/
- pthread_mutex_lock(&LOCK_active_mi);
+ mysql_mutex_lock(&LOCK_active_mi);
/*
TODO: re-write this to interate through the list of files
for multi-master
*/
- active_mi= new Master_info;
+ active_mi= new Master_info(relay_log_recovery);
+
+ if (pthread_key_create(&RPL_MASTER_INFO, NULL))
+ goto err;
/*
If --slave-skip-errors=... was not used, the string value for the
@@ -251,22 +293,21 @@ int init_slave()
if (!active_mi)
{
sql_print_error("Failed to allocate memory for the master info structure");
+ error= 1;
goto err;
}
if (init_master_info(active_mi,master_info_file,relay_log_info_file,
- !master_host, (SLAVE_IO | SLAVE_SQL)))
+ 1, (SLAVE_IO | SLAVE_SQL)))
{
sql_print_error("Failed to initialize the master info structure");
+ error= 1;
goto err;
}
- if (server_id && !master_host && active_mi->host[0])
- master_host= active_mi->host;
-
/* If server id is not set, start_slave_thread() will say it */
- if (master_host && !opt_skip_slave_start)
+ if (active_mi->host[0] && !opt_skip_slave_start)
{
if (start_slave_threads(1 /* need mutex */,
0 /* no wait for start*/,
@@ -276,18 +317,69 @@ int init_slave()
SLAVE_IO | SLAVE_SQL))
{
sql_print_error("Failed to create slave threads");
+ error= 1;
goto err;
}
}
- pthread_mutex_unlock(&LOCK_active_mi);
- DBUG_RETURN(0);
err:
- pthread_mutex_unlock(&LOCK_active_mi);
- DBUG_RETURN(1);
+ mysql_mutex_unlock(&LOCK_active_mi);
+ DBUG_RETURN(error);
}
+/*
+ Updates the master info based on the information stored in the
+ relay info and ignores relay logs previously retrieved by the IO
+ thread, which thus starts fetching again based on to the
+ group_master_log_pos and group_master_log_name. Eventually, the old
+ relay logs will be purged by the normal purge mechanism.
+
+ In the feature, we should improve this routine in order to avoid throwing
+ away logs that are safely stored in the disk. Note also that this recovery
+ routine relies on the correctness of the relay-log.info and only tolerates
+ coordinate problems in master.info.
+
+ In this function, there is no need for a mutex as the caller
+ (i.e. init_slave) already has one acquired.
+
+ Specifically, the following structures are updated:
+
+ 1 - mi->master_log_pos <-- rli->group_master_log_pos
+ 2 - mi->master_log_name <-- rli->group_master_log_name
+ 3 - It moves the relay log to the new relay log file, by
+ rli->group_relay_log_pos <-- BIN_LOG_HEADER_SIZE;
+ rli->event_relay_log_pos <-- BIN_LOG_HEADER_SIZE;
+ rli->group_relay_log_name <-- rli->relay_log.get_log_fname();
+ rli->event_relay_log_name <-- rli->relay_log.get_log_fname();
+
+ If there is an error, it returns (1), otherwise returns (0).
+ */
+int init_recovery(Master_info* mi, const char** errmsg)
+{
+ DBUG_ENTER("init_recovery");
+
+ Relay_log_info *rli= &mi->rli;
+ if (rli->group_master_log_name[0])
+ {
+ mi->master_log_pos= max(BIN_LOG_HEADER_SIZE,
+ rli->group_master_log_pos);
+ strmake(mi->master_log_name, rli->group_master_log_name,
+ sizeof(mi->master_log_name)-1);
+
+ sql_print_warning("Recovery from master pos %ld and file %s.",
+ (ulong) mi->master_log_pos, mi->master_log_name);
+
+ strmake(rli->group_relay_log_name, rli->relay_log.get_log_fname(),
+ sizeof(rli->group_relay_log_name)-1);
+ strmake(rli->event_relay_log_name, rli->relay_log.get_log_fname(),
+ sizeof(mi->rli.event_relay_log_name)-1);
+
+ rli->group_relay_log_pos= rli->event_relay_log_pos= BIN_LOG_HEADER_SIZE;
+ }
+ DBUG_RETURN(0);
+}
+
/**
Convert slave skip errors bitmap into a printable string.
*/
@@ -305,6 +397,9 @@ static void print_slave_skip_errors(void)
DBUG_ASSERT(sizeof(slave_skip_error_names) > MIN_ROOM);
DBUG_ASSERT(MAX_SLAVE_ERROR <= 999999); // 6 digits
+ /* Make @@slave_skip_errors show the nice human-readable value. */
+ opt_slave_skip_errors= slave_skip_error_names;
+
if (!use_slave_mask || bitmap_is_clear_all(&slave_error_mask))
{
/* purecov: begin tested */
@@ -406,7 +501,8 @@ int terminate_slave_threads(Master_info* mi,int thread_mask,bool skip_lock)
if (!mi->inited)
DBUG_RETURN(0); /* successfully do nothing */
int error,force_all = (thread_mask & SLAVE_FORCE_ALL);
- pthread_mutex_t *sql_lock = &mi->rli.run_lock, *io_lock = &mi->run_lock;
+ mysql_mutex_t *sql_lock = &mi->rli.run_lock, *io_lock = &mi->run_lock;
+ mysql_mutex_t *log_lock= mi->rli.relay_log.get_log_lock();
if (thread_mask & (SLAVE_SQL|SLAVE_FORCE_ALL))
{
@@ -418,6 +514,19 @@ int terminate_slave_threads(Master_info* mi,int thread_mask,bool skip_lock)
skip_lock)) &&
!force_all)
DBUG_RETURN(error);
+
+ mysql_mutex_lock(log_lock);
+
+ DBUG_PRINT("info",("Flushing relay-log info file."));
+ if (current_thd)
+ thd_proc_info(current_thd, "Flushing relay-log info file.");
+ if (flush_relay_log_info(&mi->rli))
+ DBUG_RETURN(ER_ERROR_DURING_FLUSH_LOGS);
+
+ if (my_sync(mi->rli.info_fd, MYF(MY_WME)))
+ DBUG_RETURN(ER_ERROR_DURING_FLUSH_LOGS);
+
+ mysql_mutex_unlock(log_lock);
}
if (thread_mask & (SLAVE_IO|SLAVE_FORCE_ALL))
{
@@ -429,8 +538,25 @@ int terminate_slave_threads(Master_info* mi,int thread_mask,bool skip_lock)
skip_lock)) &&
!force_all)
DBUG_RETURN(error);
+
+ mysql_mutex_lock(log_lock);
+
+ DBUG_PRINT("info",("Flushing relay log and master info file."));
+ if (current_thd)
+ thd_proc_info(current_thd, "Flushing relay log and master info files.");
+ if (flush_master_info(mi, TRUE, FALSE))
+ DBUG_RETURN(ER_ERROR_DURING_FLUSH_LOGS);
+
+ if (mi->rli.relay_log.is_open() &&
+ my_sync(mi->rli.relay_log.get_log_file()->file, MYF(MY_WME)))
+ DBUG_RETURN(ER_ERROR_DURING_FLUSH_LOGS);
+
+ if (my_sync(mi->fd, MYF(MY_WME)))
+ DBUG_RETURN(ER_ERROR_DURING_FLUSH_LOGS);
+
+ mysql_mutex_unlock(log_lock);
}
- DBUG_RETURN(0);
+ DBUG_RETURN(0);
}
@@ -470,19 +596,19 @@ int terminate_slave_threads(Master_info* mi,int thread_mask,bool skip_lock)
*/
static int
terminate_slave_thread(THD *thd,
- pthread_mutex_t *term_lock,
- pthread_cond_t *term_cond,
+ mysql_mutex_t *term_lock,
+ mysql_cond_t *term_cond,
volatile uint *slave_running,
bool skip_lock)
{
DBUG_ENTER("terminate_slave_thread");
if (!skip_lock)
{
- pthread_mutex_lock(term_lock);
+ mysql_mutex_lock(term_lock);
}
else
{
- safe_mutex_assert_owner(term_lock);
+ mysql_mutex_assert_owner(term_lock);
}
if (!*slave_running)
{
@@ -492,7 +618,7 @@ terminate_slave_thread(THD *thd,
if run_lock (term_lock) is acquired locally then either
slave_running status is fine
*/
- pthread_mutex_unlock(term_lock);
+ mysql_mutex_unlock(term_lock);
DBUG_RETURN(0);
}
else
@@ -513,18 +639,18 @@ terminate_slave_thread(THD *thd,
int error;
DBUG_PRINT("loop", ("killing slave thread"));
- pthread_mutex_lock(&thd->LOCK_thd_data);
+ mysql_mutex_lock(&thd->LOCK_thd_data);
#ifndef DONT_USE_THR_ALARM
/*
Error codes from pthread_kill are:
EINVAL: invalid signal number (can't happen)
ESRCH: thread already killed (can happen, should be ignored)
*/
- IF_DBUG(int err= ) pthread_kill(thd->real_id, thr_client_alarm);
+ int err __attribute__((unused))= pthread_kill(thd->real_id, thr_client_alarm);
DBUG_ASSERT(err != EINVAL);
#endif
thd->awake(THD::NOT_KILLED);
- pthread_mutex_unlock(&thd->LOCK_thd_data);
+ mysql_mutex_unlock(&thd->LOCK_thd_data);
/*
There is a small chance that slave thread might miss the first
@@ -532,25 +658,28 @@ terminate_slave_thread(THD *thd,
*/
struct timespec abstime;
set_timespec(abstime,2);
- error= pthread_cond_timedwait(term_cond, term_lock, &abstime);
+ error= mysql_cond_timedwait(term_cond, term_lock, &abstime);
DBUG_ASSERT(error == ETIMEDOUT || error == 0);
}
DBUG_ASSERT(*slave_running == 0);
if (!skip_lock)
- pthread_mutex_unlock(term_lock);
+ mysql_mutex_unlock(term_lock);
DBUG_RETURN(0);
}
-int start_slave_thread(pthread_handler h_func, pthread_mutex_t *start_lock,
- pthread_mutex_t *cond_lock,
- pthread_cond_t *start_cond,
+int start_slave_thread(
+#ifdef HAVE_PSI_INTERFACE
+ PSI_thread_key thread_key,
+#endif
+ pthread_handler h_func, mysql_mutex_t *start_lock,
+ mysql_mutex_t *cond_lock,
+ mysql_cond_t *start_cond,
volatile uint *slave_running,
volatile ulong *slave_run_id,
- Master_info* mi,
- bool high_priority)
+ Master_info* mi)
{
pthread_t th;
ulong start_id;
@@ -559,13 +688,13 @@ int start_slave_thread(pthread_handler h_func, pthread_mutex_t *start_lock,
DBUG_ASSERT(mi->inited);
if (start_lock)
- pthread_mutex_lock(start_lock);
+ mysql_mutex_lock(start_lock);
if (!server_id)
{
if (start_cond)
- pthread_cond_broadcast(start_cond);
+ mysql_cond_broadcast(start_cond);
if (start_lock)
- pthread_mutex_unlock(start_lock);
+ mysql_mutex_unlock(start_lock);
sql_print_error("Server id not set, will not start slave");
DBUG_RETURN(ER_BAD_SLAVE);
}
@@ -573,19 +702,18 @@ int start_slave_thread(pthread_handler h_func, pthread_mutex_t *start_lock,
if (*slave_running)
{
if (start_cond)
- pthread_cond_broadcast(start_cond);
+ mysql_cond_broadcast(start_cond);
if (start_lock)
- pthread_mutex_unlock(start_lock);
+ mysql_mutex_unlock(start_lock);
DBUG_RETURN(ER_SLAVE_MUST_STOP);
}
start_id= *slave_run_id;
DBUG_PRINT("info",("Creating new slave thread"));
- if (high_priority)
- my_pthread_attr_setprio(&connection_attrib,CONNECT_PRIOR);
- if (pthread_create(&th, &connection_attrib, h_func, (void*)mi))
+ if (mysql_thread_create(thread_key,
+ &th, &connection_attrib, h_func, (void*)mi))
{
if (start_lock)
- pthread_mutex_unlock(start_lock);
+ mysql_mutex_unlock(start_lock);
DBUG_RETURN(ER_SLAVE_THREAD);
}
if (start_cond && cond_lock) // caller has cond_lock
@@ -594,17 +722,29 @@ int start_slave_thread(pthread_handler h_func, pthread_mutex_t *start_lock,
while (start_id == *slave_run_id)
{
DBUG_PRINT("sleep",("Waiting for slave thread to start"));
- const char* old_msg = thd->enter_cond(start_cond,cond_lock,
- "Waiting for slave thread to start");
- pthread_cond_wait(start_cond,cond_lock);
+ const char *old_msg= thd->enter_cond(start_cond, cond_lock,
+ "Waiting for slave thread to start");
+ /*
+ It is not sufficient to test this at loop bottom. We must test
+ it after registering the mutex in enter_cond(). If the kill
+ happens after testing of thd->killed and before the mutex is
+ registered, we could otherwise go waiting though thd->killed is
+ set.
+ */
+ if (!thd->killed)
+ mysql_cond_wait(start_cond, cond_lock);
thd->exit_cond(old_msg);
- pthread_mutex_lock(cond_lock); // re-acquire it as exit_cond() released
+ mysql_mutex_lock(cond_lock); // re-acquire it as exit_cond() released
if (thd->killed)
+ {
+ if (start_lock)
+ mysql_mutex_unlock(start_lock);
DBUG_RETURN(thd->killed_errno());
+ }
}
}
if (start_lock)
- pthread_mutex_unlock(start_lock);
+ mysql_mutex_unlock(start_lock);
DBUG_RETURN(0);
}
@@ -622,8 +762,8 @@ int start_slave_threads(bool need_slave_mutex, bool wait_for_start,
Master_info* mi, const char* master_info_fname,
const char* slave_info_fname, int thread_mask)
{
- pthread_mutex_t *lock_io=0,*lock_sql=0,*lock_cond_io=0,*lock_cond_sql=0;
- pthread_cond_t* cond_io=0,*cond_sql=0;
+ mysql_mutex_t *lock_io=0, *lock_sql=0, *lock_cond_io=0, *lock_cond_sql=0;
+ mysql_cond_t* cond_io=0, *cond_sql=0;
int error=0;
DBUG_ENTER("start_slave_threads");
@@ -641,16 +781,24 @@ int start_slave_threads(bool need_slave_mutex, bool wait_for_start,
}
if (thread_mask & SLAVE_IO)
- error=start_slave_thread(handle_slave_io,lock_io,lock_cond_io,
- cond_io,
- &mi->slave_running, &mi->slave_run_id,
- mi, 1); //high priority, to read the most possible
+ error= start_slave_thread(
+#ifdef HAVE_PSI_INTERFACE
+ key_thread_slave_io,
+#endif
+ handle_slave_io, lock_io, lock_cond_io,
+ cond_io,
+ &mi->slave_running, &mi->slave_run_id,
+ mi);
if (!error && (thread_mask & SLAVE_SQL))
{
- error=start_slave_thread(handle_slave_sql,lock_sql,lock_cond_sql,
- cond_sql,
- &mi->rli.slave_running, &mi->rli.slave_run_id,
- mi, 0);
+ error= start_slave_thread(
+#ifdef HAVE_PSI_INTERFACE
+ key_thread_slave_sql,
+#endif
+ handle_slave_sql, lock_sql, lock_cond_sql,
+ cond_sql,
+ &mi->rli.slave_running, &mi->rli.slave_run_id,
+ mi);
if (error)
terminate_slave_threads(mi, thread_mask & SLAVE_IO, !need_slave_mutex);
}
@@ -658,17 +806,6 @@ int start_slave_threads(bool need_slave_mutex, bool wait_for_start,
}
-#ifdef NOT_USED_YET
-static int end_slave_on_walk(Master_info* mi, uchar* /*unused*/)
-{
- DBUG_ENTER("end_slave_on_walk");
-
- end_master_info(mi);
- DBUG_RETURN(0);
-}
-#endif
-
-
/*
Release slave threads at time of executing shutdown.
@@ -687,7 +824,7 @@ void end_slave()
will make us wait until slave threads have started, and START SLAVE
returns, then we terminate them here.
*/
- pthread_mutex_lock(&LOCK_active_mi);
+ mysql_mutex_lock(&LOCK_active_mi);
if (active_mi)
{
/*
@@ -697,7 +834,7 @@ void end_slave()
*/
terminate_slave_threads(active_mi,SLAVE_FORCE_ALL);
}
- pthread_mutex_unlock(&LOCK_active_mi);
+ mysql_mutex_unlock(&LOCK_active_mi);
DBUG_VOID_RETURN;
}
@@ -712,14 +849,14 @@ void end_slave()
*/
void close_active_mi()
{
- pthread_mutex_lock(&LOCK_active_mi);
+ mysql_mutex_lock(&LOCK_active_mi);
if (active_mi)
{
end_master_info(active_mi);
delete active_mi;
active_mi= 0;
}
- pthread_mutex_unlock(&LOCK_active_mi);
+ mysql_mutex_unlock(&LOCK_active_mi);
}
static bool io_slave_killed(THD* thd, Master_info* mi)
@@ -731,9 +868,22 @@ static bool io_slave_killed(THD* thd, Master_info* mi)
DBUG_RETURN(mi->abort_slave || abort_loop || thd->killed);
}
+/**
+ The function analyzes a possible killed status and makes
+ a decision whether to accept it or not.
+ Normally upon accepting the sql thread goes to shutdown.
+ In the event of deffering decision @rli->last_event_start_time waiting
+ timer is set to force the killed status be accepted upon its expiration.
+
+ @param thd pointer to a THD instance
+ @param rli pointer to Relay_log_info instance
+ @return TRUE the killed status is recognized, FALSE a possible killed
+ status is deferred.
+*/
static bool sql_slave_killed(THD* thd, Relay_log_info* rli)
{
+ bool ret= FALSE;
DBUG_ENTER("sql_slave_killed");
DBUG_ASSERT(rli->sql_thd == thd);
@@ -748,36 +898,72 @@ static bool sql_slave_killed(THD* thd, Relay_log_info* rli)
as well.
Example: OPTION_KEEP_LOG is set if a temporary table is created or dropped.
*/
- if (rli->abort_slave && rli->is_in_group() &&
- (thd->transaction.all.modified_non_trans_table ||
- (thd->options & OPTION_KEEP_LOG)))
- DBUG_RETURN(0);
- /*
- If we are in an unsafe situation (stopping could corrupt replication),
- we give one minute to the slave SQL thread of grace before really
- terminating, in the hope that it will be able to read more events and
- the unsafe situation will soon be left. Note that this one minute starts
- from the last time anything happened in the slave SQL thread. So it's
- really one minute of idleness, we don't timeout if the slave SQL thread
- is actively working.
- */
- if (rli->last_event_start_time == 0)
- DBUG_RETURN(1);
- DBUG_PRINT("info", ("Slave SQL thread is in an unsafe situation, giving "
- "it some grace period"));
- if (difftime(time(0), rli->last_event_start_time) > 60)
+ if ((thd->transaction.all.modified_non_trans_table ||
+ (thd->variables.option_bits & OPTION_KEEP_LOG))
+ && rli->is_in_group())
{
- rli->report(ERROR_LEVEL, 0,
- "SQL thread had to stop in an unsafe situation, in "
- "the middle of applying updates to a "
- "non-transactional table without any primary key. "
- "There is a risk of duplicate updates when the slave "
- "SQL thread is restarted. Please check your tables' "
- "contents after restart.");
- DBUG_RETURN(1);
+ char msg_stopped[]=
+ "... The slave SQL is stopped, leaving the current group "
+ "of events unfinished with a non-transaction table changed. "
+ "If the group consists solely of Row-based events, you can try "
+ "restarting the slave with --slave-exec-mode=IDEMPOTENT, which "
+ "ignores duplicate key, key not found, and similar errors (see "
+ "documentation for details).";
+
+ if (rli->abort_slave)
+ {
+ DBUG_PRINT("info", ("Slave SQL thread is being stopped in the middle of"
+ " a group having updated a non-trans table, giving"
+ " it some grace period"));
+
+ /*
+ Slave sql thread shutdown in face of unfinished group modified
+ Non-trans table is handled via a timer. The slave may eventually
+ give out to complete the current group and in that case there
+ might be issues at consequent slave restart, see the error message.
+ WL#2975 offers a robust solution requiring to store the last exectuted
+ event's coordinates along with the group's coordianates
+ instead of waiting with @c last_event_start_time the timer.
+ */
+
+ if (rli->last_event_start_time == 0)
+ rli->last_event_start_time= my_time(0);
+ ret= difftime(my_time(0), rli->last_event_start_time) <=
+ SLAVE_WAIT_GROUP_DONE ? FALSE : TRUE;
+
+ DBUG_EXECUTE_IF("stop_slave_middle_group",
+ DBUG_EXECUTE_IF("incomplete_group_in_relay_log",
+ ret= TRUE;);); // time is over
+
+ if (ret == 0)
+ {
+ rli->report(WARNING_LEVEL, 0,
+ "slave SQL thread is being stopped in the middle "
+ "of applying of a group having updated a non-transaction "
+ "table; waiting for the group completion ... ");
+ }
+ else
+ {
+ rli->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR,
+ ER(ER_SLAVE_FATAL_ERROR), msg_stopped);
+ }
+ }
+ else
+ {
+ ret= TRUE;
+ rli->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR, ER(ER_SLAVE_FATAL_ERROR),
+ msg_stopped);
+ }
+ }
+ else
+ {
+ ret= TRUE;
}
}
- DBUG_RETURN(0);
+ if (ret)
+ rli->last_event_start_time= 0;
+
+ DBUG_RETURN(ret);
}
@@ -870,6 +1056,115 @@ int init_intvar_from_file(int* var, IO_CACHE* f, int default_val)
DBUG_RETURN(1);
}
+int init_floatvar_from_file(float* var, IO_CACHE* f, float default_val)
+{
+ char buf[16];
+ DBUG_ENTER("init_floatvar_from_file");
+
+
+ if (my_b_gets(f, buf, sizeof(buf)))
+ {
+ if (sscanf(buf, "%f", var) != 1)
+ DBUG_RETURN(1);
+ else
+ DBUG_RETURN(0);
+ }
+ else if (default_val != 0.0)
+ {
+ *var = default_val;
+ DBUG_RETURN(0);
+ }
+ DBUG_RETURN(1);
+}
+
+
+/**
+ A master info read method
+
+ This function is called from @c init_master_info() along with
+ relatives to restore some of @c active_mi members.
+ Particularly, this function is responsible for restoring
+ IGNORE_SERVER_IDS list of servers whose events the slave is
+ going to ignore (to not log them in the relay log).
+ Items being read are supposed to be decimal output of values of a
+ type shorter or equal of @c long and separated by the single space.
+
+ @param arr @c DYNAMIC_ARRAY pointer to storage for servers id
+ @param f @c IO_CACHE pointer to the source file
+
+ @retval 0 All OK
+ @retval non-zero An error
+*/
+
+int init_dynarray_intvar_from_file(DYNAMIC_ARRAY* arr, IO_CACHE* f)
+{
+ int ret= 0;
+ char buf[16 * (sizeof(long)*4 + 1)]; // static buffer to use most of times
+ char *buf_act= buf; // actual buffer can be dynamic if static is short
+ char *token, *last;
+ uint num_items; // number of items of `arr'
+ size_t read_size;
+ DBUG_ENTER("init_dynarray_intvar_from_file");
+
+ if ((read_size= my_b_gets(f, buf_act, sizeof(buf))) == 0)
+ {
+ return 0; // no line in master.info
+ }
+ if (read_size + 1 == sizeof(buf) && buf[sizeof(buf) - 2] != '\n')
+ {
+ /*
+ short read happend; allocate sufficient memory and make the 2nd read
+ */
+ char buf_work[(sizeof(long)*3 + 1)*16];
+ memcpy(buf_work, buf, sizeof(buf_work));
+ num_items= atoi(strtok_r(buf_work, " ", &last));
+ size_t snd_size;
+ /*
+ max size lower bound approximate estimation bases on the formula:
+ (the items number + items themselves) *
+ (decimal size + space) - 1 + `\n' + '\0'
+ */
+ size_t max_size= (1 + num_items) * (sizeof(long)*3 + 1) + 1;
+ buf_act= (char*) my_malloc(max_size, MYF(MY_WME));
+ memcpy(buf_act, buf, read_size);
+ snd_size= my_b_gets(f, buf_act + read_size, max_size - read_size);
+ if (snd_size == 0 ||
+ ((snd_size + 1 == max_size - read_size) && buf[max_size - 2] != '\n'))
+ {
+ /*
+ failure to make the 2nd read or short read again
+ */
+ ret= 1;
+ goto err;
+ }
+ }
+ token= strtok_r(buf_act, " ", &last);
+ if (token == NULL)
+ {
+ ret= 1;
+ goto err;
+ }
+ num_items= atoi(token);
+ for (uint i=0; i < num_items; i++)
+ {
+ token= strtok_r(NULL, " ", &last);
+ if (token == NULL)
+ {
+ ret= 1;
+ goto err;
+ }
+ else
+ {
+ ulong val= atol(token);
+ insert_dynamic(arr, (uchar *) &val);
+ }
+ }
+err:
+ if (buf_act != buf)
+ my_free(buf_act);
+ DBUG_RETURN(ret);
+}
+
/*
Check if the error is caused by network.
@@ -1009,6 +1304,8 @@ static int get_master_version_and_clock(MYSQL* mysql, Master_info* mi)
mi->clock_diff_with_master=
(long) (time((time_t*) 0) - strtoul(master_row[0], 0, 10));
}
+ else if (check_io_slave_killed(mi->io_thd, mi, NULL))
+ goto slave_killed_err;
else if (is_network_error(mysql_errno(mysql)))
{
mi->report(WARNING_LEVEL, mysql_errno(mysql),
@@ -1055,7 +1352,7 @@ static int get_master_version_and_clock(MYSQL* mysql, Master_info* mi)
(master_res= mysql_store_result(mysql)) &&
(master_row= mysql_fetch_row(master_res)))
{
- if ((::server_id == strtoul(master_row[1], 0, 10)) &&
+ if ((::server_id == (mi->master_id= strtoul(master_row[1], 0, 10))) &&
!mi->rli.replicate_same_server_id)
{
errmsg= "The slave I/O thread stops because master and slave have equal \
@@ -1069,7 +1366,9 @@ not always make sense; please check the manual before using it).";
}
else if (mysql_errno(mysql))
{
- if (is_network_error(mysql_errno(mysql)))
+ if (check_io_slave_killed(mi->io_thd, mi, NULL))
+ goto slave_killed_err;
+ else if (is_network_error(mysql_errno(mysql)))
{
mi->report(WARNING_LEVEL, mysql_errno(mysql),
"Get master SERVER_ID failed with error: %s", mysql_error(mysql));
@@ -1093,6 +1392,13 @@ maybe it is a *VERY OLD MASTER*.");
mysql_free_result(master_res);
master_res= NULL;
}
+ if (mi->master_id == 0 && mi->ignore_server_ids.elements > 0)
+ {
+ errmsg= "Slave configured with server id filtering could not detect the master server id.";
+ err_code= ER_SLAVE_FATAL_ERROR;
+ sprintf(err_buff, ER(err_code), errmsg);
+ goto err;
+ }
/*
Check that the master's global character_set_server and ours are the same.
@@ -1133,6 +1439,8 @@ be equal for the Statement-format replication to work";
goto err;
}
}
+ else if (check_io_slave_killed(mi->io_thd, mi, NULL))
+ goto slave_killed_err;
else if (is_network_error(mysql_errno(mysql)))
{
mi->report(WARNING_LEVEL, mysql_errno(mysql),
@@ -1194,6 +1502,8 @@ be equal for the Statement-format replication to work";
goto err;
}
}
+ else if (check_io_slave_killed(mi->io_thd, mi, NULL))
+ goto slave_killed_err;
else if (is_network_error(mysql_errno(mysql)))
{
mi->report(WARNING_LEVEL, mysql_errno(mysql),
@@ -1216,6 +1526,31 @@ when it try to get the value of TIME_ZONE global variable from master.";
}
}
+ if (mi->heartbeat_period != 0.0)
+ {
+ char llbuf[22];
+ const char query_format[]= "SET @master_heartbeat_period= %s";
+ char query[sizeof(query_format) - 2 + sizeof(llbuf)];
+ /*
+ the period is an ulonglong of nano-secs.
+ */
+ llstr((ulonglong) (mi->heartbeat_period*1000000000UL), llbuf);
+ sprintf(query, query_format, llbuf);
+
+ if (mysql_real_query(mysql, query, strlen(query))
+ && !check_io_slave_killed(mi->io_thd, mi, NULL))
+ {
+ errmsg= "The slave I/O thread stops because SET @master_heartbeat_period "
+ "on master failed.";
+ err_code= ER_SLAVE_FATAL_ERROR;
+ sprintf(err_buff, "%s Error: %s", errmsg, mysql_error(mysql));
+ mysql_free_result(mysql_store_result(mysql));
+ goto err;
+ }
+ mysql_free_result(mysql_store_result(mysql));
+ }
+
+
err:
if (errmsg)
{
@@ -1232,201 +1567,13 @@ network_err:
if (master_res)
mysql_free_result(master_res);
DBUG_RETURN(2);
-}
-
-/*
- Used by fetch_master_table (used by LOAD TABLE tblname FROM MASTER and LOAD
- DATA FROM MASTER). Drops the table (if 'overwrite' is true) and recreates it
- from the dump. Honours replication inclusion/exclusion rules.
- db must be non-zero (guarded by assertion).
-
- RETURN VALUES
- 0 success
- 1 error
-*/
-static int create_table_from_dump(THD* thd, MYSQL *mysql, const char* db,
- const char* table_name, bool overwrite)
-{
- ulong packet_len;
- char *query, *save_db;
- uint32 save_db_length;
- Vio* save_vio;
- HA_CHECK_OPT check_opt;
- TABLE_LIST tables;
- int error= 1;
- handler *file;
- ulonglong save_options;
- NET *net= &mysql->net;
- const char *found_semicolon= NULL;
- DBUG_ENTER("create_table_from_dump");
-
- packet_len= my_net_read(net); // read create table statement
- if (packet_len == packet_error)
- {
- my_message(ER_MASTER_NET_READ, ER(ER_MASTER_NET_READ), MYF(0));
- DBUG_RETURN(1);
- }
- if (net->read_pos[0] == 255) // error from master
- {
- char *err_msg;
- err_msg= (char*) net->read_pos + ((mysql->server_capabilities &
- CLIENT_PROTOCOL_41) ?
- 3+SQLSTATE_LENGTH+1 : 3);
- my_error(ER_MASTER, MYF(0), err_msg);
- DBUG_RETURN(1);
- }
- thd->command = COM_TABLE_DUMP;
- if (!(query = thd->strmake((char*) net->read_pos, packet_len)))
- {
- sql_print_error("create_table_from_dump: out of memory");
- my_message(ER_GET_ERRNO, "Out of memory", MYF(0));
- DBUG_RETURN(1);
- }
- thd->set_query(query, packet_len);
- thd->is_slave_error = 0;
-
- bzero((char*) &tables,sizeof(tables));
- tables.db = (char*)db;
- tables.alias= tables.table_name= (char*)table_name;
-
- /* Drop the table if 'overwrite' is true */
- if (overwrite)
- {
- if (mysql_rm_table(thd,&tables,1,0)) /* drop if exists */
- {
- sql_print_error("create_table_from_dump: failed to drop the table");
- goto err;
- }
- else
- {
- /* Clear the OK result of mysql_rm_table(). */
- thd->main_da.reset_diagnostics_area();
- }
- }
-
- /* Create the table. We do not want to log the "create table" statement */
- save_options = thd->options;
- thd->options &= ~ (OPTION_BIN_LOG);
- thd_proc_info(thd, "Creating table from master dump");
- // save old db in case we are creating in a different database
- save_db = thd->db;
- save_db_length= thd->db_length;
- thd->db = (char*)db;
- DBUG_ASSERT(thd->db != 0);
- thd->db_length= strlen(thd->db);
- /* run create table */
- mysql_parse(thd, thd->query(), packet_len, &found_semicolon);
- thd->db = save_db; // leave things the way the were before
- thd->db_length= save_db_length;
- thd->options = save_options;
-
- if (thd->is_slave_error)
- goto err; // mysql_parse took care of the error send
-
- thd_proc_info(thd, "Opening master dump table");
- thd->main_da.reset_diagnostics_area(); /* cleanup from CREATE_TABLE */
- /*
- Note: If this function starts to fail for MERGE tables,
- change the next two lines to these:
- tables.table= NULL; // was set by mysql_rm_table()
- if (!open_n_lock_single_table(thd, &tables, TL_WRITE))
- */
- tables.lock_type = TL_WRITE;
- if (!open_ltable(thd, &tables, TL_WRITE, 0))
- {
- sql_print_error("create_table_from_dump: could not open created table");
- goto err;
- }
-
- file = tables.table->file;
- thd_proc_info(thd, "Reading master dump table data");
- /* Copy the data file */
- if (file->net_read_dump(net))
- {
- my_message(ER_MASTER_NET_READ, ER(ER_MASTER_NET_READ), MYF(0));
- sql_print_error("create_table_from_dump: failed in\
- handler::net_read_dump()");
- goto err;
- }
-
- check_opt.init();
- check_opt.flags|= T_VERY_SILENT | T_CALC_CHECKSUM | T_QUICK;
- thd_proc_info(thd, "Rebuilding the index on master dump table");
- /*
- We do not want repair() to spam us with messages
- just send them to the error log, and report the failure in case of
- problems.
- */
- save_vio = thd->net.vio;
- thd->net.vio = 0;
- /* Rebuild the index file from the copied data file (with REPAIR) */
- error=file->ha_repair(thd,&check_opt) != 0;
- thd->net.vio = save_vio;
- if (error)
- my_error(ER_INDEX_REBUILD, MYF(0), tables.table->s->table_name.str);
-
-err:
- close_thread_tables(thd);
- DBUG_RETURN(error);
-}
-
-
-int fetch_master_table(THD *thd, const char *db_name, const char *table_name,
- Master_info *mi, MYSQL *mysql, bool overwrite)
-{
- int error= 1;
- const char *errmsg=0;
- bool called_connected= (mysql != NULL);
- DBUG_ENTER("fetch_master_table");
- DBUG_PRINT("enter", ("db_name: '%s' table_name: '%s'",
- db_name,table_name));
-
- if (!called_connected)
- {
- if (!(mysql = mysql_init(NULL)))
- {
- DBUG_RETURN(1);
- }
- if (connect_to_master(thd, mysql, mi))
- {
- my_error(ER_CONNECT_TO_MASTER, MYF(0), mysql_error(mysql));
- /*
- We need to clear the active VIO since, theoretically, somebody
- might issue an awake() on this thread. If we are then in the
- middle of closing and destroying the VIO inside the
- mysql_close(), we will have a problem.
- */
-#ifdef SIGNAL_WITH_VIO_CLOSE
- thd->clear_active_vio();
-#endif
- mysql_close(mysql);
- DBUG_RETURN(1);
- }
- if (thd->killed)
- goto err;
- }
-
- if (request_table_dump(mysql, db_name, table_name))
- {
- error= ER_UNKNOWN_ERROR;
- errmsg= "Failed on table dump request";
- goto err;
- }
- if (create_table_from_dump(thd, mysql, db_name,
- table_name, overwrite))
- goto err; // create_table_from_dump have sent the error already
- error = 0;
-
- err:
- if (!called_connected)
- mysql_close(mysql);
- if (errmsg && thd->vio_ok())
- my_message(error, errmsg, MYF(0));
- DBUG_RETURN(test(error)); // Return 1 on error
+slave_killed_err:
+ if (master_res)
+ mysql_free_result(master_res);
+ DBUG_RETURN(2);
}
-
static bool wait_for_relay_log_space(Relay_log_info* rli)
{
bool slave_killed=0;
@@ -1435,7 +1582,7 @@ static bool wait_for_relay_log_space(Relay_log_info* rli)
THD* thd = mi->io_thd;
DBUG_ENTER("wait_for_relay_log_space");
- pthread_mutex_lock(&rli->log_space_lock);
+ mysql_mutex_lock(&rli->log_space_lock);
save_proc_info= thd->enter_cond(&rli->log_space_cond,
&rli->log_space_lock,
"\
@@ -1443,7 +1590,7 @@ Waiting for the slave SQL thread to free enough relay log space");
while (rli->log_space_limit < rli->log_space_total &&
!(slave_killed=io_slave_killed(thd,mi)) &&
!rli->ignore_log_space_limit)
- pthread_cond_wait(&rli->log_space_cond, &rli->log_space_lock);
+ mysql_cond_wait(&rli->log_space_cond, &rli->log_space_lock);
thd->exit_cond(save_proc_info);
DBUG_RETURN(slave_killed);
}
@@ -1465,11 +1612,11 @@ Waiting for the slave SQL thread to free enough relay log space");
static void write_ignored_events_info_to_relay_log(THD *thd, Master_info *mi)
{
Relay_log_info *rli= &mi->rli;
- pthread_mutex_t *log_lock= rli->relay_log.get_log_lock();
+ mysql_mutex_t *log_lock= rli->relay_log.get_log_lock();
DBUG_ENTER("write_ignored_events_info_to_relay_log");
DBUG_ASSERT(thd == mi->io_thd);
- pthread_mutex_lock(log_lock);
+ mysql_mutex_lock(log_lock);
if (rli->ign_master_log_name_end[0])
{
DBUG_PRINT("info",("writing a Rotate event to track down ignored events"));
@@ -1478,7 +1625,7 @@ static void write_ignored_events_info_to_relay_log(THD *thd, Master_info *mi)
Rotate_log_event::DUP_NAME);
rli->ign_master_log_name_end[0]= 0;
/* can unlock before writing as slave SQL thd will soon see our Rotate */
- pthread_mutex_unlock(log_lock);
+ mysql_mutex_unlock(log_lock);
if (likely((bool)ev))
{
ev->server_id= 0; // don't be ignored by slave SQL thread
@@ -1500,7 +1647,7 @@ static void write_ignored_events_info_to_relay_log(THD *thd, Master_info *mi)
" SHOW SLAVE STATUS may be inaccurate");
}
else
- pthread_mutex_unlock(log_lock);
+ mysql_mutex_unlock(log_lock);
DBUG_VOID_RETURN;
}
@@ -1509,28 +1656,54 @@ int register_slave_on_master(MYSQL* mysql, Master_info *mi,
bool *suppress_warnings)
{
uchar buf[1024], *pos= buf;
- uint report_host_len, report_user_len=0, report_password_len=0;
+ uint report_host_len=0, report_user_len=0, report_password_len=0;
DBUG_ENTER("register_slave_on_master");
*suppress_warnings= FALSE;
- if (!report_host)
+ if (report_host)
+ report_host_len= strlen(report_host);
+ if (report_host_len > HOSTNAME_LENGTH)
+ {
+ sql_print_warning("The length of report_host is %d. "
+ "It is larger than the max length(%d), so this "
+ "slave cannot be registered to the master.",
+ report_host_len, HOSTNAME_LENGTH);
DBUG_RETURN(0);
- report_host_len= strlen(report_host);
+ }
+
if (report_user)
report_user_len= strlen(report_user);
+ if (report_user_len > USERNAME_LENGTH)
+ {
+ sql_print_warning("The length of report_user is %d. "
+ "It is larger than the max length(%d), so this "
+ "slave cannot be registered to the master.",
+ report_user_len, USERNAME_LENGTH);
+ DBUG_RETURN(0);
+ }
+
if (report_password)
report_password_len= strlen(report_password);
- /* 30 is a good safety margin */
- if (report_host_len + report_user_len + report_password_len + 30 >
- sizeof(buf))
- DBUG_RETURN(0); // safety
+ if (report_password_len > MAX_PASSWORD_LENGTH)
+ {
+ sql_print_warning("The length of report_password is %d. "
+ "It is larger than the max length(%d), so this "
+ "slave cannot be registered to the master.",
+ report_password_len, MAX_PASSWORD_LENGTH);
+ DBUG_RETURN(0);
+ }
int4store(pos, server_id); pos+= 4;
pos= net_store_data(pos, (uchar*) report_host, report_host_len);
pos= net_store_data(pos, (uchar*) report_user, report_user_len);
pos= net_store_data(pos, (uchar*) report_password, report_password_len);
int2store(pos, (uint16) report_port); pos+= 2;
- int4store(pos, rpl_recovery_rank); pos+= 4;
+ /*
+ Fake rpl_recovery_rank, which was removed in BUG#13963,
+ so that this server can register itself on old servers,
+ see BUG#49259.
+ */
+ int4store(pos, /* rpl_recovery_rank */ 0); pos+= 4;
/* The master will fill in master_id */
int4store(pos, 0); pos+= 4;
@@ -1632,8 +1805,12 @@ bool show_master_info(THD* thd, Master_info* mi)
field_list.push_back(new Item_empty_string("Last_IO_Error", 20));
field_list.push_back(new Item_return_int("Last_SQL_Errno", 4, MYSQL_TYPE_LONG));
field_list.push_back(new Item_empty_string("Last_SQL_Error", 20));
+ field_list.push_back(new Item_empty_string("Replicate_Ignore_Server_Ids",
+ FN_REFLEN));
+ field_list.push_back(new Item_return_int("Master_Server_Id", sizeof(ulong),
+ MYSQL_TYPE_LONG));
- if (protocol->send_fields(&field_list,
+ if (protocol->send_result_set_metadata(&field_list,
Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF))
DBUG_RETURN(TRUE);
@@ -1647,14 +1824,14 @@ bool show_master_info(THD* thd, Master_info* mi)
slave_running can be accessed without run_lock but not other
non-volotile members like mi->io_thd, which is guarded by the mutex.
*/
- pthread_mutex_lock(&mi->run_lock);
+ mysql_mutex_lock(&mi->run_lock);
protocol->store(mi->io_thd ? mi->io_thd->proc_info : "", &my_charset_bin);
- pthread_mutex_unlock(&mi->run_lock);
+ mysql_mutex_unlock(&mi->run_lock);
- pthread_mutex_lock(&mi->data_lock);
- pthread_mutex_lock(&mi->rli.data_lock);
- pthread_mutex_lock(&mi->err_lock);
- pthread_mutex_lock(&mi->rli.err_lock);
+ mysql_mutex_lock(&mi->data_lock);
+ mysql_mutex_lock(&mi->rli.data_lock);
+ mysql_mutex_lock(&mi->err_lock);
+ mysql_mutex_lock(&mi->rli.err_lock);
protocol->store(mi->host, &my_charset_bin);
protocol->store(mi->user, &my_charset_bin);
protocol->store((uint32) mi->port);
@@ -1667,7 +1844,8 @@ bool show_master_info(THD* thd, Master_info* mi)
protocol->store((ulonglong) mi->rli.group_relay_log_pos);
protocol->store(mi->rli.group_master_log_name, &my_charset_bin);
protocol->store(mi->slave_running == MYSQL_SLAVE_RUN_CONNECT ?
- "Yes" : "No", &my_charset_bin);
+ "Yes" : (mi->slave_running == MYSQL_SLAVE_RUN_NOT_CONNECT ?
+ "Connecting" : "No"), &my_charset_bin);
protocol->store(mi->rli.slave_running ? "Yes":"No", &my_charset_bin);
protocol->store(rpl_filter->get_do_db());
protocol->store(rpl_filter->get_ignore_db());
@@ -1753,11 +1931,37 @@ bool show_master_info(THD* thd, Master_info* mi)
protocol->store(mi->rli.last_error().number);
// Last_SQL_Error
protocol->store(mi->rli.last_error().message, &my_charset_bin);
+ // Replicate_Ignore_Server_Ids
+ {
+ char buff[FN_REFLEN];
+ ulong i, cur_len;
+ for (i= 0, buff[0]= 0, cur_len= 0;
+ i < mi->ignore_server_ids.elements; i++)
+ {
+ ulong s_id, slen;
+ char sbuff[FN_REFLEN];
+ get_dynamic(&mi->ignore_server_ids, (uchar*) &s_id, i);
+ slen= sprintf(sbuff, (i==0? "%lu" : ", %lu"), s_id);
+ if (cur_len + slen + 4 > FN_REFLEN)
+ {
+ /*
+ break the loop whenever remained space could not fit
+ ellipses on the next cycle
+ */
+ sprintf(buff + cur_len, "...");
+ break;
+ }
+ cur_len += sprintf(buff + cur_len, "%s", sbuff);
+ }
+ protocol->store(buff, &my_charset_bin);
+ }
+ // Master_Server_id
+ protocol->store((uint32) mi->master_id);
- pthread_mutex_unlock(&mi->rli.err_lock);
- pthread_mutex_unlock(&mi->err_lock);
- pthread_mutex_unlock(&mi->rli.data_lock);
- pthread_mutex_unlock(&mi->data_lock);
+ mysql_mutex_unlock(&mi->rli.err_lock);
+ mysql_mutex_unlock(&mi->err_lock);
+ mysql_mutex_unlock(&mi->rli.data_lock);
+ mysql_mutex_unlock(&mi->data_lock);
if (my_net_write(&thd->net, (uchar*) thd->packet.ptr(), packet->length()))
DBUG_RETURN(TRUE);
@@ -1779,12 +1983,12 @@ void set_slave_thread_options(THD* thd)
when max_join_size is 4G, OPTION_BIG_SELECTS is automatically set, but
only for client threads.
*/
- ulonglong options= thd->options | OPTION_BIG_SELECTS;
+ ulonglong options= thd->variables.option_bits | OPTION_BIG_SELECTS;
if (opt_log_slave_updates)
options|= OPTION_BIN_LOG;
else
options&= ~OPTION_BIN_LOG;
- thd->options= options;
+ thd->variables.option_bits= options;
thd->variables.completion_type= 0;
DBUG_VOID_RETURN;
}
@@ -1836,9 +2040,9 @@ static int init_slave_thread(THD* thd, SLAVE_THD_TYPE thd_type)
thd->enable_slow_log= opt_log_slow_slave_statements;
set_slave_thread_options(thd);
thd->client_capabilities = CLIENT_LOCAL_FILES;
- pthread_mutex_lock(&LOCK_thread_count);
+ mysql_mutex_lock(&LOCK_thread_count);
thd->thread_id= thd->variables.pseudo_thread_id= thread_id++;
- pthread_mutex_unlock(&LOCK_thread_count);
+ mysql_mutex_unlock(&LOCK_thread_count);
DBUG_EXECUTE_IF("simulate_io_slave_error_on_init",
simulate_error|= (1 << SLAVE_THD_IO););
@@ -1853,14 +2057,14 @@ static int init_slave_thread(THD* thd, SLAVE_THD_TYPE thd_type)
thd->cleanup();
DBUG_RETURN(-1);
}
- lex_start(thd);
if (thd_type == SLAVE_THD_SQL)
thd_proc_info(thd, "Waiting for the next event in relay log");
else
thd_proc_info(thd, "Waiting for master update");
- thd->version=refresh_version;
thd->set_time();
+ /* Do not use user-supplied timeout value for system threads. */
+ thd->variables.lock_wait_timeout= LONG_TIMEOUT;
DBUG_RETURN(0);
}
@@ -1896,17 +2100,22 @@ static int safe_sleep(THD* thd, int sec, CHECK_KILLED_FUNC thread_killed,
}
-static int request_dump(MYSQL* mysql, Master_info* mi,
- bool *suppress_warnings)
+static int request_dump(THD *thd, MYSQL* mysql, Master_info* mi,
+ bool *suppress_warnings)
{
uchar buf[FN_REFLEN + 10];
int len;
- int binlog_flags = 0; // for now
+ ushort binlog_flags = 0; // for now
char* logname = mi->master_log_name;
DBUG_ENTER("request_dump");
*suppress_warnings= FALSE;
+ if (RUN_HOOK(binlog_relay_io,
+ before_request_transmit,
+ (thd, mi, binlog_flags)))
+ DBUG_RETURN(1);
+
// TODO if big log files: Change next to int8store()
int4store(buf, (ulong) mi->master_log_pos);
int2store(buf + 4, binlog_flags);
@@ -1925,37 +2134,7 @@ static int request_dump(MYSQL* mysql, Master_info* mi,
else
sql_print_error("Error on COM_BINLOG_DUMP: %d %s, will retry in %d secs",
mysql_errno(mysql), mysql_error(mysql),
- master_connect_retry);
- DBUG_RETURN(1);
- }
-
- DBUG_RETURN(0);
-}
-
-
-static int request_table_dump(MYSQL* mysql, const char* db, const char* table)
-{
- uchar buf[1024], *p = buf;
- DBUG_ENTER("request_table_dump");
-
- uint table_len = (uint) strlen(table);
- uint db_len = (uint) strlen(db);
- if (table_len + db_len > sizeof(buf) - 2)
- {
- sql_print_error("request_table_dump: Buffer overrun");
- DBUG_RETURN(1);
- }
-
- *p++ = db_len;
- memcpy(p, db, db_len);
- p += db_len;
- *p++ = table_len;
- memcpy(p, table, table_len);
-
- if (simple_command(mysql, COM_TABLE_DUMP, buf, p - buf + table_len, 1))
- {
- sql_print_error("request_table_dump: Error sending the table dump \
-command");
+ mi->connect_retry);
DBUG_RETURN(1);
}
@@ -2039,7 +2218,7 @@ static int has_temporary_error(THD *thd)
DBUG_ENTER("has_temporary_error");
DBUG_EXECUTE_IF("all_errors_are_temporary_errors",
- if (thd->main_da.is_error())
+ if (thd->stmt_da->is_error())
{
thd->clear_error();
my_error(ER_LOCK_DEADLOCK, MYF(0));
@@ -2058,20 +2237,21 @@ static int has_temporary_error(THD *thd)
currently, InnoDB deadlock detected by InnoDB or lock
wait timeout (innodb_lock_wait_timeout exceeded
*/
- if (thd->main_da.sql_errno() == ER_LOCK_DEADLOCK ||
- thd->main_da.sql_errno() == ER_LOCK_WAIT_TIMEOUT)
+ if (thd->stmt_da->sql_errno() == ER_LOCK_DEADLOCK ||
+ thd->stmt_da->sql_errno() == ER_LOCK_WAIT_TIMEOUT)
DBUG_RETURN(1);
#ifdef HAVE_NDB_BINLOG
/*
currently temporary error set in ndbcluster
*/
- List_iterator_fast<MYSQL_ERROR> it(thd->warn_list);
+ List_iterator_fast<MYSQL_ERROR> it(thd->warning_info->warn_list());
MYSQL_ERROR *err;
while ((err= it++))
{
- DBUG_PRINT("info", ("has warning %d %s", err->code, err->msg));
- switch (err->code)
+ DBUG_PRINT("info", ("has condition %d %s", err->get_sql_errno(),
+ err->get_message_text()));
+ switch (err->get_sql_errno())
{
case ER_GET_TEMPORARY_ERRMSG:
DBUG_RETURN(1);
@@ -2120,8 +2300,8 @@ int apply_event_and_update_pos(Log_event* ev, THD* thd, Relay_log_info* rli)
ev->get_type_str(), ev->get_type_code(),
ev->server_id));
DBUG_PRINT("info", ("thd->options: %s%s; rli->last_event_start_time: %lu",
- FLAGSTR(thd->options, OPTION_NOT_AUTOCOMMIT),
- FLAGSTR(thd->options, OPTION_BEGIN),
+ FLAGSTR(thd->variables.option_bits, OPTION_NOT_AUTOCOMMIT),
+ FLAGSTR(thd->variables.option_bits, OPTION_BEGIN),
(ulong) rli->last_event_start_time));
/*
@@ -2157,8 +2337,8 @@ int apply_event_and_update_pos(Log_event* ev, THD* thd, Relay_log_info* rli)
int reason= ev->shall_skip(rli);
if (reason == Log_event::EVENT_SKIP_COUNT)
- --rli->slave_skip_counter;
- pthread_mutex_unlock(&rli->data_lock);
+ sql_slave_skip_counter= --rli->slave_skip_counter;
+ mysql_mutex_unlock(&rli->data_lock);
if (reason == Log_event::EVENT_SKIP_NOT)
exec_res= ev->apply_event(rli);
@@ -2177,7 +2357,7 @@ int apply_event_and_update_pos(Log_event* ev, THD* thd, Relay_log_info* rli)
"skipped because event skip counter was non-zero"
};
DBUG_PRINT("info", ("OPTION_BEGIN: %d; IN_STMT: %d",
- thd->options & OPTION_BEGIN ? 1 : 0,
+ test(thd->variables.option_bits & OPTION_BEGIN),
rli->get_flag(Relay_log_info::IN_STMT)));
DBUG_PRINT("skip_event", ("%s event was %s",
ev->get_type_str(), explain[reason]));
@@ -2264,7 +2444,7 @@ static int exec_relay_log_event(THD* thd, Relay_log_info* rli)
event execution. But we will release it in places where we will
wait for something for example inside of next_event().
*/
- pthread_mutex_lock(&rli->data_lock);
+ mysql_mutex_lock(&rli->data_lock);
Log_event * ev = next_event(rli);
@@ -2272,7 +2452,7 @@ static int exec_relay_log_event(THD* thd, Relay_log_info* rli)
if (sql_slave_killed(thd,rli))
{
- pthread_mutex_unlock(&rli->data_lock);
+ mysql_mutex_unlock(&rli->data_lock);
delete ev;
DBUG_RETURN(1);
}
@@ -2295,10 +2475,31 @@ static int exec_relay_log_event(THD* thd, Relay_log_info* rli)
error in query execution to be printed.
*/
rli->abort_slave= 1;
- pthread_mutex_unlock(&rli->data_lock);
+ mysql_mutex_unlock(&rli->data_lock);
delete ev;
DBUG_RETURN(1);
}
+
+ { /**
+ The following failure injecion works in cooperation with tests
+ setting @@global.debug= 'd,incomplete_group_in_relay_log'.
+ Xid or Commit events are not executed to force the slave sql
+ read hanging if the realy log does not have any more events.
+ */
+ DBUG_EXECUTE_IF("incomplete_group_in_relay_log",
+ if ((ev->get_type_code() == XID_EVENT) ||
+ ((ev->get_type_code() == QUERY_EVENT) &&
+ strcmp("COMMIT", ((Query_log_event *) ev)->query) == 0))
+ {
+ DBUG_ASSERT(thd->transaction.all.modified_non_trans_table);
+ rli->abort_slave= 1;
+ mysql_mutex_unlock(&rli->data_lock);
+ delete ev;
+ rli->inc_event_relay_log_pos();
+ DBUG_RETURN(0);
+ };);
+ }
+
exec_res= apply_event_and_update_pos(ev, thd, rli);
/*
@@ -2356,10 +2557,10 @@ static int exec_relay_log_event(THD* thd, Relay_log_info* rli)
/* chance for concurrent connection to get more locks */
safe_sleep(thd, min(rli->trans_retries, MAX_SLAVE_RETRY_PAUSE),
(CHECK_KILLED_FUNC)sql_slave_killed, (void*)rli);
- pthread_mutex_lock(&rli->data_lock); // because of SHOW STATUS
+ mysql_mutex_lock(&rli->data_lock); // because of SHOW STATUS
rli->trans_retries++;
rli->retried_trans++;
- pthread_mutex_unlock(&rli->data_lock);
+ mysql_mutex_unlock(&rli->data_lock);
DBUG_PRINT("info", ("Slave retries transaction "
"rli->trans_retries: %lu", rli->trans_retries));
}
@@ -2387,7 +2588,7 @@ static int exec_relay_log_event(THD* thd, Relay_log_info* rli)
}
DBUG_RETURN(exec_res);
}
- pthread_mutex_unlock(&rli->data_lock);
+ mysql_mutex_unlock(&rli->data_lock);
rli->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_READ_FAILURE,
ER(ER_SLAVE_RELAY_LOG_READ_FAILURE), "\
Could not parse relay log event entry. The possible reasons are: the master's \
@@ -2413,7 +2614,6 @@ static bool check_io_slave_killed(THD *thd, Master_info *mi, const char *info)
return FALSE;
}
-
/**
@brief Try to reconnect slave IO thread.
@@ -2519,7 +2719,7 @@ pthread_handler_t handle_slave_io(void *arg)
mysql= NULL ;
retry_count= 0;
- pthread_mutex_lock(&mi->run_lock);
+ mysql_mutex_lock(&mi->run_lock);
/* Inform waiting threads that slave has started */
mi->slave_run_id++;
@@ -2536,23 +2736,33 @@ pthread_handler_t handle_slave_io(void *arg)
mi->clear_error();
if (init_slave_thread(thd, SLAVE_THD_IO))
{
- pthread_cond_broadcast(&mi->start_cond);
- pthread_mutex_unlock(&mi->run_lock);
+ mysql_cond_broadcast(&mi->start_cond);
+ mysql_mutex_unlock(&mi->run_lock);
sql_print_error("Failed during slave I/O thread initialization");
goto err;
}
- pthread_mutex_lock(&LOCK_thread_count);
+ mysql_mutex_lock(&LOCK_thread_count);
threads.append(thd);
- pthread_mutex_unlock(&LOCK_thread_count);
+ mysql_mutex_unlock(&LOCK_thread_count);
mi->slave_running = 1;
mi->abort_slave = 0;
- pthread_mutex_unlock(&mi->run_lock);
- pthread_cond_broadcast(&mi->start_cond);
+ mysql_mutex_unlock(&mi->run_lock);
+ mysql_cond_broadcast(&mi->start_cond);
DBUG_PRINT("master_info",("log_file_name: '%s' position: %s",
mi->master_log_name,
llstr(mi->master_log_pos,llbuff)));
+ /* This must be called before run any binlog_relay_io hooks */
+ my_pthread_setspecific_ptr(RPL_MASTER_INFO, mi);
+
+ if (RUN_HOOK(binlog_relay_io, thread_start, (thd, mi)))
+ {
+ mi->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR,
+ ER(ER_SLAVE_FATAL_ERROR), "Failed to run 'thread_start' hook");
+ goto err;
+ }
+
if (!(mi->mysql = mysql = mysql_init(NULL)))
{
mi->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR,
@@ -2602,7 +2812,7 @@ connected:
if (ret == 1)
/* Fatal error */
goto err;
-
+
if (ret == 2)
{
if (check_io_slave_killed(mi->io_thd, mi, "Slave I/O thread killed"
@@ -2652,7 +2862,7 @@ connected:
while (!io_slave_killed(thd,mi))
{
thd_proc_info(thd, "Requesting binlog dump");
- if (request_dump(mysql, mi, &suppress_warnings))
+ if (request_dump(thd, mysql, mi, &suppress_warnings))
{
sql_print_error("Failed on request_dump()");
if (check_io_slave_killed(thd, mi, "Slave I/O thread killed while \
@@ -2672,6 +2882,7 @@ requesting master dump") ||
goto err;
goto connected;
});
+ const char *event_buf;
DBUG_ASSERT(mi->last_error().number == 0);
while (!io_slave_killed(thd,mi))
@@ -2732,14 +2943,37 @@ Stopping slave I/O thread due to out-of-memory error from master");
retry_count=0; // ok event, reset retry counter
thd_proc_info(thd, "Queueing master event to the relay log");
- if (queue_event(mi,(const char*)mysql->net.read_pos + 1,
- event_len))
+ event_buf= (const char*)mysql->net.read_pos + 1;
+ if (RUN_HOOK(binlog_relay_io, after_read_event,
+ (thd, mi,(const char*)mysql->net.read_pos + 1,
+ event_len, &event_buf, &event_len)))
+ {
+ mi->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR,
+ ER(ER_SLAVE_FATAL_ERROR),
+ "Failed to run 'after_read_event' hook");
+ goto err;
+ }
+
+ /* XXX: 'synced' should be updated by queue_event to indicate
+ whether event has been synced to disk */
+ bool synced= 0;
+ if (queue_event(mi, event_buf, event_len))
{
mi->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_WRITE_FAILURE,
ER(ER_SLAVE_RELAY_LOG_WRITE_FAILURE),
"could not queue event from master");
goto err;
}
+
+ if (RUN_HOOK(binlog_relay_io, after_queue_event,
+ (thd, mi, event_buf, event_len, synced)))
+ {
+ mi->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR,
+ ER(ER_SLAVE_FATAL_ERROR),
+ "Failed to run 'after_queue_event' hook");
+ goto err;
+ }
+
if (flush_master_info(mi, TRUE, TRUE))
{
sql_print_error("Failed to flush master info file");
@@ -2785,7 +3019,8 @@ err:
// print the current replication position
sql_print_information("Slave I/O thread exiting, read up to log '%s', position %s",
IO_RPL_LOG_NAME, llstr(mi->master_log_pos,llbuff));
- thd->set_query(NULL, 0);
+ RUN_HOOK(binlog_relay_io, thread_stop, (thd, mi));
+ thd->reset_query();
thd->reset_db(NULL, 0);
if (mysql)
{
@@ -2805,7 +3040,7 @@ err:
}
write_ignored_events_info_to_relay_log(thd, mi);
thd_proc_info(thd, "Waiting for slave mutex on exit");
- pthread_mutex_lock(&mi->run_lock);
+ mysql_mutex_lock(&mi->run_lock);
/* Forget the relay log's format */
delete mi->rli.relay_log.description_event_for_queue;
@@ -2814,11 +3049,10 @@ err:
change_rpl_status(RPL_ACTIVE_SLAVE,RPL_IDLE_SLAVE);
DBUG_ASSERT(thd->net.buff != 0);
net_end(&thd->net); // destructor will not free it, because net.vio is 0
- close_thread_tables(thd);
- pthread_mutex_lock(&LOCK_thread_count);
+ mysql_mutex_lock(&LOCK_thread_count);
THD_CHECK_SENTRY(thd);
delete thd;
- pthread_mutex_unlock(&LOCK_thread_count);
+ mysql_mutex_unlock(&LOCK_thread_count);
mi->abort_slave= 0;
mi->slave_running= 0;
mi->io_thd= 0;
@@ -2827,9 +3061,9 @@ err:
is important. Otherwise a killer_thread can execute between the calls and
delete the mi structure leading to a crash! (see BUG#25306 for details)
*/
- pthread_cond_broadcast(&mi->stop_cond); // tell the world we are done
+ mysql_cond_broadcast(&mi->stop_cond); // tell the world we are done
DBUG_EXECUTE_IF("simulate_slave_delay_at_terminate_bug38694", sleep(5););
- pthread_mutex_unlock(&mi->run_lock);
+ mysql_mutex_unlock(&mi->run_lock);
DBUG_LEAVE; // Must match DBUG_ENTER()
my_thread_end();
@@ -2866,16 +3100,17 @@ int check_temp_dir(char* tmp_file)
/*
Check permissions to create a file.
*/
- if ((fd= my_create(tmp_file, CREATE_MODE,
- O_WRONLY | O_BINARY | O_EXCL | O_NOFOLLOW,
- MYF(MY_WME))) < 0)
+ if ((fd= mysql_file_create(key_file_misc,
+ tmp_file, CREATE_MODE,
+ O_WRONLY | O_BINARY | O_EXCL | O_NOFOLLOW,
+ MYF(MY_WME))) < 0)
DBUG_RETURN(1);
/*
Clean up.
*/
- my_close(fd, MYF(0));
- my_delete(tmp_file, MYF(0));
+ mysql_file_close(fd, MYF(0));
+ mysql_file_delete(key_file_misc, tmp_file, MYF(0));
DBUG_RETURN(0);
}
@@ -2906,7 +3141,7 @@ pthread_handler_t handle_slave_sql(void *arg)
DBUG_ENTER("handle_slave_sql");
DBUG_ASSERT(rli->inited);
- pthread_mutex_lock(&rli->run_lock);
+ mysql_mutex_lock(&rli->run_lock);
DBUG_ASSERT(!rli->slave_running);
errmsg= 0;
#ifndef DBUG_OFF
@@ -2928,8 +3163,8 @@ pthread_handler_t handle_slave_sql(void *arg)
TODO: this is currently broken - slave start and change master
will be stuck if we fail here
*/
- pthread_cond_broadcast(&rli->start_cond);
- pthread_mutex_unlock(&rli->run_lock);
+ mysql_cond_broadcast(&rli->start_cond);
+ mysql_mutex_unlock(&rli->run_lock);
rli->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR,
"Failed during slave thread initialization");
goto err;
@@ -2937,9 +3172,9 @@ pthread_handler_t handle_slave_sql(void *arg)
thd->init_for_queries();
thd->temporary_tables = rli->save_temporary_tables; // restore temp tables
set_thd_in_use_temporary_tables(rli); // (re)set sql_thd in use for saved temp tables
- pthread_mutex_lock(&LOCK_thread_count);
+ mysql_mutex_lock(&LOCK_thread_count);
threads.append(thd);
- pthread_mutex_unlock(&LOCK_thread_count);
+ mysql_mutex_unlock(&LOCK_thread_count);
/*
We are going to set slave_running to 1. Assuming slave I/O thread is
alive and connected, this is going to make Seconds_Behind_Master be 0
@@ -2949,8 +3184,8 @@ pthread_handler_t handle_slave_sql(void *arg)
Seconds_Behind_Master grows. No big deal.
*/
rli->abort_slave = 0;
- pthread_mutex_unlock(&rli->run_lock);
- pthread_cond_broadcast(&rli->start_cond);
+ mysql_mutex_unlock(&rli->run_lock);
+ mysql_cond_broadcast(&rli->start_cond);
/*
Reset errors for a clean start (otherwise, if the master is idle, the SQL
@@ -2965,9 +3200,9 @@ pthread_handler_t handle_slave_sql(void *arg)
rli->clear_error();
//tell the I/O thread to take relay_log_space_limit into account from now on
- pthread_mutex_lock(&rli->log_space_lock);
+ mysql_mutex_lock(&rli->log_space_lock);
rli->ignore_log_space_limit= 0;
- pthread_mutex_unlock(&rli->log_space_lock);
+ mysql_mutex_unlock(&rli->log_space_lock);
rli->trans_retries= 0; // start from "no error"
DBUG_PRINT("info", ("rli->trans_retries: %lu", rli->trans_retries));
@@ -3019,19 +3254,19 @@ log '%s' at position %s, relay log '%s' position: %s", RPL_LOG_NAME,
if (check_temp_dir(rli->slave_patternload_file))
{
- rli->report(ERROR_LEVEL, thd->main_da.sql_errno(),
+ rli->report(ERROR_LEVEL, thd->stmt_da->sql_errno(),
"Unable to use slave's temporary directory %s - %s",
- slave_load_tmpdir, thd->main_da.message());
+ slave_load_tmpdir, thd->stmt_da->message());
goto err;
}
/* execute init_slave variable */
- if (sys_init_slave.value_length)
+ if (opt_init_slave.length)
{
- execute_init_command(thd, &sys_init_slave, &LOCK_sys_init_slave);
+ execute_init_command(thd, &opt_init_slave, &LOCK_sys_init_slave);
if (thd->is_slave_error)
{
- rli->report(ERROR_LEVEL, thd->main_da.sql_errno(),
+ rli->report(ERROR_LEVEL, thd->stmt_da->sql_errno(),
"Slave SQL thread aborted. Can't execute init_slave query");
goto err;
}
@@ -3041,7 +3276,7 @@ log '%s' at position %s, relay log '%s' position: %s", RPL_LOG_NAME,
First check until condition - probably there is nothing to execute. We
do not want to wait for next event in this case.
*/
- pthread_mutex_lock(&rli->data_lock);
+ mysql_mutex_lock(&rli->data_lock);
if (rli->slave_skip_counter)
{
strmake(saved_log_name, rli->group_relay_log_name, FN_REFLEN - 1);
@@ -3056,10 +3291,10 @@ log '%s' at position %s, relay log '%s' position: %s", RPL_LOG_NAME,
char buf[22];
sql_print_information("Slave SQL thread stopped because it reached its"
" UNTIL position %s", llstr(rli->until_pos(), buf));
- pthread_mutex_unlock(&rli->data_lock);
+ mysql_mutex_unlock(&rli->data_lock);
goto err;
}
- pthread_mutex_unlock(&rli->data_lock);
+ mysql_mutex_unlock(&rli->data_lock);
/* Read queries from the IO/THREAD until this thread is killed */
@@ -3098,20 +3333,20 @@ log '%s' at position %s, relay log '%s' position: %s", RPL_LOG_NAME,
if (thd->is_error())
{
- char const *const errmsg= thd->main_da.message();
+ char const *const errmsg= thd->stmt_da->message();
DBUG_PRINT("info",
- ("thd->main_da.sql_errno()=%d; rli->last_error.number=%d",
- thd->main_da.sql_errno(), last_errno));
+ ("thd->stmt_da->sql_errno()=%d; rli->last_error.number=%d",
+ thd->stmt_da->sql_errno(), last_errno));
if (last_errno == 0)
{
/*
This function is reporting an error which was not reported
while executing exec_relay_log_event().
*/
- rli->report(ERROR_LEVEL, thd->main_da.sql_errno(), "%s", errmsg);
+ rli->report(ERROR_LEVEL, thd->stmt_da->sql_errno(), "%s", errmsg);
}
- else if (last_errno != thd->main_da.sql_errno())
+ else if (last_errno != thd->stmt_da->sql_errno())
{
/*
* An error was reported while executing exec_relay_log_event()
@@ -3120,12 +3355,12 @@ log '%s' at position %s, relay log '%s' position: %s", RPL_LOG_NAME,
* what caused the problem.
*/
sql_print_error("Slave (additional info): %s Error_code: %d",
- errmsg, thd->main_da.sql_errno());
+ errmsg, thd->stmt_da->sql_errno());
}
}
/* Print any warnings issued */
- List_iterator_fast<MYSQL_ERROR> it(thd->warn_list);
+ List_iterator_fast<MYSQL_ERROR> it(thd->warning_info->warn_list());
MYSQL_ERROR *err;
/*
Added controlled slave thread cancel for replication
@@ -3134,9 +3369,9 @@ log '%s' at position %s, relay log '%s' position: %s", RPL_LOG_NAME,
bool udf_error = false;
while ((err= it++))
{
- if (err->code == ER_CANT_OPEN_LIBRARY)
+ if (err->get_sql_errno() == ER_CANT_OPEN_LIBRARY)
udf_error = true;
- sql_print_warning("Slave: %s Error_code: %d",err->msg, err->code);
+ sql_print_warning("Slave: %s Error_code: %d", err->get_message_text(), err->get_sql_errno());
}
if (udf_error)
sql_print_error("Error loading user-defined library, slave SQL "
@@ -3175,12 +3410,12 @@ the slave SQL thread with \"SLAVE START\". We stopped at log \
variables is supposed to set them to 0 before terminating)).
*/
thd->catalog= 0;
- thd->set_query(NULL, 0);
+ thd->reset_query();
thd->reset_db(NULL, 0);
thd_proc_info(thd, "Waiting for slave mutex on exit");
- pthread_mutex_lock(&rli->run_lock);
+ mysql_mutex_lock(&rli->run_lock);
/* We need data_lock, at least to wake up any waiting master_pos_wait() */
- pthread_mutex_lock(&rli->data_lock);
+ mysql_mutex_lock(&rli->data_lock);
DBUG_ASSERT(rli->slave_running == 1); // tracking buffer overrun
/* When master_pos_wait() wakes up it will check this and terminate */
rli->slave_running= 0;
@@ -3188,9 +3423,9 @@ the slave SQL thread with \"SLAVE START\". We stopped at log \
delete rli->relay_log.description_event_for_exec;
rli->relay_log.description_event_for_exec= 0;
/* Wake up master_pos_wait() */
- pthread_mutex_unlock(&rli->data_lock);
+ mysql_mutex_unlock(&rli->data_lock);
DBUG_PRINT("info",("Signaling possibly waiting master_pos_wait() functions"));
- pthread_cond_broadcast(&rli->data_cond);
+ mysql_cond_broadcast(&rli->data_cond);
rli->ignore_log_space_limit= 0; /* don't need any lock */
/* we die so won't remember charset - re-update them on next thread start */
rli->cached_charset_invalidate();
@@ -3207,18 +3442,18 @@ the slave SQL thread with \"SLAVE START\". We stopped at log \
THD_CHECK_SENTRY(thd);
rli->sql_thd= 0;
set_thd_in_use_temporary_tables(rli); // (re)set sql_thd in use for saved temp tables
- pthread_mutex_lock(&LOCK_thread_count);
+ mysql_mutex_lock(&LOCK_thread_count);
THD_CHECK_SENTRY(thd);
delete thd;
- pthread_mutex_unlock(&LOCK_thread_count);
+ mysql_mutex_unlock(&LOCK_thread_count);
/*
Note: the order of the broadcast and unlock calls below (first broadcast, then unlock)
is important. Otherwise a killer_thread can execute between the calls and
delete the mi structure leading to a crash! (see BUG#25306 for details)
*/
- pthread_cond_broadcast(&rli->stop_cond);
+ mysql_cond_broadcast(&rli->stop_cond);
DBUG_EXECUTE_IF("simulate_slave_delay_at_terminate_bug38694", sleep(5););
- pthread_mutex_unlock(&rli->run_lock); // tell the world we are done
+ mysql_mutex_unlock(&rli->run_lock); // tell the world we are done
DBUG_LEAVE; // Must match DBUG_ENTER()
my_thread_end();
@@ -3361,7 +3596,7 @@ err:
static int process_io_rotate(Master_info *mi, Rotate_log_event *rev)
{
DBUG_ENTER("process_io_rotate");
- safe_mutex_assert_owner(&mi->data_lock);
+ mysql_mutex_assert_owner(&mi->data_lock);
if (unlikely(!rev->is_valid()))
DBUG_RETURN(1);
@@ -3452,11 +3687,11 @@ static int queue_binlog_ver_1_event(Master_info *mi, const char *buf,
sql_print_error("Read invalid event from master: '%s',\
master could be corrupt but a more likely cause of this is a bug",
errmsg);
- my_free((char*) tmp_buf, MYF(MY_ALLOW_ZERO_PTR));
+ my_free(tmp_buf);
DBUG_RETURN(1);
}
- pthread_mutex_lock(&mi->data_lock);
+ mysql_mutex_lock(&mi->data_lock);
ev->log_pos= mi->master_log_pos; /* 3.23 events don't contain log_pos */
switch (ev->get_type_code()) {
case STOP_EVENT:
@@ -3467,7 +3702,7 @@ static int queue_binlog_ver_1_event(Master_info *mi, const char *buf,
if (unlikely(process_io_rotate(mi,(Rotate_log_event*)ev)))
{
delete ev;
- pthread_mutex_unlock(&mi->data_lock);
+ mysql_mutex_unlock(&mi->data_lock);
DBUG_RETURN(1);
}
inc_pos= 0;
@@ -3488,8 +3723,8 @@ static int queue_binlog_ver_1_event(Master_info *mi, const char *buf,
delete ev;
mi->master_log_pos += inc_pos;
DBUG_PRINT("info", ("master_log_pos: %lu", (ulong) mi->master_log_pos));
- pthread_mutex_unlock(&mi->data_lock);
- my_free((char*)tmp_buf, MYF(0));
+ mysql_mutex_unlock(&mi->data_lock);
+ my_free(tmp_buf);
DBUG_RETURN(error);
}
default:
@@ -3507,7 +3742,7 @@ static int queue_binlog_ver_1_event(Master_info *mi, const char *buf,
if (unlikely(rli->relay_log.append(ev)))
{
delete ev;
- pthread_mutex_unlock(&mi->data_lock);
+ mysql_mutex_unlock(&mi->data_lock);
DBUG_RETURN(1);
}
rli->relay_log.harvest_bytes_written(&rli->log_space_total);
@@ -3515,7 +3750,7 @@ static int queue_binlog_ver_1_event(Master_info *mi, const char *buf,
delete ev;
mi->master_log_pos+= inc_pos;
DBUG_PRINT("info", ("master_log_pos: %lu", (ulong) mi->master_log_pos));
- pthread_mutex_unlock(&mi->data_lock);
+ mysql_mutex_unlock(&mi->data_lock);
DBUG_RETURN(0);
}
@@ -3540,10 +3775,10 @@ static int queue_binlog_ver_3_event(Master_info *mi, const char *buf,
sql_print_error("Read invalid event from master: '%s',\
master could be corrupt but a more likely cause of this is a bug",
errmsg);
- my_free((char*) tmp_buf, MYF(MY_ALLOW_ZERO_PTR));
+ my_free(tmp_buf);
DBUG_RETURN(1);
}
- pthread_mutex_lock(&mi->data_lock);
+ mysql_mutex_lock(&mi->data_lock);
switch (ev->get_type_code()) {
case STOP_EVENT:
goto err;
@@ -3551,7 +3786,7 @@ static int queue_binlog_ver_3_event(Master_info *mi, const char *buf,
if (unlikely(process_io_rotate(mi,(Rotate_log_event*)ev)))
{
delete ev;
- pthread_mutex_unlock(&mi->data_lock);
+ mysql_mutex_unlock(&mi->data_lock);
DBUG_RETURN(1);
}
inc_pos= 0;
@@ -3563,7 +3798,7 @@ static int queue_binlog_ver_3_event(Master_info *mi, const char *buf,
if (unlikely(rli->relay_log.append(ev)))
{
delete ev;
- pthread_mutex_unlock(&mi->data_lock);
+ mysql_mutex_unlock(&mi->data_lock);
DBUG_RETURN(1);
}
rli->relay_log.harvest_bytes_written(&rli->log_space_total);
@@ -3571,7 +3806,7 @@ static int queue_binlog_ver_3_event(Master_info *mi, const char *buf,
mi->master_log_pos+= inc_pos;
err:
DBUG_PRINT("info", ("master_log_pos: %lu", (ulong) mi->master_log_pos));
- pthread_mutex_unlock(&mi->data_lock);
+ mysql_mutex_unlock(&mi->data_lock);
DBUG_RETURN(0);
}
@@ -3618,9 +3853,11 @@ static int queue_old_event(Master_info *mi, const char *buf,
static int queue_event(Master_info* mi,const char* buf, ulong event_len)
{
int error= 0;
+ String error_msg;
ulong inc_pos;
Relay_log_info *rli= &mi->rli;
- pthread_mutex_t *log_lock= rli->relay_log.get_log_lock();
+ mysql_mutex_t *log_lock= rli->relay_log.get_log_lock();
+ ulong s_id;
DBUG_ENTER("queue_event");
LINT_INIT(inc_pos);
@@ -3630,7 +3867,7 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
DBUG_RETURN(queue_old_event(mi,buf,event_len));
LINT_INIT(inc_pos);
- pthread_mutex_lock(&mi->data_lock);
+ mysql_mutex_lock(&mi->data_lock);
switch (buf[EVENT_TYPE_OFFSET]) {
case STOP_EVENT:
@@ -3652,7 +3889,7 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
Rotate_log_event rev(buf,event_len,mi->rli.relay_log.description_event_for_queue);
if (unlikely(process_io_rotate(mi,&rev)))
{
- error= 1;
+ error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE;
goto err;
}
/*
@@ -3679,7 +3916,7 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
Log_event::read_log_event(buf, event_len, &errmsg,
mi->rli.relay_log.description_event_for_queue)))
{
- error= 2;
+ error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE;
goto err;
}
delete mi->rli.relay_log.description_event_for_queue;
@@ -3698,6 +3935,56 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
}
break;
+
+ case HEARTBEAT_LOG_EVENT:
+ {
+ /*
+ HB (heartbeat) cannot come before RL (Relay)
+ */
+ char llbuf[22];
+ Heartbeat_log_event hb(buf, event_len, mi->rli.relay_log.description_event_for_queue);
+ if (!hb.is_valid())
+ {
+ error= ER_SLAVE_HEARTBEAT_FAILURE;
+ error_msg.append(STRING_WITH_LEN("inconsistent heartbeat event content;"));
+ error_msg.append(STRING_WITH_LEN("the event's data: log_file_name "));
+ error_msg.append(hb.get_log_ident(), (uint) strlen(hb.get_log_ident()));
+ error_msg.append(STRING_WITH_LEN(" log_pos "));
+ llstr(hb.log_pos, llbuf);
+ error_msg.append(llbuf, strlen(llbuf));
+ goto err;
+ }
+ mi->received_heartbeats++;
+ /*
+ compare local and event's versions of log_file, log_pos.
+
+ Heartbeat is sent only after an event corresponding to the corrdinates
+ the heartbeat carries.
+ Slave can not have a difference in coordinates except in the only
+ special case when mi->master_log_name, master_log_pos have never
+ been updated by Rotate event i.e when slave does not have any history
+ with the master (and thereafter mi->master_log_pos is NULL).
+
+ TODO: handling `when' for SHOW SLAVE STATUS' snds behind
+ */
+ if ((memcmp(mi->master_log_name, hb.get_log_ident(), hb.get_ident_len())
+ && mi->master_log_name != NULL)
+ || mi->master_log_pos != hb.log_pos)
+ {
+ /* missed events of heartbeat from the past */
+ error= ER_SLAVE_HEARTBEAT_FAILURE;
+ error_msg.append(STRING_WITH_LEN("heartbeat is not compatible with local info;"));
+ error_msg.append(STRING_WITH_LEN("the event's data: log_file_name "));
+ error_msg.append(hb.get_log_ident(), (uint) strlen(hb.get_log_ident()));
+ error_msg.append(STRING_WITH_LEN(" log_pos "));
+ llstr(hb.log_pos, llbuf);
+ error_msg.append(llbuf, strlen(llbuf));
+ goto err;
+ }
+ goto skip_relay_logging;
+ }
+ break;
+
default:
inc_pos= event_len;
break;
@@ -3716,10 +4003,21 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
direct master (an unsupported, useless setup!).
*/
- pthread_mutex_lock(log_lock);
-
- if ((uint4korr(buf + SERVER_ID_OFFSET) == ::server_id) &&
- !mi->rli.replicate_same_server_id)
+ mysql_mutex_lock(log_lock);
+ s_id= uint4korr(buf + SERVER_ID_OFFSET);
+ if ((s_id == ::server_id && !mi->rli.replicate_same_server_id) ||
+ /*
+ the following conjunction deals with IGNORE_SERVER_IDS, if set
+ If the master is on the ignore list, execution of
+ format description log events and rotate events is necessary.
+ */
+ (mi->ignore_server_ids.elements > 0 &&
+ mi->shall_ignore_server_id(s_id) &&
+ /* everything is filtered out from non-master */
+ (s_id != mi->master_id ||
+ /* for the master meta information is necessary */
+ (buf[EVENT_TYPE_OFFSET] != FORMAT_DESCRIPTION_EVENT &&
+ buf[EVENT_TYPE_OFFSET] != ROTATE_EVENT))))
{
/*
Do not write it to the relay log.
@@ -3734,10 +4032,14 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
But events which were generated by this slave and which do not exist in
the master's binlog (i.e. Format_desc, Rotate & Stop) should not increment
mi->master_log_pos.
+ If the event is originated remotely and is being filtered out by
+ IGNORE_SERVER_IDS it increments mi->master_log_pos
+ as well as rli->group_relay_log_pos.
*/
- if (buf[EVENT_TYPE_OFFSET]!=FORMAT_DESCRIPTION_EVENT &&
- buf[EVENT_TYPE_OFFSET]!=ROTATE_EVENT &&
- buf[EVENT_TYPE_OFFSET]!=STOP_EVENT)
+ if (!(s_id == ::server_id && !mi->rli.replicate_same_server_id) ||
+ (buf[EVENT_TYPE_OFFSET] != FORMAT_DESCRIPTION_EVENT &&
+ buf[EVENT_TYPE_OFFSET] != ROTATE_EVENT &&
+ buf[EVENT_TYPE_OFFSET] != STOP_EVENT))
{
mi->master_log_pos+= inc_pos;
memcpy(rli->ign_master_log_name_end, mi->master_log_name, FN_REFLEN);
@@ -3745,8 +4047,8 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
rli->ign_master_log_pos_end= mi->master_log_pos;
}
rli->relay_log.signal_update(); // the slave SQL thread needs to re-check
- DBUG_PRINT("info", ("master_log_pos: %lu, event originating from the same server, ignored",
- (ulong) mi->master_log_pos));
+ DBUG_PRINT("info", ("master_log_pos: %lu, event originating from %u server, ignored",
+ (ulong) mi->master_log_pos, uint4korr(buf + SERVER_ID_OFFSET)));
}
else
{
@@ -3758,15 +4060,23 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
rli->relay_log.harvest_bytes_written(&rli->log_space_total);
}
else
- error= 3;
+ {
+ error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE;
+ }
rli->ign_master_log_name_end[0]= 0; // last event is not ignored
}
- pthread_mutex_unlock(log_lock);
-
+ mysql_mutex_unlock(log_lock);
+skip_relay_logging:
+
err:
- pthread_mutex_unlock(&mi->data_lock);
+ mysql_mutex_unlock(&mi->data_lock);
DBUG_PRINT("info", ("error: %d", error));
+ if (error)
+ mi->report(ERROR_LEVEL, error, ER(error),
+ (error == ER_SLAVE_RELAY_LOG_WRITE_FAILURE)?
+ "could not queue event from master" :
+ error_msg.ptr());
DBUG_RETURN(error);
}
@@ -3780,13 +4090,13 @@ void end_relay_log_info(Relay_log_info* rli)
if (rli->info_fd >= 0)
{
end_io_cache(&rli->info_file);
- (void) my_close(rli->info_fd, MYF(MY_WME));
+ mysql_file_close(rli->info_fd, MYF(MY_WME));
rli->info_fd = -1;
}
if (rli->cur_log_fd >= 0)
{
end_io_cache(&rli->cache_buf);
- (void)my_close(rli->cur_log_fd, MYF(MY_WME));
+ mysql_file_close(rli->cur_log_fd, MYF(MY_WME));
rli->cur_log_fd = -1;
}
rli->inited = 0;
@@ -3972,6 +4282,71 @@ static int safe_reconnect(THD* thd, MYSQL* mysql, Master_info* mi,
}
+MYSQL *rpl_connect_master(MYSQL *mysql)
+{
+ THD *thd= current_thd;
+ Master_info *mi= my_pthread_getspecific_ptr(Master_info*, RPL_MASTER_INFO);
+ if (!mi)
+ {
+ sql_print_error("'rpl_connect_master' must be called in slave I/O thread context.");
+ return NULL;
+ }
+
+ bool allocated= false;
+
+ if (!mysql)
+ {
+ if(!(mysql= mysql_init(NULL)))
+ {
+ sql_print_error("rpl_connect_master: failed in mysql_init()");
+ return NULL;
+ }
+ allocated= true;
+ }
+
+ /*
+ XXX: copied from connect_to_master, this function should not
+ change the slave status, so we cannot use connect_to_master
+ directly
+
+ TODO: make this part a seperate function to eliminate duplication
+ */
+ mysql_options(mysql, MYSQL_OPT_CONNECT_TIMEOUT, (char *) &slave_net_timeout);
+ mysql_options(mysql, MYSQL_OPT_READ_TIMEOUT, (char *) &slave_net_timeout);
+
+#ifdef HAVE_OPENSSL
+ if (mi->ssl)
+ {
+ mysql_ssl_set(mysql,
+ mi->ssl_key[0]?mi->ssl_key:0,
+ mi->ssl_cert[0]?mi->ssl_cert:0,
+ mi->ssl_ca[0]?mi->ssl_ca:0,
+ mi->ssl_capath[0]?mi->ssl_capath:0,
+ mi->ssl_cipher[0]?mi->ssl_cipher:0);
+ mysql_options(mysql, MYSQL_OPT_SSL_VERIFY_SERVER_CERT,
+ &mi->ssl_verify_server_cert);
+ }
+#endif
+
+ mysql_options(mysql, MYSQL_SET_CHARSET_NAME, default_charset_info->csname);
+ /* This one is not strictly needed but we have it here for completeness */
+ mysql_options(mysql, MYSQL_SET_CHARSET_DIR, (char *) charsets_dir);
+
+ if (io_slave_killed(thd, mi)
+ || !mysql_real_connect(mysql, mi->host, mi->user, mi->password, 0,
+ mi->port, 0, 0))
+ {
+ if (!io_slave_killed(thd, mi))
+ sql_print_error("rpl_connect_master: error connecting to master: %s (server_error: %d)",
+ mysql_error(mysql), mysql_errno(mysql));
+
+ if (allocated)
+ mysql_close(mysql); // this will free the object
+ return NULL;
+ }
+ return mysql;
+}
+
/*
Store the file and position where the execute-slave thread are in the
relay log.
@@ -3981,8 +4356,9 @@ static int safe_reconnect(THD* thd, MYSQL* mysql, Master_info* mi,
rli Relay log information
NOTES
- - As this is only called by the slave thread, we don't need to
- have a lock on this.
+ - As this is only called by the slave thread or on STOP SLAVE, with the
+ log_lock grabbed and the slave thread stopped, we don't need to have
+ a lock here.
- If there is an active transaction, then we don't update the position
in the relay log. This is to ensure that we re-execute statements
if we die in the middle of an transaction that was rolled back.
@@ -4025,8 +4401,18 @@ bool flush_relay_log_info(Relay_log_info* rli)
error=1;
if (flush_io_cache(file))
error=1;
-
- /* Flushing the relay log is done by the slave I/O thread */
+ if (sync_relayloginfo_period &&
+ !error &&
+ ++(rli->sync_counter) >= sync_relayloginfo_period)
+ {
+ if (my_sync(rli->info_fd, MYF(MY_WME)))
+ error=1;
+ rli->sync_counter= 0;
+ }
+ /*
+ Flushing the relay log is done by the slave I/O thread
+ or by the user on STOP SLAVE.
+ */
DBUG_RETURN(error);
}
@@ -4070,7 +4456,7 @@ static Log_event* next_event(Relay_log_info* rli)
{
Log_event* ev;
IO_CACHE* cur_log = rli->cur_log;
- pthread_mutex_t *log_lock = rli->relay_log.get_log_lock();
+ mysql_mutex_t *log_lock = rli->relay_log.get_log_lock();
const char* errmsg=0;
THD* thd = rli->sql_thd;
DBUG_ENTER("next_event");
@@ -4087,9 +4473,9 @@ static Log_event* next_event(Relay_log_info* rli)
so we assume calling function acquired this mutex for us and we will
hold it for the most of the loop below However, we will release it
whenever it is worth the hassle, and in the cases when we go into a
- pthread_cond_wait() with the non-data_lock mutex
+ mysql_cond_wait() with the non-data_lock mutex
*/
- safe_mutex_assert_owner(&rli->data_lock);
+ mysql_mutex_assert_owner(&rli->data_lock);
while (!sql_slave_killed(thd,rli))
{
@@ -4108,7 +4494,7 @@ static Log_event* next_event(Relay_log_info* rli)
if ((hot_log = (cur_log != &rli->cache_buf)))
{
DBUG_ASSERT(rli->cur_log_fd == -1); // foreign descriptor
- pthread_mutex_lock(log_lock);
+ mysql_mutex_lock(log_lock);
/*
Reading xxx_file_id is safe because the log will only
@@ -4118,7 +4504,7 @@ static Log_event* next_event(Relay_log_info* rli)
{
// The master has switched to a new log file; Reopen the old log file
cur_log=reopen_relay_log(rli, &errmsg);
- pthread_mutex_unlock(log_lock);
+ mysql_mutex_unlock(log_lock);
if (!cur_log) // No more log files
goto err;
hot_log=0; // Using old binary log
@@ -4165,7 +4551,7 @@ static Log_event* next_event(Relay_log_info* rli)
*/
rli->future_event_relay_log_pos= my_b_tell(cur_log);
if (hot_log)
- pthread_mutex_unlock(log_lock);
+ mysql_mutex_unlock(log_lock);
DBUG_RETURN(ev);
}
DBUG_ASSERT(thd==rli->sql_thd);
@@ -4175,7 +4561,7 @@ static Log_event* next_event(Relay_log_info* rli)
{
errmsg = "slave SQL thread aborted because of I/O error";
if (hot_log)
- pthread_mutex_unlock(log_lock);
+ mysql_mutex_unlock(log_lock);
goto err;
}
if (!cur_log->error) /* EOF */
@@ -4222,7 +4608,7 @@ static Log_event* next_event(Relay_log_info* rli)
0, rli->ign_master_log_pos_end,
Rotate_log_event::DUP_NAME);
rli->ign_master_log_name_end[0]= 0;
- pthread_mutex_unlock(log_lock);
+ mysql_mutex_unlock(log_lock);
if (unlikely(!ev))
{
errmsg= "Slave SQL thread failed to create a Rotate event "
@@ -4237,7 +4623,7 @@ static Log_event* next_event(Relay_log_info* rli)
We can, and should release data_lock while we are waiting for
update. If we do not, show slave status will block
*/
- pthread_mutex_unlock(&rli->data_lock);
+ mysql_mutex_unlock(&rli->data_lock);
/*
Possible deadlock :
@@ -4263,7 +4649,7 @@ static Log_event* next_event(Relay_log_info* rli)
be stopped, and the SQL thread sets ignore_log_space_limit to 0 when
it stops.
*/
- pthread_mutex_lock(&rli->log_space_lock);
+ mysql_mutex_lock(&rli->log_space_lock);
// prevent the I/O thread from blocking next times
rli->ignore_log_space_limit= 1;
/*
@@ -4272,12 +4658,12 @@ static Log_event* next_event(Relay_log_info* rli)
~Relay_log_info(), i.e. when rli is destroyed, and rli will
not be destroyed before we exit the present function.
*/
- pthread_mutex_unlock(&rli->log_space_lock);
- pthread_cond_broadcast(&rli->log_space_cond);
- // Note that wait_for_update unlocks lock_log !
- rli->relay_log.wait_for_update(rli->sql_thd, 1);
+ mysql_mutex_unlock(&rli->log_space_lock);
+ mysql_cond_broadcast(&rli->log_space_cond);
+ // Note that wait_for_update_relay_log unlocks lock_log !
+ rli->relay_log.wait_for_update_relay_log(rli->sql_thd);
// re-acquire data lock since we released it earlier
- pthread_mutex_lock(&rli->data_lock);
+ mysql_mutex_lock(&rli->data_lock);
rli->last_master_timestamp= save_timestamp;
continue;
}
@@ -4288,7 +4674,7 @@ static Log_event* next_event(Relay_log_info* rli)
*/
end_io_cache(cur_log);
DBUG_ASSERT(rli->cur_log_fd >= 0);
- my_close(rli->cur_log_fd, MYF(MY_WME));
+ mysql_file_close(rli->cur_log_fd, MYF(MY_WME));
rli->cur_log_fd = -1;
if (relay_log_purge)
@@ -4345,7 +4731,7 @@ static Log_event* next_event(Relay_log_info* rli)
DBUG_PRINT("info",("hot_log: %d",hot_log));
if (!hot_log) /* if hot_log, we already have this mutex */
- pthread_mutex_lock(log_lock);
+ mysql_mutex_lock(log_lock);
if (rli->relay_log.is_active(rli->linfo.log_file_name))
{
#ifdef EXTRA_DEBUG
@@ -4420,13 +4806,16 @@ static Log_event* next_event(Relay_log_info* rli)
my_b_seek(cur_log, (my_off_t) 0);
if (check_binlog_magic(cur_log,&errmsg))
{
- if (!hot_log) pthread_mutex_unlock(log_lock);
+ if (!hot_log)
+ mysql_mutex_unlock(log_lock);
goto err;
}
- if (!hot_log) pthread_mutex_unlock(log_lock);
+ if (!hot_log)
+ mysql_mutex_unlock(log_lock);
continue;
}
- if (!hot_log) pthread_mutex_unlock(log_lock);
+ if (!hot_log)
+ mysql_mutex_unlock(log_lock);
/*
if we get here, the log was not hot, so we will have to open it
ourselves. We are sure that the log is still not hot now (a log can get
@@ -4449,7 +4838,7 @@ static Log_event* next_event(Relay_log_info* rli)
TODO: come up with something better to handle this error
*/
if (hot_log)
- pthread_mutex_unlock(log_lock);
+ mysql_mutex_unlock(log_lock);
sql_print_error("Slave SQL thread: I/O error reading \
event(errno: %d cur_log->error: %d)",
my_errno,cur_log->error);
@@ -4487,8 +4876,7 @@ int rotate_relay_log(Master_info* mi)
Relay_log_info* rli= &mi->rli;
int error= 0;
- /* We don't lock rli->run_lock. This would lead to deadlocks. */
- pthread_mutex_lock(&mi->run_lock);
+ DBUG_EXECUTE_IF("crash_before_rotate_relaylog", DBUG_SUICIDE(););
/*
We need to test inited because otherwise, new_file() will attempt to lock
@@ -4519,7 +4907,6 @@ int rotate_relay_log(Master_info* mi)
*/
rli->relay_log.harvest_bytes_written(&rli->log_space_total);
end:
- pthread_mutex_unlock(&mi->run_lock);
DBUG_RETURN(error);
}