summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--dbug/dbug.c8
-rw-r--r--sql/mysqld.cc79
-rw-r--r--sql/net_pkg.cc6
-rw-r--r--sql/slave.cc65
-rw-r--r--sql/slave.h5
-rw-r--r--sql/sql_class.cc8
-rw-r--r--sql/sql_class.h8
-rw-r--r--sql/sql_repl.cc18
8 files changed, 131 insertions, 66 deletions
diff --git a/dbug/dbug.c b/dbug/dbug.c
index 8f214bfbe7c..88d8043c0e5 100644
--- a/dbug/dbug.c
+++ b/dbug/dbug.c
@@ -706,8 +706,6 @@ char ***_sframep_ __attribute__((unused)))
if (!_no_db_)
{
int save_errno=errno;
- if (!init_done)
- _db_push_ (_DBUG_START_CONDITION_);
/* Sasha: the test below is so we could call functions with DBUG_ENTER
before my_thread_init(). I needed this because I suspected corruption
of a block allocated by my_thread_init() itself, so I wanted to use
@@ -715,6 +713,8 @@ char ***_sframep_ __attribute__((unused)))
*/
if (!(state=code_state()))
return;
+ if (!init_done)
+ _db_push_ (_DBUG_START_CONDITION_);
*_sfunc_ = state->func;
*_sfile_ = state->file;
@@ -794,10 +794,10 @@ uint *_slevel_)
if (!_no_db_)
{
int save_errno=errno;
+ if (!(state=code_state()))
+ return;
if (!init_done)
_db_push_ ("");
- if (!(state=code_state()))
- return; /* Only happens at end of program */
if (stack->flags & (TRACE_ON | DEBUG_ON | PROFILE_ON))
{
if (!state->locked)
diff --git a/sql/mysqld.cc b/sql/mysqld.cc
index a0154f980f1..3a441bb61dc 100644
--- a/sql/mysqld.cc
+++ b/sql/mysqld.cc
@@ -1911,45 +1911,6 @@ int main(int argc, char **argv)
using_update_log=1;
}
- init_slave();
-
- if (opt_bin_log && !server_id)
- {
- server_id= !master_host ? 1 : 2;
- switch (server_id) {
-#ifdef EXTRA_DEBUG
- case 1:
- sql_print_error("\
-Warning: You have enabled the binary log, but you haven't set server-id:\n\
-Updates will be logged to the binary log, but connections to slaves will\n\
-not be accepted.");
- break;
-#endif
- case 2:
- sql_print_error("\
-Warning: You should set server-id to a non-0 value if master_host is set.\n\
-The server will not act as a slave.");
- break;
- }
- }
- if (opt_bin_log)
- {
- if (!opt_bin_logname)
- {
- char tmp[FN_REFLEN];
- /* TODO: The following should be using fn_format(); We just need to
- first change fn_format() to cut the file name if it's too long.
- */
- strmake(tmp,glob_hostname,FN_REFLEN-5);
- strmov(strcend(tmp,'.'),"-bin");
- opt_bin_logname=my_strdup(tmp,MYF(MY_WME));
- }
- mysql_bin_log.set_index_file_name(opt_binlog_index_name);
- open_log(&mysql_bin_log, glob_hostname, opt_bin_logname, "-bin",
- LOG_BIN);
- using_update_log=1;
- }
-
if (opt_slow_log)
open_log(&mysql_slow_log, glob_hostname, opt_slow_logname, "-slow.log",
LOG_NORMAL);
@@ -2020,6 +1981,46 @@ The server will not act as a slave.");
if (!opt_noacl)
udf_init();
#endif
+ /* init_slave() must be called after the thread keys are created */
+ init_slave();
+
+ if (opt_bin_log && !server_id)
+ {
+ server_id= !master_host ? 1 : 2;
+ switch (server_id) {
+#ifdef EXTRA_DEBUG
+ case 1:
+ sql_print_error("\
+Warning: You have enabled the binary log, but you haven't set server-id:\n\
+Updates will be logged to the binary log, but connections to slaves will\n\
+not be accepted.");
+ break;
+#endif
+ case 2:
+ sql_print_error("\
+Warning: You should set server-id to a non-0 value if master_host is set.\n\
+The server will not act as a slave.");
+ break;
+ }
+ }
+ if (opt_bin_log)
+ {
+ if (!opt_bin_logname)
+ {
+ char tmp[FN_REFLEN];
+ /* TODO: The following should be using fn_format(); We just need to
+ first change fn_format() to cut the file name if it's too long.
+ */
+ strmake(tmp,glob_hostname,FN_REFLEN-5);
+ strmov(strcend(tmp,'.'),"-bin");
+ opt_bin_logname=my_strdup(tmp,MYF(MY_WME));
+ }
+ mysql_bin_log.set_index_file_name(opt_binlog_index_name);
+ open_log(&mysql_bin_log, glob_hostname, opt_bin_logname, "-bin",
+ LOG_BIN);
+ using_update_log=1;
+ }
+
if (opt_bootstrap)
{
diff --git a/sql/net_pkg.cc b/sql/net_pkg.cc
index 64c1b07a493..9e52e4580e6 100644
--- a/sql/net_pkg.cc
+++ b/sql/net_pkg.cc
@@ -108,7 +108,11 @@ net_printf(NET *net, uint errcode, ...)
thd->query_error = 1; // if we are here, something is wrong :-)
query_cache_abort(net); // Safety
va_start(args,errcode);
- format=ER(errcode);
+ // Sasha: this is needed to make net_printf() work with 0 argument for
+ // errorcode and use the argument after that as the format string. This
+ // is usefull for rare errors that are not worth the hassle to put in
+ // errmsg.sys, but at the same time, the message is not fixed text
+ format=errcode ? ER(errcode) : va_arg(args,char*);
offset= net->return_errno ? 2 : 0;
text_pos=(char*) net->buff+head_length+offset+1;
(void) vsprintf(my_const_cast(char*) (text_pos),format,args);
diff --git a/sql/slave.cc b/sql/slave.cc
index 9050bf7362f..919cf362238 100644
--- a/sql/slave.cc
+++ b/sql/slave.cc
@@ -166,6 +166,7 @@ int init_relay_log_pos(RELAY_LOG_INFO* rli,const char* log,
ulonglong pos, bool need_data_lock,
const char** errmsg)
{
+ *errmsg=0;
if (rli->log_pos_current)
return 0;
pthread_mutex_t *log_lock=rli->relay_log.get_log_lock();
@@ -348,6 +349,7 @@ int terminate_slave_thread(THD* thd, pthread_mutex_t* term_lock,
/* is is criticate to test if the slave is running. Otherwise, we might
be referening freed memory trying to kick it
*/
+ THD_CHECK_SENTRY(thd);
if (*slave_running)
{
KICK_SLAVE(thd);
@@ -966,6 +968,8 @@ int init_relay_log_info(RELAY_LOG_INFO* rli, const char* info_fname)
rli->cur_log_fd = -1;
rli->slave_skip_counter=0;
rli->log_pos_current=0;
+ rli->abort_pos_wait=0;
+ rli->skip_log_purge=0;
// TODO: make this work with multi-master
if (!opt_relay_logname)
{
@@ -1296,9 +1300,16 @@ int st_relay_log_info::wait_for_pos(THD* thd, String* log_name,
bool pos_reached = 0;
int event_count = 0;
pthread_mutex_lock(&data_lock);
- while (!thd->killed)
+ abort_pos_wait=0; // abort only if master info changes during wait
+ while (!thd->killed || !abort_pos_wait)
{
int cmp_result;
+ if (abort_pos_wait)
+ {
+ abort_pos_wait=0;
+ pthread_mutex_unlock(&data_lock);
+ return -1;
+ }
DBUG_ASSERT(*master_log_name || master_log_pos == 0);
if (*master_log_name)
{
@@ -1350,10 +1361,7 @@ static int init_slave_thread(THD* thd, SLAVE_THD_TYPE thd_type)
thd->thread_id = thread_id++;
pthread_mutex_unlock(&LOCK_thread_count);
- if (init_thr_lock() ||
- my_pthread_setspecific_ptr(THR_THD, thd) ||
- my_pthread_setspecific_ptr(THR_MALLOC, &thd->mem_root) ||
- my_pthread_setspecific_ptr(THR_NET, &thd->net))
+ if (init_thr_lock() || thd->store_globals())
{
end_thread(thd,0);
DBUG_RETURN(-1);
@@ -1367,7 +1375,6 @@ static int init_slave_thread(THD* thd, SLAVE_THD_TYPE thd_type)
VOID(pthread_sigmask(SIG_UNBLOCK,&set,&thd->block_signals));
#endif
- thd->mem_root.free=thd->mem_root.used=0; // Probably not needed
if (thd->max_join_size == (ulong) ~0L)
thd->options |= OPTION_BIG_SELECTS;
@@ -1381,7 +1388,6 @@ static int init_slave_thread(THD* thd, SLAVE_THD_TYPE thd_type)
}
thd->version=refresh_version;
thd->set_time();
-
DBUG_RETURN(0);
}
@@ -1611,6 +1617,7 @@ slave_begin:
my_thread_init();
thd = new THD; // note that contructor of THD uses DBUG_ !
DBUG_ENTER("handle_slave_io");
+ THD_CHECK_SENTRY(thd);
pthread_detach_this_thread();
if (init_slave_thread(thd, SLAVE_THD_IO))
@@ -1808,11 +1815,12 @@ err:
DBUG_ASSERT(thd->net.buff != 0);
net_end(&thd->net); // destructor will not free it, because net.vio is 0
pthread_mutex_lock(&LOCK_thread_count);
+ THD_CHECK_SENTRY(thd);
delete thd;
pthread_mutex_unlock(&LOCK_thread_count);
+ my_thread_end(); // clean-up before broadcast
pthread_cond_broadcast(&mi->stop_cond); // tell the world we are done
pthread_mutex_unlock(&mi->run_lock);
- my_thread_end();
#ifndef DBUG_OFF
if(abort_slave_event_count && !events_till_abort)
goto slave_begin;
@@ -1848,7 +1856,8 @@ slave_begin:
my_thread_init();
thd = new THD; // note that contructor of THD uses DBUG_ !
DBUG_ENTER("handle_slave_sql");
-
+ THD_CHECK_SENTRY(thd);
+
pthread_detach_this_thread();
if (init_slave_thread(thd, SLAVE_THD_SQL))
{
@@ -1861,6 +1870,7 @@ slave_begin:
sql_print_error("Failed during slave thread initialization");
goto err;
}
+ THD_CHECK_SENTRY(thd);
thd->thread_stack = (char*)&thd; // remember where our stack is
thd->temporary_tables = rli->save_temporary_tables; // restore temp tables
threads.append(thd);
@@ -1891,6 +1901,7 @@ log '%s' at position %s,relay log: name='%s',pos='%s'", RPL_LOG_NAME,
{
thd->proc_info = "Processing master log event";
DBUG_ASSERT(rli->sql_thd == thd);
+ THD_CHECK_SENTRY(thd);
if (exec_relay_log_event(thd,rli))
{
// do not scare the user if SQL thread was simply killed or stopped
@@ -1926,14 +1937,16 @@ the slave SQL thread with \"SLAVE START\". We stopped at log \
DBUG_ASSERT(thd->net.buff != 0);
net_end(&thd->net); // destructor will not free it, because we are weird
DBUG_ASSERT(rli->sql_thd == thd);
+ THD_CHECK_SENTRY(thd);
rli->sql_thd = 0;
pthread_mutex_lock(&LOCK_thread_count);
+ THD_CHECK_SENTRY(thd);
delete thd;
pthread_mutex_unlock(&LOCK_thread_count);
+ my_thread_end(); // clean-up before broadcasting termination
pthread_cond_broadcast(&rli->stop_cond);
// tell the world we are done
pthread_mutex_unlock(&rli->run_lock);
- my_thread_end();
#ifndef DBUG_OFF // TODO: reconsider the code below
if (abort_slave_event_count && !rli->events_till_abort)
goto slave_begin;
@@ -2429,13 +2442,35 @@ Log_event* next_event(RELAY_LOG_INFO* rli)
end_io_cache(cur_log);
DBUG_ASSERT(rli->cur_log_fd >= 0);
my_close(rli->cur_log_fd, MYF(MY_WME));
- rli->cur_log_fd = -1;
+ rli->cur_log_fd = -1;
- // purge_first_log will properly set up relay log coordinates in rli
- if (rli->relay_log.purge_first_log(rli))
+ // TODO: make skip_log_purge a start-up option. At this point this
+ // is not critical priority
+ if (!rli->skip_log_purge)
{
- errmsg = "Error purging processed log";
- goto err;
+ // purge_first_log will properly set up relay log coordinates in rli
+ if (rli->relay_log.purge_first_log(rli))
+ {
+ errmsg = "Error purging processed log";
+ goto err;
+ }
+ }
+ else
+ {
+ // TODO: verify that no lock is ok here. At this point, if we
+ // get this wrong, this is actually no big deal - the only time
+ // this code will ever be executed is if we are recovering from
+ // a bug when a full reload of the slave is not feasible or
+ // desirable.
+ if (rli->relay_log.find_next_log(&rli->linfo,0/*no lock*/))
+ {
+ errmsg = "error switching to the next log";
+ goto err;
+ }
+ rli->relay_log_pos = 4;
+ strnmov(rli->relay_log_name,rli->linfo.log_file_name,
+ sizeof(rli->relay_log_name));
+ flush_relay_log_info(rli);
}
// next log is hot
diff --git a/sql/slave.h b/sql/slave.h
index 9d3f55cbfbf..7ae5da1a340 100644
--- a/sql/slave.h
+++ b/sql/slave.h
@@ -151,10 +151,13 @@ typedef struct st_relay_log_info
char last_slave_error[MAX_SLAVE_ERRMSG];
THD* sql_thd;
bool log_pos_current;
+ bool abort_pos_wait;
+ bool skip_log_purge;
st_relay_log_info():info_fd(-1),cur_log_fd(-1),inited(0),
cur_log_init_count(0),
- log_pos_current(0)
+ log_pos_current(0),abort_pos_wait(0),
+ skip_log_purge(0)
{
relay_log_name[0] = master_log_name[0] = 0;
bzero(&info_file,sizeof(info_file));
diff --git a/sql/sql_class.cc b/sql/sql_class.cc
index fdb7825e344..03bb8ae2c97 100644
--- a/sql/sql_class.cc
+++ b/sql/sql_class.cc
@@ -104,6 +104,9 @@ THD::THD():user_time(0),fatal_error(0),last_insert_id_used(0),
cond_count=0;
convert_set=0;
mysys_var=0;
+#ifndef DBUG_OFF
+ dbug_sentry=THD_SENTRY_MAGIC;
+#endif
net.vio=0;
ull=0;
system_thread=cleanup_done=0;
@@ -191,6 +194,7 @@ void THD::cleanup(void)
THD::~THD()
{
+ THD_CHECK_SENTRY(this);
DBUG_ENTER("~THD()");
/* Close connection */
if (net.vio)
@@ -223,12 +227,16 @@ THD::~THD()
mysys_var=0; // Safety (shouldn't be needed)
#ifdef SIGNAL_WITH_VIO_CLOSE
pthread_mutex_destroy(&active_vio_lock);
+#endif
+#ifndef DBUG_OFF
+ dbug_sentry = THD_SENTRY_GONE;
#endif
DBUG_VOID_RETURN;
}
void THD::awake(bool prepare_to_die)
{
+ THD_CHECK_SENTRY(this);
if (prepare_to_die)
killed = 1;
thr_alarm_kill(real_id);
diff --git a/sql/sql_class.h b/sql/sql_class.h
index fa63cd90976..289a2ab9255 100644
--- a/sql/sql_class.h
+++ b/sql/sql_class.h
@@ -251,6 +251,11 @@ public:
class delayed_insert;
+#define THD_SENTRY_MAGIC 0xfeedd1ff
+#define THD_SENTRY_GONE 0xdeadbeef
+
+#define THD_CHECK_SENTRY(thd) DBUG_ASSERT(thd->dbug_sentry == THD_SENTRY_MAGIC)
+
/* For each client connection we create a separate thread with THD serving as
a thread/connection descriptor */
@@ -312,6 +317,9 @@ public:
// TODO: document the variables below
MYSQL_LOCK *lock,*locked_tables;
ULL *ull;
+#ifndef DBUG_OFF
+ uint dbug_sentry; // watch out for memory corruption
+#endif
struct st_my_thread_var *mysys_var;
enum enum_server_command command;
uint32 server_id;
diff --git a/sql/sql_repl.cc b/sql/sql_repl.cc
index bbe92f3e526..8216dec815a 100644
--- a/sql/sql_repl.cc
+++ b/sql/sql_repl.cc
@@ -714,7 +714,10 @@ int change_master(THD* thd, MASTER_INFO* mi)
return 1;
}
- pthread_mutex_lock(&mi->data_lock);
+ /* data lock not needed since we have already stopped the running threads,
+ and we have the hold on the run locks which will keep all threads that
+ could possibly modify the data structures from running
+ */
if ((lex_mi->host || lex_mi->port) && !lex_mi->log_file_name && !lex_mi->pos)
{
// if we change host or port, we must reset the postion
@@ -746,6 +749,7 @@ int change_master(THD* thd, MASTER_INFO* mi)
if (lex_mi->relay_log_name)
{
need_relay_log_purge = 0;
+ mi->rli.skip_log_purge=1;
strnmov(mi->rli.relay_log_name,lex_mi->relay_log_name,
sizeof(mi->rli.relay_log_name));
}
@@ -759,16 +763,14 @@ int change_master(THD* thd, MASTER_INFO* mi)
flush_master_info(mi);
if (need_relay_log_purge)
{
- pthread_mutex_unlock(&mi->data_lock);
+ mi->rli.skip_log_purge=0;
thd->proc_info="purging old relay logs";
if (purge_relay_logs(&mi->rli,0 /* not only reset, but also reinit*/,
&errmsg))
{
- send_error(&thd->net, 0, "Failed purging old relay logs");
- unlock_slave_threads(mi);
+ net_printf(&thd->net, 0, "Failed purging old relay logs: %s",errmsg);
return 1;
}
- pthread_mutex_lock(&mi->rli.data_lock);
}
else
{
@@ -778,6 +780,7 @@ int change_master(THD* thd, MASTER_INFO* mi)
0 /*no data lock*/,
&msg))
{
+ //Sasha: note that I had to change net_printf() to make this work
net_printf(&thd->net,0,"Failed initializing relay log position: %s",msg);
unlock_slave_threads(mi);
return 1;
@@ -789,7 +792,10 @@ int change_master(THD* thd, MASTER_INFO* mi)
sizeof(mi->rli.master_log_name));
if (!mi->rli.master_log_name[0]) // uninitialized case
mi->rli.master_log_pos=0;
- pthread_cond_broadcast(&mi->rli.data_cond);
+
+ pthread_mutex_lock(&mi->rli.data_lock);
+ mi->rli.abort_pos_wait = 1;
+ pthread_cond_broadcast(&mi->data_cond);
pthread_mutex_unlock(&mi->rli.data_lock);
thd->proc_info = "starting slave";