summaryrefslogtreecommitdiff
path: root/sql/sql_repl.cc
diff options
context:
space:
mode:
authorunknown <sasha@mysql.sashanet.com>2002-01-19 19:16:52 -0700
committerunknown <sasha@mysql.sashanet.com>2002-01-19 19:16:52 -0700
commit5df61c3cdc4499197e420a76b25b942dce0f3ccc (patch)
tree87da2fd65f79c28f4b97c4619f95b07797107d82 /sql/sql_repl.cc
parent0831ce1c616296196eff82065da469156b4def82 (diff)
downloadmariadb-git-5df61c3cdc4499197e420a76b25b942dce0f3ccc.tar.gz
Here comes a nasty patch, although I am not ready to push it yet. I will
first pull, merge,test, and get it to work. The main change is the new replication code - now we have two slave threads SQL thread and I/O thread. I have also re-written a lot of the code to prepare for multi-master implementation. I also documented IO_CACHE quite extensively and to some extend, THD class. Makefile.am: moved tags target script into a separate file include/my_sys.h: fixes in IO_CACHE for SEQ_READ_APPEND + some documentation libmysqld/lib_sql.cc: updated replication locks, but now I see I did it wrong and it won't compile. Will fix before the push. mysql-test/r/rpl000014.result: test result update mysql-test/r/rpl000015.result: test result update mysql-test/r/rpl000016.result: test result update mysql-test/r/rpl_log.result: test result update mysql-test/t/rpl000016-slave.sh: remove relay logs mysql-test/t/rpl000017-slave.sh: remove relay logs mysql-test/t/rpl_log.test: updated test mysys/mf_iocache.c: IO_CACHE updates to make replication work mysys/mf_iocache2.c: IO_CACHE update to make replication work mysys/thr_mutex.c: cosmetic change sql/item_func.cc: new replication code sql/lex.h: new replication sql/log.cc: new replication sql/log_event.cc: new replication sql/log_event.h: new replication sql/mini_client.cc: new replication sql/mini_client.h: new replication sql/mysql_priv.h: new replication sql/mysqld.cc: new replication sql/repl_failsafe.cc: new replication sql/slave.cc: new replication sql/slave.h: new replication sql/sql_class.cc: new replication sql/sql_class.h: new replication sql/sql_lex.h: new replication sql/sql_parse.cc: new replication sql/sql_repl.cc: new replication sql/sql_repl.h: new replication sql/sql_show.cc: new replication sql/sql_yacc.yy: new replication sql/stacktrace.c: more robust stack tracing sql/structs.h: new replication code BitKeeper/etc/ignore: Added mysql-test/r/rpl000002.eval mysql-test/r/rpl000014.eval mysql-test/r/rpl000015.eval mysql-test/r/rpl000016.eval mysql-test/r/slave-running.eval mysql-test/r/slave-stopped.eval to the ignore list
Diffstat (limited to 'sql/sql_repl.cc')
-rw-r--r--sql/sql_repl.cc333
1 files changed, 156 insertions, 177 deletions
diff --git a/sql/sql_repl.cc b/sql/sql_repl.cc
index 6c738ba36b4..146490c7b87 100644
--- a/sql/sql_repl.cc
+++ b/sql/sql_repl.cc
@@ -23,9 +23,9 @@
#include "mini_client.h"
#include <thr_alarm.h>
#include <my_dir.h>
+#include <assert.h>
extern const char* any_db;
-extern pthread_handler_decl(handle_slave,arg);
#ifndef DBUG_OFF
int max_binlog_dump_events = 0; // unlimited
@@ -33,6 +33,26 @@ bool opt_sporadic_binlog_dump_fail = 0;
static int binlog_dump_count = 0;
#endif
+int check_binlog_magic(IO_CACHE* log, const char** errmsg)
+{
+ char magic[4];
+ DBUG_ASSERT(my_b_tell(log) == 0);
+
+ if (my_b_read(log, (byte*) magic, sizeof(magic)))
+ {
+ *errmsg = "I/O error reading the header from the binary log";
+ sql_print_error("%s, errno=%d, io cache code=%d", *errmsg, my_errno,
+ log->error);
+ return 1;
+ }
+ if (memcmp(magic, BINLOG_MAGIC, sizeof(magic)))
+ {
+ *errmsg = "Binlog has bad magic number; It's not a binary log file that can be used by this version of MySQL";
+ return 1;
+ }
+ return 0;
+}
+
static int fake_rotate_event(NET* net, String* packet, char* log_file_name,
const char**errmsg)
{
@@ -46,7 +66,10 @@ static int fake_rotate_event(NET* net, String* packet, char* log_file_name,
int4store(header + SERVER_ID_OFFSET, server_id);
int4store(header + EVENT_LEN_OFFSET, event_len);
int2store(header + FLAGS_OFFSET, 0);
- int4store(header + LOG_SEQ_OFFSET, 0);
+
+ // TODO: check what problems this may cause and fix them
+ int4store(header + LOG_POS_OFFSET, 0);
+
packet->append(header, sizeof(header));
/* We need to split the next statement because of problem with cxx */
int4store(buf,4); // tell slave to skip magic number
@@ -133,7 +156,6 @@ File open_binlog(IO_CACHE *log, const char *log_file_name,
const char **errmsg)
{
File file;
- char magic[4];
if ((file = my_open(log_file_name, O_RDONLY | O_BINARY, MYF(MY_WME))) < 0 ||
init_io_cache(log, file, IO_SIZE*2, READ_CACHE, 0, 0,
@@ -142,19 +164,8 @@ File open_binlog(IO_CACHE *log, const char *log_file_name,
*errmsg = "Could not open log file"; // This will not be sent
goto err;
}
-
- if (my_b_read(log, (byte*) magic, sizeof(magic)))
- {
- *errmsg = "I/O error reading the header from the binary log";
- sql_print_error("%s, errno=%d, io cache code=%d", *errmsg, my_errno,
- log->error);
+ if (check_binlog_magic(log,errmsg))
goto err;
- }
- if (memcmp(magic, BINLOG_MAGIC, sizeof(magic)))
- {
- *errmsg = "Binlog has bad magic number; It's not a binary log file that can be used by this version of MySQL";
- goto err;
- }
return file;
err:
@@ -366,7 +377,8 @@ impossible position";
packet->length(0);
packet->append("\0",1);
}
-
+ // TODO: now that we are logging the offset, check to make sure
+ // the recorded offset and the actual match
if (error != LOG_READ_EOF)
{
switch(error) {
@@ -410,13 +422,6 @@ impossible position";
// to signal us
{
log.error=0;
-
- // tell the kill thread how to wake us up
- thd->mysys_var->current_mutex = log_lock;
- thd->mysys_var->current_cond = &COND_binlog_update;
- const char* proc_info = thd->proc_info;
- thd->proc_info = "Slave connection: waiting for binlog update";
-
bool read_packet = 0, fatal_error = 0;
#ifndef DBUG_OFF
@@ -431,32 +436,30 @@ impossible position";
// no one will update the log while we are reading
// now, but we'll be quick and just read one record
pthread_mutex_lock(log_lock);
- switch (Log_event::read_log_event(&log, packet, (pthread_mutex_t*) 0))
+ switch (Log_event::read_log_event(&log, packet, (pthread_mutex_t*)0))
{
case 0:
+ pthread_mutex_unlock(log_lock);
read_packet = 1;
// we read successfully, so we'll need to send it to the
// slave
break;
case LOG_READ_EOF:
- DBUG_PRINT("wait",("waiting for data on binary log"));
+ DBUG_PRINT("wait",("waiting for data in binary log"));
+ // wait_for_update unlocks the log lock - needed to avoid race
if (!thd->killed)
- pthread_cond_wait(&COND_binlog_update, log_lock);
+ mysql_bin_log.wait_for_update(thd);
+ else
+ pthread_mutex_unlock(log_lock);
DBUG_PRINT("wait",("binary log received update"));
break;
default:
+ pthread_mutex_unlock(log_lock);
fatal_error = 1;
break;
}
- pthread_mutex_unlock(log_lock);
-
- pthread_mutex_lock(&thd->mysys_var->mutex);
- thd->mysys_var->current_mutex= 0;
- thd->mysys_var->current_cond= 0;
- thd->proc_info= proc_info;
- pthread_mutex_unlock(&thd->mysys_var->mutex);
-
+
if (read_packet)
{
thd->proc_info = "sending update to slave";
@@ -548,39 +551,37 @@ impossible position";
DBUG_VOID_RETURN;
}
-int start_slave(THD* thd , bool net_report)
+int start_slave(THD* thd , MASTER_INFO* mi, bool net_report)
{
int slave_errno = 0;
if (!thd) thd = current_thd;
NET* net = &thd->net;
-
+ int thread_mask;
+
if (check_access(thd, PROCESS_ACL, any_db))
return 1;
- pthread_mutex_lock(&LOCK_slave);
- if (!slave_running)
+ lock_slave_threads(mi); // this allows us to cleanly read slave_running
+ init_thread_mask(&thread_mask,mi,1 /* inverse */);
+ if (thread_mask)
{
- if (init_master_info(&glob_mi))
- slave_errno = ER_MASTER_INFO;
- else if (server_id_supplied && *glob_mi.host)
- {
- pthread_t hThread;
- if (pthread_create(&hThread, &connection_attrib, handle_slave, 0))
- {
- slave_errno = ER_SLAVE_THREAD;
- }
- while (!slave_running) // slave might already be running by now
- pthread_cond_wait(&COND_slave_start, &LOCK_slave);
- }
+ if (server_id_supplied && (!mi->inited || (mi->inited && *mi->host)))
+ slave_errno = start_slave_threads(0 /*no mutex */,
+ 1 /* wait for start */,
+ mi,
+ master_info_file,relay_log_info_file,
+ thread_mask);
else
slave_errno = ER_BAD_SLAVE;
}
else
slave_errno = ER_SLAVE_MUST_STOP;
-
- pthread_mutex_unlock(&LOCK_slave);
+
+ unlock_slave_threads(mi);
+
if (slave_errno)
{
- if (net_report) send_error(net, slave_errno);
+ if (net_report)
+ send_error(net, slave_errno);
return 1;
}
else if (net_report)
@@ -589,8 +590,7 @@ int start_slave(THD* thd , bool net_report)
return 0;
}
-
-int stop_slave(THD* thd, bool net_report )
+int stop_slave(THD* thd, MASTER_INFO* mi, bool net_report )
{
int slave_errno = 0;
if (!thd) thd = current_thd;
@@ -598,43 +598,14 @@ int stop_slave(THD* thd, bool net_report )
if (check_access(thd, PROCESS_ACL, any_db))
return 1;
-
- pthread_mutex_lock(&LOCK_slave);
- if (slave_running)
- {
- abort_slave = 1;
- KICK_SLAVE;
- // do not abort the slave in the middle of a query, so we do not set
- // thd->killed for the slave thread
- thd->proc_info = "waiting for slave to die";
- while(slave_running)
- {
- /* there is a small chance that slave thread might miss the first
- alarm. To protect againts it, resend the signal until it reacts
- */
-
- struct timespec abstime;
-#ifdef HAVE_TIMESPEC_TS_SEC
- abstime.ts_sec=time(NULL)+2;
- abstime.ts_nsec=0;
-#elif defined(__WIN__)
- abstime.tv_sec=time((time_t*) 0)+2;
- abstime.tv_nsec=0;
-#else
- struct timeval tv;
- gettimeofday(&tv,0);
- abstime.tv_sec=tv.tv_sec+2;
- abstime.tv_nsec=tv.tv_usec*1000;
-#endif
- pthread_cond_timedwait(&COND_slave_stopped, &LOCK_slave, &abstime);
- if (slave_running)
- KICK_SLAVE;
- }
- }
- else
- slave_errno = ER_SLAVE_NOT_RUNNING;
-
- pthread_mutex_unlock(&LOCK_slave);
+ thd->proc_info = "Killing slave";
+ int thread_mask;
+ lock_slave_threads(mi);
+ init_thread_mask(&thread_mask,mi,0 /* not inverse*/);
+ slave_errno = (thread_mask) ?
+ terminate_slave_threads(mi,thread_mask,
+ 1 /*skip lock */) : ER_SLAVE_NOT_RUNNING;
+ unlock_slave_threads(mi);
thd->proc_info = 0;
if (slave_errno)
@@ -649,31 +620,43 @@ int stop_slave(THD* thd, bool net_report )
return 0;
}
-
-void reset_slave()
+int reset_slave(MASTER_INFO* mi)
{
MY_STAT stat_area;
char fname[FN_REFLEN];
- bool slave_was_running ;
-
- pthread_mutex_lock(&LOCK_slave);
- if ((slave_was_running = slave_running))
+ int restart_thread_mask = 0,error=0;
+ const char* errmsg=0;
+
+ lock_slave_threads(mi);
+ init_thread_mask(&restart_thread_mask,mi,0 /* not inverse */);
+ if ((error=terminate_slave_threads(mi,restart_thread_mask,1 /*skip lock*/))
+ || (error=purge_relay_logs(&mi->rli,1 /*just reset*/,&errmsg)))
+ goto err;
+
+ end_master_info(mi);
+ fn_format(fname, master_info_file, mysql_data_home, "", 4+32);
+ if (my_stat(fname, &stat_area, MYF(0)) && my_delete(fname, MYF(MY_WME)))
{
- pthread_mutex_unlock(&LOCK_slave);
- stop_slave(0,0);
+ error=1;
+ goto err;
}
- else
- pthread_mutex_unlock(&LOCK_slave);
-
- end_master_info(&glob_mi);
- fn_format(fname, master_info_file, mysql_data_home, "", 4+32);
+ fn_format(fname, relay_log_info_file, mysql_data_home, "", 4+32);
if (my_stat(fname, &stat_area, MYF(0)) && my_delete(fname, MYF(MY_WME)))
- return;
- if (slave_was_running)
- start_slave(0,0);
+ {
+ error=1;
+ goto err;
+ }
+ if (restart_thread_mask)
+ error=start_slave_threads(0 /* mutex not needed*/,
+ 1 /* wait for start*/,
+ mi,master_info_file,relay_log_info_file,
+ restart_thread_mask);
+ // TODO: fix error messages so they get to the client
+err:
+ unlock_slave_threads(mi);
+ return error;
}
-
void kill_zombie_dump_threads(uint32 slave_server_id)
{
pthread_mutex_lock(&LOCK_thread_count);
@@ -692,119 +675,114 @@ void kill_zombie_dump_threads(uint32 slave_server_id)
make safe_mutex complain and abort.
We just to do kill the thread ourselves.
*/
-
- thr_alarm_kill(tmp->real_id);
- tmp->killed = 1;
- tmp->mysys_var->abort = 1;
- pthread_mutex_lock(&tmp->mysys_var->mutex);
- if (tmp->mysys_var->current_cond)
- {
- pthread_mutex_lock(tmp->mysys_var->current_mutex);
- pthread_cond_broadcast(tmp->mysys_var->current_cond);
- pthread_mutex_unlock(tmp->mysys_var->current_mutex);
- }
- pthread_mutex_unlock(&tmp->mysys_var->mutex);
+ tmp->awake(1/*prepare to die*/);
}
}
pthread_mutex_unlock(&LOCK_thread_count);
}
-int change_master(THD* thd)
+int change_master(THD* thd, MASTER_INFO* mi)
{
- bool slave_was_running;
+ int error=0,restart_thread_mask;
+ const char* errmsg=0;
+
// kill slave thread
- pthread_mutex_lock(&LOCK_slave);
- if ((slave_was_running = slave_running))
+ lock_slave_threads(mi);
+ init_thread_mask(&restart_thread_mask,mi,0 /*not inverse*/);
+ if (restart_thread_mask &&
+ (error=terminate_slave_threads(mi,
+ restart_thread_mask,
+ 1 /*skip lock*/)))
{
- abort_slave = 1;
- KICK_SLAVE;
- thd->proc_info = "waiting for slave to die";
- while (slave_running)
- pthread_cond_wait(&COND_slave_stopped, &LOCK_slave); // wait until done
+ send_error(&thd->net,error);
+ unlock_slave_threads(mi);
+ return 1;
}
- pthread_mutex_unlock(&LOCK_slave);
thd->proc_info = "changing master";
LEX_MASTER_INFO* lex_mi = &thd->lex.mi;
-
- if (init_master_info(&glob_mi))
+ // TODO: see if needs re-write
+ if (init_master_info(mi,master_info_file,relay_log_info_file))
{
send_error(&thd->net, 0, "Could not initialize master info");
+ unlock_slave_threads(mi);
return 1;
}
- pthread_mutex_lock(&glob_mi.lock);
+ pthread_mutex_lock(&mi->data_lock);
if ((lex_mi->host || lex_mi->port) && !lex_mi->log_file_name && !lex_mi->pos)
{
// if we change host or port, we must reset the postion
- glob_mi.log_file_name[0] = 0;
- glob_mi.pos = 4; // skip magic number
- glob_mi.pending = 0;
+ mi->master_log_name[0] = 0;
+ mi->master_log_pos = 4; // skip magic number
+ mi->rli.pending = 0;
}
if (lex_mi->log_file_name)
- strmake(glob_mi.log_file_name, lex_mi->log_file_name,
- sizeof(glob_mi.log_file_name));
+ strmake(mi->master_log_name, lex_mi->log_file_name,
+ sizeof(mi->master_log_name));
if (lex_mi->pos)
{
- glob_mi.pos = lex_mi->pos;
- glob_mi.pending = 0;
+ mi->master_log_pos = lex_mi->pos;
+ mi->rli.pending = 0;
}
if (lex_mi->host)
- strmake(glob_mi.host, lex_mi->host, sizeof(glob_mi.host));
+ strmake(mi->host, lex_mi->host, sizeof(mi->host));
if (lex_mi->user)
- strmake(glob_mi.user, lex_mi->user, sizeof(glob_mi.user));
+ strmake(mi->user, lex_mi->user, sizeof(mi->user));
if (lex_mi->password)
- strmake(glob_mi.password, lex_mi->password, sizeof(glob_mi.password));
+ strmake(mi->password, lex_mi->password, sizeof(mi->password));
if (lex_mi->port)
- glob_mi.port = lex_mi->port;
+ mi->port = lex_mi->port;
if (lex_mi->connect_retry)
- glob_mi.connect_retry = lex_mi->connect_retry;
+ mi->connect_retry = lex_mi->connect_retry;
+
+ flush_master_info(mi);
+ pthread_mutex_unlock(&mi->data_lock);
+ thd->proc_info="purging old relay logs";
+ if (purge_relay_logs(&mi->rli,0 /* not only reset, but also reinit*/,
+ &errmsg))
+ {
+ send_error(&thd->net, 0, "Failed purging old relay logs");
+ unlock_slave_threads(mi);
+ return 1;
+ }
+ pthread_mutex_lock(&mi->rli.data_lock);
+ mi->rli.master_log_pos = mi->master_log_pos;
+ strnmov(mi->rli.master_log_name,mi->master_log_name,
+ sizeof(mi->rli.master_log_name));
+ if (!mi->rli.master_log_name[0]) // uninitialized case
+ mi->rli.master_log_pos=0;
+ pthread_cond_broadcast(&mi->rli.data_cond);
+ pthread_mutex_unlock(&mi->rli.data_lock);
- flush_master_info(&glob_mi);
- pthread_mutex_unlock(&glob_mi.lock);
thd->proc_info = "starting slave";
- if (slave_was_running)
- start_slave(0,0);
+ if (restart_thread_mask)
+ error=start_slave_threads(0 /* mutex not needed*/,
+ 1 /* wait for start*/,
+ mi,master_info_file,relay_log_info_file,
+ restart_thread_mask);
+err:
+ unlock_slave_threads(mi);
thd->proc_info = 0;
-
- send_ok(&thd->net);
+ if (error)
+ send_error(&thd->net,error);
+ else
+ send_ok(&thd->net);
return 0;
}
-
-void reset_master()
+int reset_master(THD* thd)
{
if (!mysql_bin_log.is_open())
{
my_error(ER_FLUSH_MASTER_BINLOG_CLOSED, MYF(ME_BELL+ME_WAITTANG));
- return;
- }
-
- LOG_INFO linfo;
- pthread_mutex_t* log_lock = mysql_bin_log.get_log_lock();
- pthread_mutex_lock(log_lock);
- if (mysql_bin_log.find_first_log(&linfo, ""))
- {
- pthread_mutex_unlock(log_lock);
- return;
- }
-
- for(;;)
- {
- my_delete(linfo.log_file_name, MYF(MY_WME));
- if (mysql_bin_log.find_next_log(&linfo))
- break;
+ return 1;
}
- mysql_bin_log.close(1); // exiting close
- my_delete(mysql_bin_log.get_index_fname(), MYF(MY_WME));
- mysql_bin_log.set_need_start_event();
- mysql_bin_log.open(opt_bin_logname,LOG_BIN);
- pthread_mutex_unlock(log_lock);
+ return mysql_bin_log.reset_logs(thd);
}
-
int cmp_master_pos(const char* log_file_name1, ulonglong log_pos1,
const char* log_file_name2, ulonglong log_pos2)
{
@@ -891,6 +869,7 @@ int show_binlog_events(THD* thd)
if (event_count < limit_end && log.error)
{
errmsg = "Wrong offset or I/O error";
+ pthread_mutex_unlock(mysql_bin_log.get_log_lock());
goto err;
}