diff options
author | unknown <sasha@mysql.sashanet.com> | 2002-01-19 19:16:52 -0700 |
---|---|---|
committer | unknown <sasha@mysql.sashanet.com> | 2002-01-19 19:16:52 -0700 |
commit | 5df61c3cdc4499197e420a76b25b942dce0f3ccc (patch) | |
tree | 87da2fd65f79c28f4b97c4619f95b07797107d82 /sql/sql_repl.cc | |
parent | 0831ce1c616296196eff82065da469156b4def82 (diff) | |
download | mariadb-git-5df61c3cdc4499197e420a76b25b942dce0f3ccc.tar.gz |
Here comes a nasty patch, although I am not ready to push it yet. I will
first pull, merge,test, and get it to work.
The main change is the new replication code - now we have two slave threads
SQL thread and I/O thread. I have also re-written a lot of the code to
prepare for multi-master implementation.
I also documented IO_CACHE quite extensively and to some extend, THD class.
Makefile.am:
moved tags target script into a separate file
include/my_sys.h:
fixes in IO_CACHE for SEQ_READ_APPEND + some documentation
libmysqld/lib_sql.cc:
updated replication locks, but now I see I did it wrong and it won't compile. Will fix
before the push.
mysql-test/r/rpl000014.result:
test result update
mysql-test/r/rpl000015.result:
test result update
mysql-test/r/rpl000016.result:
test result update
mysql-test/r/rpl_log.result:
test result update
mysql-test/t/rpl000016-slave.sh:
remove relay logs
mysql-test/t/rpl000017-slave.sh:
remove relay logs
mysql-test/t/rpl_log.test:
updated test
mysys/mf_iocache.c:
IO_CACHE updates to make replication work
mysys/mf_iocache2.c:
IO_CACHE update to make replication work
mysys/thr_mutex.c:
cosmetic change
sql/item_func.cc:
new replication code
sql/lex.h:
new replication
sql/log.cc:
new replication
sql/log_event.cc:
new replication
sql/log_event.h:
new replication
sql/mini_client.cc:
new replication
sql/mini_client.h:
new replication
sql/mysql_priv.h:
new replication
sql/mysqld.cc:
new replication
sql/repl_failsafe.cc:
new replication
sql/slave.cc:
new replication
sql/slave.h:
new replication
sql/sql_class.cc:
new replication
sql/sql_class.h:
new replication
sql/sql_lex.h:
new replication
sql/sql_parse.cc:
new replication
sql/sql_repl.cc:
new replication
sql/sql_repl.h:
new replication
sql/sql_show.cc:
new replication
sql/sql_yacc.yy:
new replication
sql/stacktrace.c:
more robust stack tracing
sql/structs.h:
new replication code
BitKeeper/etc/ignore:
Added mysql-test/r/rpl000002.eval mysql-test/r/rpl000014.eval mysql-test/r/rpl000015.eval mysql-test/r/rpl000016.eval mysql-test/r/slave-running.eval mysql-test/r/slave-stopped.eval to the ignore list
Diffstat (limited to 'sql/sql_repl.cc')
-rw-r--r-- | sql/sql_repl.cc | 333 |
1 files changed, 156 insertions, 177 deletions
diff --git a/sql/sql_repl.cc b/sql/sql_repl.cc index 6c738ba36b4..146490c7b87 100644 --- a/sql/sql_repl.cc +++ b/sql/sql_repl.cc @@ -23,9 +23,9 @@ #include "mini_client.h" #include <thr_alarm.h> #include <my_dir.h> +#include <assert.h> extern const char* any_db; -extern pthread_handler_decl(handle_slave,arg); #ifndef DBUG_OFF int max_binlog_dump_events = 0; // unlimited @@ -33,6 +33,26 @@ bool opt_sporadic_binlog_dump_fail = 0; static int binlog_dump_count = 0; #endif +int check_binlog_magic(IO_CACHE* log, const char** errmsg) +{ + char magic[4]; + DBUG_ASSERT(my_b_tell(log) == 0); + + if (my_b_read(log, (byte*) magic, sizeof(magic))) + { + *errmsg = "I/O error reading the header from the binary log"; + sql_print_error("%s, errno=%d, io cache code=%d", *errmsg, my_errno, + log->error); + return 1; + } + if (memcmp(magic, BINLOG_MAGIC, sizeof(magic))) + { + *errmsg = "Binlog has bad magic number; It's not a binary log file that can be used by this version of MySQL"; + return 1; + } + return 0; +} + static int fake_rotate_event(NET* net, String* packet, char* log_file_name, const char**errmsg) { @@ -46,7 +66,10 @@ static int fake_rotate_event(NET* net, String* packet, char* log_file_name, int4store(header + SERVER_ID_OFFSET, server_id); int4store(header + EVENT_LEN_OFFSET, event_len); int2store(header + FLAGS_OFFSET, 0); - int4store(header + LOG_SEQ_OFFSET, 0); + + // TODO: check what problems this may cause and fix them + int4store(header + LOG_POS_OFFSET, 0); + packet->append(header, sizeof(header)); /* We need to split the next statement because of problem with cxx */ int4store(buf,4); // tell slave to skip magic number @@ -133,7 +156,6 @@ File open_binlog(IO_CACHE *log, const char *log_file_name, const char **errmsg) { File file; - char magic[4]; if ((file = my_open(log_file_name, O_RDONLY | O_BINARY, MYF(MY_WME))) < 0 || init_io_cache(log, file, IO_SIZE*2, READ_CACHE, 0, 0, @@ -142,19 +164,8 @@ File open_binlog(IO_CACHE *log, const char *log_file_name, *errmsg = "Could not open log file"; // This will not be sent goto err; } - - if (my_b_read(log, (byte*) magic, sizeof(magic))) - { - *errmsg = "I/O error reading the header from the binary log"; - sql_print_error("%s, errno=%d, io cache code=%d", *errmsg, my_errno, - log->error); + if (check_binlog_magic(log,errmsg)) goto err; - } - if (memcmp(magic, BINLOG_MAGIC, sizeof(magic))) - { - *errmsg = "Binlog has bad magic number; It's not a binary log file that can be used by this version of MySQL"; - goto err; - } return file; err: @@ -366,7 +377,8 @@ impossible position"; packet->length(0); packet->append("\0",1); } - + // TODO: now that we are logging the offset, check to make sure + // the recorded offset and the actual match if (error != LOG_READ_EOF) { switch(error) { @@ -410,13 +422,6 @@ impossible position"; // to signal us { log.error=0; - - // tell the kill thread how to wake us up - thd->mysys_var->current_mutex = log_lock; - thd->mysys_var->current_cond = &COND_binlog_update; - const char* proc_info = thd->proc_info; - thd->proc_info = "Slave connection: waiting for binlog update"; - bool read_packet = 0, fatal_error = 0; #ifndef DBUG_OFF @@ -431,32 +436,30 @@ impossible position"; // no one will update the log while we are reading // now, but we'll be quick and just read one record pthread_mutex_lock(log_lock); - switch (Log_event::read_log_event(&log, packet, (pthread_mutex_t*) 0)) + switch (Log_event::read_log_event(&log, packet, (pthread_mutex_t*)0)) { case 0: + pthread_mutex_unlock(log_lock); read_packet = 1; // we read successfully, so we'll need to send it to the // slave break; case LOG_READ_EOF: - DBUG_PRINT("wait",("waiting for data on binary log")); + DBUG_PRINT("wait",("waiting for data in binary log")); + // wait_for_update unlocks the log lock - needed to avoid race if (!thd->killed) - pthread_cond_wait(&COND_binlog_update, log_lock); + mysql_bin_log.wait_for_update(thd); + else + pthread_mutex_unlock(log_lock); DBUG_PRINT("wait",("binary log received update")); break; default: + pthread_mutex_unlock(log_lock); fatal_error = 1; break; } - pthread_mutex_unlock(log_lock); - - pthread_mutex_lock(&thd->mysys_var->mutex); - thd->mysys_var->current_mutex= 0; - thd->mysys_var->current_cond= 0; - thd->proc_info= proc_info; - pthread_mutex_unlock(&thd->mysys_var->mutex); - + if (read_packet) { thd->proc_info = "sending update to slave"; @@ -548,39 +551,37 @@ impossible position"; DBUG_VOID_RETURN; } -int start_slave(THD* thd , bool net_report) +int start_slave(THD* thd , MASTER_INFO* mi, bool net_report) { int slave_errno = 0; if (!thd) thd = current_thd; NET* net = &thd->net; - + int thread_mask; + if (check_access(thd, PROCESS_ACL, any_db)) return 1; - pthread_mutex_lock(&LOCK_slave); - if (!slave_running) + lock_slave_threads(mi); // this allows us to cleanly read slave_running + init_thread_mask(&thread_mask,mi,1 /* inverse */); + if (thread_mask) { - if (init_master_info(&glob_mi)) - slave_errno = ER_MASTER_INFO; - else if (server_id_supplied && *glob_mi.host) - { - pthread_t hThread; - if (pthread_create(&hThread, &connection_attrib, handle_slave, 0)) - { - slave_errno = ER_SLAVE_THREAD; - } - while (!slave_running) // slave might already be running by now - pthread_cond_wait(&COND_slave_start, &LOCK_slave); - } + if (server_id_supplied && (!mi->inited || (mi->inited && *mi->host))) + slave_errno = start_slave_threads(0 /*no mutex */, + 1 /* wait for start */, + mi, + master_info_file,relay_log_info_file, + thread_mask); else slave_errno = ER_BAD_SLAVE; } else slave_errno = ER_SLAVE_MUST_STOP; - - pthread_mutex_unlock(&LOCK_slave); + + unlock_slave_threads(mi); + if (slave_errno) { - if (net_report) send_error(net, slave_errno); + if (net_report) + send_error(net, slave_errno); return 1; } else if (net_report) @@ -589,8 +590,7 @@ int start_slave(THD* thd , bool net_report) return 0; } - -int stop_slave(THD* thd, bool net_report ) +int stop_slave(THD* thd, MASTER_INFO* mi, bool net_report ) { int slave_errno = 0; if (!thd) thd = current_thd; @@ -598,43 +598,14 @@ int stop_slave(THD* thd, bool net_report ) if (check_access(thd, PROCESS_ACL, any_db)) return 1; - - pthread_mutex_lock(&LOCK_slave); - if (slave_running) - { - abort_slave = 1; - KICK_SLAVE; - // do not abort the slave in the middle of a query, so we do not set - // thd->killed for the slave thread - thd->proc_info = "waiting for slave to die"; - while(slave_running) - { - /* there is a small chance that slave thread might miss the first - alarm. To protect againts it, resend the signal until it reacts - */ - - struct timespec abstime; -#ifdef HAVE_TIMESPEC_TS_SEC - abstime.ts_sec=time(NULL)+2; - abstime.ts_nsec=0; -#elif defined(__WIN__) - abstime.tv_sec=time((time_t*) 0)+2; - abstime.tv_nsec=0; -#else - struct timeval tv; - gettimeofday(&tv,0); - abstime.tv_sec=tv.tv_sec+2; - abstime.tv_nsec=tv.tv_usec*1000; -#endif - pthread_cond_timedwait(&COND_slave_stopped, &LOCK_slave, &abstime); - if (slave_running) - KICK_SLAVE; - } - } - else - slave_errno = ER_SLAVE_NOT_RUNNING; - - pthread_mutex_unlock(&LOCK_slave); + thd->proc_info = "Killing slave"; + int thread_mask; + lock_slave_threads(mi); + init_thread_mask(&thread_mask,mi,0 /* not inverse*/); + slave_errno = (thread_mask) ? + terminate_slave_threads(mi,thread_mask, + 1 /*skip lock */) : ER_SLAVE_NOT_RUNNING; + unlock_slave_threads(mi); thd->proc_info = 0; if (slave_errno) @@ -649,31 +620,43 @@ int stop_slave(THD* thd, bool net_report ) return 0; } - -void reset_slave() +int reset_slave(MASTER_INFO* mi) { MY_STAT stat_area; char fname[FN_REFLEN]; - bool slave_was_running ; - - pthread_mutex_lock(&LOCK_slave); - if ((slave_was_running = slave_running)) + int restart_thread_mask = 0,error=0; + const char* errmsg=0; + + lock_slave_threads(mi); + init_thread_mask(&restart_thread_mask,mi,0 /* not inverse */); + if ((error=terminate_slave_threads(mi,restart_thread_mask,1 /*skip lock*/)) + || (error=purge_relay_logs(&mi->rli,1 /*just reset*/,&errmsg))) + goto err; + + end_master_info(mi); + fn_format(fname, master_info_file, mysql_data_home, "", 4+32); + if (my_stat(fname, &stat_area, MYF(0)) && my_delete(fname, MYF(MY_WME))) { - pthread_mutex_unlock(&LOCK_slave); - stop_slave(0,0); + error=1; + goto err; } - else - pthread_mutex_unlock(&LOCK_slave); - - end_master_info(&glob_mi); - fn_format(fname, master_info_file, mysql_data_home, "", 4+32); + fn_format(fname, relay_log_info_file, mysql_data_home, "", 4+32); if (my_stat(fname, &stat_area, MYF(0)) && my_delete(fname, MYF(MY_WME))) - return; - if (slave_was_running) - start_slave(0,0); + { + error=1; + goto err; + } + if (restart_thread_mask) + error=start_slave_threads(0 /* mutex not needed*/, + 1 /* wait for start*/, + mi,master_info_file,relay_log_info_file, + restart_thread_mask); + // TODO: fix error messages so they get to the client +err: + unlock_slave_threads(mi); + return error; } - void kill_zombie_dump_threads(uint32 slave_server_id) { pthread_mutex_lock(&LOCK_thread_count); @@ -692,119 +675,114 @@ void kill_zombie_dump_threads(uint32 slave_server_id) make safe_mutex complain and abort. We just to do kill the thread ourselves. */ - - thr_alarm_kill(tmp->real_id); - tmp->killed = 1; - tmp->mysys_var->abort = 1; - pthread_mutex_lock(&tmp->mysys_var->mutex); - if (tmp->mysys_var->current_cond) - { - pthread_mutex_lock(tmp->mysys_var->current_mutex); - pthread_cond_broadcast(tmp->mysys_var->current_cond); - pthread_mutex_unlock(tmp->mysys_var->current_mutex); - } - pthread_mutex_unlock(&tmp->mysys_var->mutex); + tmp->awake(1/*prepare to die*/); } } pthread_mutex_unlock(&LOCK_thread_count); } -int change_master(THD* thd) +int change_master(THD* thd, MASTER_INFO* mi) { - bool slave_was_running; + int error=0,restart_thread_mask; + const char* errmsg=0; + // kill slave thread - pthread_mutex_lock(&LOCK_slave); - if ((slave_was_running = slave_running)) + lock_slave_threads(mi); + init_thread_mask(&restart_thread_mask,mi,0 /*not inverse*/); + if (restart_thread_mask && + (error=terminate_slave_threads(mi, + restart_thread_mask, + 1 /*skip lock*/))) { - abort_slave = 1; - KICK_SLAVE; - thd->proc_info = "waiting for slave to die"; - while (slave_running) - pthread_cond_wait(&COND_slave_stopped, &LOCK_slave); // wait until done + send_error(&thd->net,error); + unlock_slave_threads(mi); + return 1; } - pthread_mutex_unlock(&LOCK_slave); thd->proc_info = "changing master"; LEX_MASTER_INFO* lex_mi = &thd->lex.mi; - - if (init_master_info(&glob_mi)) + // TODO: see if needs re-write + if (init_master_info(mi,master_info_file,relay_log_info_file)) { send_error(&thd->net, 0, "Could not initialize master info"); + unlock_slave_threads(mi); return 1; } - pthread_mutex_lock(&glob_mi.lock); + pthread_mutex_lock(&mi->data_lock); if ((lex_mi->host || lex_mi->port) && !lex_mi->log_file_name && !lex_mi->pos) { // if we change host or port, we must reset the postion - glob_mi.log_file_name[0] = 0; - glob_mi.pos = 4; // skip magic number - glob_mi.pending = 0; + mi->master_log_name[0] = 0; + mi->master_log_pos = 4; // skip magic number + mi->rli.pending = 0; } if (lex_mi->log_file_name) - strmake(glob_mi.log_file_name, lex_mi->log_file_name, - sizeof(glob_mi.log_file_name)); + strmake(mi->master_log_name, lex_mi->log_file_name, + sizeof(mi->master_log_name)); if (lex_mi->pos) { - glob_mi.pos = lex_mi->pos; - glob_mi.pending = 0; + mi->master_log_pos = lex_mi->pos; + mi->rli.pending = 0; } if (lex_mi->host) - strmake(glob_mi.host, lex_mi->host, sizeof(glob_mi.host)); + strmake(mi->host, lex_mi->host, sizeof(mi->host)); if (lex_mi->user) - strmake(glob_mi.user, lex_mi->user, sizeof(glob_mi.user)); + strmake(mi->user, lex_mi->user, sizeof(mi->user)); if (lex_mi->password) - strmake(glob_mi.password, lex_mi->password, sizeof(glob_mi.password)); + strmake(mi->password, lex_mi->password, sizeof(mi->password)); if (lex_mi->port) - glob_mi.port = lex_mi->port; + mi->port = lex_mi->port; if (lex_mi->connect_retry) - glob_mi.connect_retry = lex_mi->connect_retry; + mi->connect_retry = lex_mi->connect_retry; + + flush_master_info(mi); + pthread_mutex_unlock(&mi->data_lock); + thd->proc_info="purging old relay logs"; + if (purge_relay_logs(&mi->rli,0 /* not only reset, but also reinit*/, + &errmsg)) + { + send_error(&thd->net, 0, "Failed purging old relay logs"); + unlock_slave_threads(mi); + return 1; + } + pthread_mutex_lock(&mi->rli.data_lock); + mi->rli.master_log_pos = mi->master_log_pos; + strnmov(mi->rli.master_log_name,mi->master_log_name, + sizeof(mi->rli.master_log_name)); + if (!mi->rli.master_log_name[0]) // uninitialized case + mi->rli.master_log_pos=0; + pthread_cond_broadcast(&mi->rli.data_cond); + pthread_mutex_unlock(&mi->rli.data_lock); - flush_master_info(&glob_mi); - pthread_mutex_unlock(&glob_mi.lock); thd->proc_info = "starting slave"; - if (slave_was_running) - start_slave(0,0); + if (restart_thread_mask) + error=start_slave_threads(0 /* mutex not needed*/, + 1 /* wait for start*/, + mi,master_info_file,relay_log_info_file, + restart_thread_mask); +err: + unlock_slave_threads(mi); thd->proc_info = 0; - - send_ok(&thd->net); + if (error) + send_error(&thd->net,error); + else + send_ok(&thd->net); return 0; } - -void reset_master() +int reset_master(THD* thd) { if (!mysql_bin_log.is_open()) { my_error(ER_FLUSH_MASTER_BINLOG_CLOSED, MYF(ME_BELL+ME_WAITTANG)); - return; - } - - LOG_INFO linfo; - pthread_mutex_t* log_lock = mysql_bin_log.get_log_lock(); - pthread_mutex_lock(log_lock); - if (mysql_bin_log.find_first_log(&linfo, "")) - { - pthread_mutex_unlock(log_lock); - return; - } - - for(;;) - { - my_delete(linfo.log_file_name, MYF(MY_WME)); - if (mysql_bin_log.find_next_log(&linfo)) - break; + return 1; } - mysql_bin_log.close(1); // exiting close - my_delete(mysql_bin_log.get_index_fname(), MYF(MY_WME)); - mysql_bin_log.set_need_start_event(); - mysql_bin_log.open(opt_bin_logname,LOG_BIN); - pthread_mutex_unlock(log_lock); + return mysql_bin_log.reset_logs(thd); } - int cmp_master_pos(const char* log_file_name1, ulonglong log_pos1, const char* log_file_name2, ulonglong log_pos2) { @@ -891,6 +869,7 @@ int show_binlog_events(THD* thd) if (event_count < limit_end && log.error) { errmsg = "Wrong offset or I/O error"; + pthread_mutex_unlock(mysql_bin_log.get_log_lock()); goto err; } |