diff options
-rw-r--r-- | client/mysqlbinlog.cc | 81 | ||||
-rw-r--r-- | libmysql/Makefile.shared | 3 | ||||
-rw-r--r-- | mysql-test/mysql-test-run.sh | 7 | ||||
-rw-r--r-- | mysql-test/r/rpl000002.result | 2 | ||||
-rw-r--r-- | mysql-test/r/rpl000016.result | 8 | ||||
-rw-r--r-- | mysql-test/r/rpl_log.result | 28 | ||||
-rw-r--r-- | mysql-test/t/rpl000016.test | 3 | ||||
-rw-r--r-- | mysys/mf_iocache.c | 8 | ||||
-rw-r--r-- | sql/log.cc | 24 | ||||
-rw-r--r-- | sql/log_event.cc | 97 | ||||
-rw-r--r-- | sql/log_event.h | 24 | ||||
-rw-r--r-- | sql/repl_failsafe.cc | 611 | ||||
-rw-r--r-- | sql/repl_failsafe.h | 16 | ||||
-rw-r--r-- | sql/slave.cc | 72 | ||||
-rw-r--r-- | sql/slave.h | 4 | ||||
-rw-r--r-- | sql/sql_class.h | 7 | ||||
-rw-r--r-- | sql/sql_parse.cc | 1 | ||||
-rw-r--r-- | sql/sql_repl.cc | 628 | ||||
-rw-r--r-- | sql/sql_repl.h | 17 |
19 files changed, 919 insertions, 722 deletions
diff --git a/client/mysqlbinlog.cc b/client/mysqlbinlog.cc index 8cb1b82753e..c0e87b95a92 100644 --- a/client/mysqlbinlog.cc +++ b/client/mysqlbinlog.cc @@ -21,6 +21,8 @@ #include <time.h> #include "log_event.h" +#define PROBE_HEADER_LEN (4+EVENT_LEN_OFFSET) + #define CLIENT_CAPABILITIES (CLIENT_LONG_PASSWORD | CLIENT_LONG_FLAG | CLIENT_LOCAL_FILES) char server_version[SERVER_VERSION_LENGTH]; @@ -288,6 +290,52 @@ static void dump_remote_table(NET* net, const char* db, const char* table) } } +static int check_master_version(MYSQL* mysql) +{ + MYSQL_RES* res = 0; + MYSQL_ROW row; + const char* version; + int old_format = 0; + + if (mysql_query(mysql, "SELECT VERSION()") + || !(res = mysql_store_result(mysql))) + { + mysql_close(mysql); + die("Error checking master version: %s", + mysql_error(mysql)); + } + if (!(row = mysql_fetch_row(res))) + { + mysql_free_result(res); + mysql_close(mysql); + die("Master returned no rows for SELECT VERSION()"); + return 1; + } + if (!(version = row[0])) + { + mysql_free_result(res); + mysql_close(mysql); + die("Master reported NULL for the version"); + } + + switch (*version) + { + case '3': + old_format = 1; + break; + case '4': + old_format = 0; + break; + default: + sql_print_error("Master reported unrecognized MySQL version '%s'", + version); + mysql_free_result(res); + mysql_close(mysql); + return 1; + } + mysql_free_result(res); + return old_format; +} static void dump_remote_log_entries(const char* logname) { @@ -295,6 +343,9 @@ static void dump_remote_log_entries(const char* logname) char last_db[FN_REFLEN+1] = ""; uint len; NET* net = &mysql->net; + int old_format; + old_format = check_master_version(mysql); + if(!position) position = 4; // protect the innocent from spam if (position < 4) { @@ -307,7 +358,7 @@ static void dump_remote_log_entries(const char* logname) len = (uint) strlen(logname); int4store(buf + 6, 0); memcpy(buf + 10, logname,len); - if(simple_command(mysql, COM_BINLOG_DUMP, buf, len + 10, 1)) + if (simple_command(mysql, COM_BINLOG_DUMP, buf, len + 10, 1)) die("Error sending the log dump command"); for(;;) @@ -322,7 +373,7 @@ static void dump_remote_log_entries(const char* logname) len, net->read_pos[5])); Log_event * ev = Log_event::read_log_event( (const char*) net->read_pos + 1 , - len - 1, &error); + len - 1, &error, old_format); if (ev) { ev->print(result_file, short_form, last_db); @@ -335,12 +386,34 @@ static void dump_remote_log_entries(const char* logname) } } +static int check_header (IO_CACHE* file) +{ + char buf[PROBE_HEADER_LEN]; + int old_format; + + my_off_t pos = my_b_tell(file); + my_b_seek(file, (my_off_t)0); + if (my_b_read(file, buf, sizeof(buf))) + die("Failed reading header"); + if (buf[EVENT_TYPE_OFFSET+4] == START_EVENT) + { + uint event_len; + event_len = uint4korr(buf + EVENT_LEN_OFFSET + 4); + old_format = (event_len < LOG_EVENT_HEADER_LEN + START_HEADER_LEN); + } + else + old_format = 0; + my_b_seek(file, pos); + return old_format; +} + static void dump_local_log_entries(const char* logname) { File fd = -1; IO_CACHE cache,*file= &cache; ulonglong rec_count = 0; char last_db[FN_REFLEN+1] = ""; + bool old_format = 0; if (logname && logname[0] != '-') { @@ -349,12 +422,14 @@ static void dump_local_log_entries(const char* logname) if (init_io_cache(file, fd, 0, READ_CACHE, (my_off_t) position, 0, MYF(MY_WME | MY_NABP))) exit(1); + old_format = check_header(file); } else { if (init_io_cache(file, fileno(result_file), 0, READ_CACHE, (my_off_t) 0, 0, MYF(MY_WME | MY_NABP | MY_DONT_CHECK_FILESIZE))) exit(1); + old_format = check_header(file); if (position) { /* skip 'position' characters from stdout */ @@ -385,7 +460,7 @@ static void dump_local_log_entries(const char* logname) char llbuff[21]; my_off_t old_off = my_b_tell(file); - Log_event* ev = Log_event::read_log_event(file); + Log_event* ev = Log_event::read_log_event(file, old_format); if (!ev) { if (file->error) diff --git a/libmysql/Makefile.shared b/libmysql/Makefile.shared index e00a961cabe..d661d69a339 100644 --- a/libmysql/Makefile.shared +++ b/libmysql/Makefile.shared @@ -55,7 +55,8 @@ mysysobjects1 = my_init.lo my_static.lo my_malloc.lo my_realloc.lo \ mf_loadpath.lo my_pthread.lo my_thr_init.lo \ thr_mutex.lo mulalloc.lo string.lo default.lo \ my_compress.lo array.lo my_once.lo list.lo my_net.lo \ - charset.lo hash.lo mf_iocache.lo my_seek.lo \ + charset.lo hash.lo mf_iocache.lo \ + mf_iocache2.lo my_seek.lo \ my_pread.lo mf_cache.lo my_vsnprintf.lo md5.lo # Not needed in the minimum library diff --git a/mysql-test/mysql-test-run.sh b/mysql-test/mysql-test-run.sh index af595e80244..ed7414effd7 100644 --- a/mysql-test/mysql-test-run.sh +++ b/mysql-test/mysql-test-run.sh @@ -168,6 +168,9 @@ while test $# -gt 0; do USE_MANAGER=1 USE_RUNNING_SERVER= ;; + --start-and-exit) + START_AND_EXIT=1 + ;; --skip-innobase) EXTRA_MASTER_MYSQLD_OPT="$EXTRA_MASTER_MYSQLD_OPT --skip-innobase" EXTRA_SLAVE_MYSQLD_OPT="$EXTRA_SLAVE_MYSQLD_OPT --skip-innobase" ;; @@ -1091,6 +1094,10 @@ then mysql_loadstd fi +if [ "x$START_AND_EXIT" = "x1" ] ; then + echo "Servers started, exiting" + exit +fi $ECHO "Starting Tests" diff --git a/mysql-test/r/rpl000002.result b/mysql-test/r/rpl000002.result index e321d82750c..88228321897 100644 --- a/mysql-test/r/rpl000002.result +++ b/mysql-test/r/rpl000002.result @@ -16,7 +16,7 @@ n 2002 show slave hosts; Server_id Host Port Rpl_recovery_rank Master_id -2 127.0.0.1 9307 2 1 +2 127.0.0.1 $SLAVE_MYPORT 2 1 drop table t1; slave stop; drop table if exists t2; diff --git a/mysql-test/r/rpl000016.result b/mysql-test/r/rpl000016.result index 3fb94c5b189..e15d22ea3ee 100644 --- a/mysql-test/r/rpl000016.result +++ b/mysql-test/r/rpl000016.result @@ -15,7 +15,7 @@ create table t1 (s text); insert into t1 values('Could not break slave'),('Tried hard'); show slave status; Master_Host Master_User Master_Port Connect_retry Log_File Pos Slave_Running Replicate_do_db Replicate_ignore_db Last_errno Last_error Skip_counter Last_log_seq -127.0.0.1 root 9999 60 master-bin.001 234 Yes 0 0 3 +127.0.0.1 root $MASTER_MYPORT 60 master-bin.001 234 Yes 0 0 3 select * from t1; s Could not break slave @@ -42,7 +42,7 @@ master-bin.003 insert into t2 values (65); show slave status; Master_Host Master_User Master_Port Connect_retry Log_File Pos Slave_Running Replicate_do_db Replicate_ignore_db Last_errno Last_error Skip_counter Last_log_seq -127.0.0.1 root 9999 60 master-bin.003 202 Yes 0 0 3 +127.0.0.1 root $MASTER_MYPORT 60 master-bin.003 127 Yes 0 0 2 select * from t2; m 34 @@ -60,12 +60,12 @@ master-bin.005 master-bin.006 show master status; File Position Binlog_do_db Binlog_ignore_db -master-bin.006 710 +master-bin.006 382 slave stop; slave start; show slave status; Master_Host Master_User Master_Port Connect_retry Log_File Pos Slave_Running Replicate_do_db Replicate_ignore_db Last_errno Last_error Skip_counter Last_log_seq -127.0.0.1 root 9999 60 master-bin.006 710 Yes 0 0 11 +127.0.0.1 root $MASTER_MYPORT 60 master-bin.006 382 Yes 0 0 6 lock tables t3 read; select count(*) from t3 where n >= 4; count(*) diff --git a/mysql-test/r/rpl_log.result b/mysql-test/r/rpl_log.result index 37b2474f9e6..43108b880b4 100644 --- a/mysql-test/r/rpl_log.result +++ b/mysql-test/r/rpl_log.result @@ -16,7 +16,7 @@ load data infile '../../std_data/words.dat' into table t1; drop table t1; show binlog events; Log_name Pos Event_type Server_id Log_seq Info -master-bin.001 4 Start 1 1 Server ver: $VERSION, Binlog ver: 2 +master-bin.001 4 Start 1 1 Server ver: 4.0.1-alpha-debug-log, Binlog ver: 2 master-bin.001 79 Query 1 2 use test; create table t1(n int not null auto_increment primary key) master-bin.001 172 Intvar 1 3 INSERT_ID=1 master-bin.001 200 Query 1 4 use test; insert into t1 values (NULL) @@ -41,7 +41,7 @@ insert into t1 values (1); drop table t1; show binlog events; Log_name Pos Event_type Server_id Log_seq Info -master-bin.001 4 Start 1 1 Server ver: $VERSION, Binlog ver: 2 +master-bin.001 4 Start 1 1 Server ver: 4.0.1-alpha-debug-log, Binlog ver: 2 master-bin.001 79 Query 1 2 use test; create table t1(n int not null auto_increment primary key) master-bin.001 172 Intvar 1 3 INSERT_ID=1 master-bin.001 200 Query 1 4 use test; insert into t1 values (NULL) @@ -54,10 +54,9 @@ master-bin.001 627 Rotate 1 10 master-bin.002;pos=4 master-bin.001 668 Stop 1 11 show binlog events in 'master-bin.002'; Log_name Pos Event_type Server_id Log_seq Info -master-bin.002 4 Start 1 1 Server ver: $VERSION, Binlog ver: 2 -master-bin.002 79 Query 1 2 use test; create table t1 (n int) -master-bin.002 137 Query 1 3 use test; insert into t1 values (1) -master-bin.002 197 Query 1 4 use test; drop table t1 +master-bin.002 4 Query 1 1 use test; create table t1 (n int) +master-bin.002 62 Query 1 2 use test; insert into t1 values (1) +master-bin.002 122 Query 1 3 use test; drop table t1 show master logs; Log_name master-bin.001 @@ -69,7 +68,7 @@ slave-bin.001 slave-bin.002 show binlog events in 'slave-bin.001' from 4; Log_name Pos Event_type Server_id Log_seq Info -slave-bin.001 4 Start 2 1 Server ver: $VERSION, Binlog ver: 2 +slave-bin.001 4 Start 2 1 Server ver: 4.0.1-alpha-debug-log, Binlog ver: 2 slave-bin.001 79 Slave 2 3 host=127.0.0.1,port=$MASTER_MYPORT,log=master-bin.001,pos=4 slave-bin.001 132 Query 1 2 use test; create table t1(n int not null auto_increment primary key) slave-bin.001 225 Intvar 1 3 INSERT_ID=1 @@ -83,14 +82,13 @@ slave-bin.001 689 Rotate 1 4 slave-bin.002;pos=4; forced by master slave-bin.001 729 Stop 2 5 show binlog events in 'slave-bin.002' from 4; Log_name Pos Event_type Server_id Log_seq Info -slave-bin.002 4 Start 2 1 Server ver: $VERSION, Binlog ver: 2 -slave-bin.002 79 Slave 2 10 host=127.0.0.1,port=$MASTER_MYPORT,log=master-bin.002,pos=4 -slave-bin.002 132 Query 1 2 use test; create table t1 (n int) -slave-bin.002 190 Query 1 3 use test; insert into t1 values (1) -slave-bin.002 250 Query 1 4 use test; drop table t1 +slave-bin.002 4 Slave 2 10 host=127.0.0.1,port=$MASTER_MYPORT,log=master-bin.002,pos=4 +slave-bin.002 57 Query 1 1 use test; create table t1 (n int) +slave-bin.002 115 Query 1 2 use test; insert into t1 values (1) +slave-bin.002 175 Query 1 3 use test; drop table t1 show slave status; Master_Host Master_User Master_Port Connect_retry Log_File Pos Slave_Running Replicate_do_db Replicate_ignore_db Last_errno Last_error Skip_counter Last_log_seq -127.0.0.1 root $MASTER_MYPORT 1 master-bin.002 245 Yes 0 0 4 +127.0.0.1 root $MASTER_MYPORT 1 master-bin.002 170 Yes 0 0 3 show new master for slave with master_log_file='master-bin.001' and master_log_pos=4 and master_log_seq=1 and master_server_id=1; Log_name Log_pos @@ -106,8 +104,8 @@ slave-bin.001 439 show new master for slave with master_log_file='master-bin.002' and master_log_pos=4 and master_log_seq=1 and master_server_id=1; Log_name Log_pos -slave-bin.002 132 +slave-bin.002 57 show new master for slave with master_log_file='master-bin.002' and master_log_pos=137 and master_log_seq=3 and master_server_id=1; Log_name Log_pos -slave-bin.002 250 +slave-bin.002 223 diff --git a/mysql-test/t/rpl000016.test b/mysql-test/t/rpl000016.test index 714c51f99a6..76ce580da1f 100644 --- a/mysql-test/t/rpl000016.test +++ b/mysql-test/t/rpl000016.test @@ -23,7 +23,6 @@ insert into t1 values('Could not break slave'),('Tried hard'); save_master_pos; connection slave; sync_with_master; ---replace_result 9306 9999 3334 9999 3335 9999 show slave status; select * from t1; connection master; @@ -70,7 +69,6 @@ insert into t2 values (65); save_master_pos; connection slave; sync_with_master; ---replace_result 9306 9999 3334 9999 3335 9999 show slave status; select * from t2; connection master; @@ -92,7 +90,6 @@ connection slave; slave stop; slave start; sync_with_master; ---replace_result 9306 9999 3334 9999 3335 9999 show slave status; # because of concurrent insert, the table may not be up to date # if we do not lock diff --git a/mysys/mf_iocache.c b/mysys/mf_iocache.c index f9b668ce09f..f29df74b651 100644 --- a/mysys/mf_iocache.c +++ b/mysys/mf_iocache.c @@ -166,7 +166,8 @@ int init_io_cache(IO_CACHE *info, File file, uint cachesize, info->seek_not_done= test(file >= 0 && type != READ_FIFO && type != READ_NET); info->myflags=cache_myflags & ~(MY_NABP | MY_FNABP); - info->rc_request_pos=info->rc_pos=info->buffer; + info->rc_request_pos=info->rc_pos= info->write_pos = info->buffer; + info->write_pos = info->write_end = 0; if (type == SEQ_READ_APPEND) { info->append_read_pos = info->write_pos = info->append_buffer; @@ -308,6 +309,11 @@ my_bool reinit_io_cache(IO_CACHE *info, enum cache_type type, { info->append_read_pos = info->write_pos = info->append_buffer; } + if (!info->write_pos) + info->write_pos = info->buffer; + if (!info->write_end) + info->write_end = info->buffer+info->buffer_length- + (seek_offset & (IO_SIZE-1)); info->type=type; info->error=0; init_read_function(info,type); diff --git a/sql/log.cc b/sql/log.cc index b55d514058f..916e7d20d32 100644 --- a/sql/log.cc +++ b/sql/log.cc @@ -81,7 +81,8 @@ static int find_uniq_filename(char *name) MYSQL_LOG::MYSQL_LOG(): last_time(0), query_start(0),index_file(-1), name(0), log_type(LOG_CLOSED),write_error(0), - inited(0), log_seq(1), file_id(1),no_rotate(0) + inited(0), log_seq(1), file_id(1),no_rotate(0), + need_start_event(1) { /* We don't want to intialize LOCK_Log here as the thread system may @@ -136,9 +137,11 @@ bool MYSQL_LOG::open_index( int options) MYF(MY_WME))) < 0); } -void MYSQL_LOG::init(enum_log_type log_type_arg) +void MYSQL_LOG::init(enum_log_type log_type_arg, + enum cache_type io_cache_type_arg) { log_type = log_type_arg; + io_cache_type = io_cache_type_arg; if (!inited) { inited=1; @@ -184,7 +187,7 @@ void MYSQL_LOG::open(const char *log_name, enum_log_type log_type_arg, if ((file=my_open(log_file_name,O_CREAT | O_APPEND | O_WRONLY | O_BINARY, MYF(MY_WME | ME_WAITTANG))) < 0 || - init_io_cache(&log_file, file, IO_SIZE, WRITE_CACHE, + init_io_cache(&log_file, file, IO_SIZE, io_cache_type, my_tell(file,MYF(MY_WME)), 0, MYF(MY_WME | MY_NABP))) goto err; @@ -220,6 +223,7 @@ void MYSQL_LOG::open(const char *log_name, enum_log_type log_type_arg, } else if (log_type == LOG_BIN) { + bool error; /* Explanation of the boolean black magic: if we are supposed to write magic number try write @@ -232,10 +236,13 @@ void MYSQL_LOG::open(const char *log_name, enum_log_type log_type_arg, goto err; log_seq = 1; - Start_log_event s; - bool error; - s.set_log_seq(0, this); - s.write(&log_file); + if (need_start_event) + { + Start_log_event s; + s.set_log_seq(0, this); + s.write(&log_file); + need_start_event=0; + } flush_io_cache(&log_file); pthread_mutex_lock(&LOCK_index); error=(my_write(index_file, (byte*) log_file_name, strlen(log_file_name), @@ -715,7 +722,8 @@ bool MYSQL_LOG::write(Log_event* event_info) file == &log_file && flush_io_cache(file)) goto err; error=0; - should_rotate = (file == &log_file && my_b_tell(file) >= max_binlog_size); + should_rotate = (file == &log_file && + (uint)my_b_tell(file) >= max_binlog_size); err: if (error) { diff --git a/sql/log_event.cc b/sql/log_event.cc index 6db0c3ef9f7..d954d69ea71 100644 --- a/sql/log_event.cc +++ b/sql/log_event.cc @@ -149,12 +149,21 @@ static void cleanup_load_tmpdir() #endif -Log_event::Log_event(const char* buf):cached_event_len(0),temp_buf(0) +Log_event::Log_event(const char* buf, bool old_format): + cached_event_len(0),temp_buf(0) { when = uint4korr(buf); server_id = uint4korr(buf + SERVER_ID_OFFSET); - log_seq = uint4korr(buf + LOG_SEQ_OFFSET); - flags = uint2korr(buf + FLAGS_OFFSET); + if (old_format) + { + log_seq=0; + flags=0; + } + else + { + log_seq = uint4korr(buf + LOG_SEQ_OFFSET); + flags = uint2korr(buf + FLAGS_OFFSET); + } #ifndef MYSQL_CLIENT thd = 0; #endif @@ -441,17 +450,24 @@ int Log_event::read_log_event(IO_CACHE* file, String* packet, #define UNLOCK_MUTEX #endif +#ifndef MYSQL_CLIENT +#define LOCK_MUTEX if(log_lock) pthread_mutex_lock(log_lock); +#else +#define LOCK_MUTEX +#endif + + // allocates memory - the caller is responsible for clean-up #ifndef MYSQL_CLIENT -Log_event* Log_event::read_log_event(IO_CACHE* file, pthread_mutex_t* log_lock) +Log_event* Log_event::read_log_event(IO_CACHE* file, + pthread_mutex_t* log_lock, + bool old_format) #else -Log_event* Log_event::read_log_event(IO_CACHE* file) +Log_event* Log_event::read_log_event(IO_CACHE* file, bool old_format) #endif { char head[LOG_EVENT_HEADER_LEN]; -#ifndef MYSQL_CLIENT - if(log_lock) pthread_mutex_lock(log_lock); -#endif + LOCK_MUTEX; if (my_b_read(file, (byte *) head, sizeof(head))) { UNLOCK_MUTEX; @@ -489,7 +505,7 @@ Log_event* Log_event::read_log_event(IO_CACHE* file) error = "read error"; goto err; } - if ((res = read_log_event(buf, data_len, &error))) + if ((res = read_log_event(buf, data_len, &error, old_format))) res->register_temp_buf(buf); err: UNLOCK_MUTEX; @@ -502,7 +518,7 @@ err: } Log_event* Log_event::read_log_event(const char* buf, int event_len, - const char **error) + const char **error, bool old_format) { if (event_len < EVENT_LEN_OFFSET || (uint)event_len != uint4korr(buf+EVENT_LEN_OFFSET)) @@ -513,14 +529,14 @@ Log_event* Log_event::read_log_event(const char* buf, int event_len, switch(buf[EVENT_TYPE_OFFSET]) { case QUERY_EVENT: - ev = new Query_log_event(buf, event_len); + ev = new Query_log_event(buf, event_len, old_format); break; case LOAD_EVENT: case NEW_LOAD_EVENT: - ev = new Load_log_event(buf, event_len); + ev = new Load_log_event(buf, event_len, old_format); break; case ROTATE_EVENT: - ev = new Rotate_log_event(buf, event_len); + ev = new Rotate_log_event(buf, event_len, old_format); break; case SLAVE_EVENT: ev = new Slave_log_event(buf, event_len); @@ -538,13 +554,13 @@ Log_event* Log_event::read_log_event(const char* buf, int event_len, ev = new Execute_load_log_event(buf, event_len); break; case START_EVENT: - ev = new Start_log_event(buf); + ev = new Start_log_event(buf, old_format); break; case STOP_EVENT: - ev = new Stop_log_event(buf); + ev = new Stop_log_event(buf, old_format); break; case INTVAR_EVENT: - ev = new Intvar_log_event(buf); + ev = new Intvar_log_event(buf, old_format); break; default: break; @@ -634,7 +650,8 @@ void Rotate_log_event::print(FILE* file, bool short_form, char* last_db) #endif /* #ifdef MYSQL_CLIENT */ -Start_log_event::Start_log_event(const char* buf) :Log_event(buf) +Start_log_event::Start_log_event(const char* buf, + bool old_format) :Log_event(buf, old_format) { binlog_version = uint2korr(buf + LOG_EVENT_HEADER_LEN + ST_BINLOG_VER_OFFSET); @@ -652,8 +669,9 @@ int Start_log_event::write_data(IO_CACHE* file) return (my_b_write(file, (byte*) buff, sizeof(buff)) ? -1 : 0); } -Rotate_log_event::Rotate_log_event(const char* buf, int event_len): - Log_event(buf),new_log_ident(NULL),alloced(0) +Rotate_log_event::Rotate_log_event(const char* buf, int event_len, + bool old_format): + Log_event(buf, old_format),new_log_ident(NULL),alloced(0) { // the caller will ensure that event_len is what we have at // EVENT_LEN_OFFSET @@ -695,8 +713,9 @@ Query_log_event::Query_log_event(THD* thd_arg, const char* query_arg, } #endif -Query_log_event::Query_log_event(const char* buf, int event_len): - Log_event(buf),data_buf(0), query(NULL), db(NULL) +Query_log_event::Query_log_event(const char* buf, int event_len, + bool old_format): + Log_event(buf, old_format),data_buf(0), query(NULL), db(NULL) { if ((uint)event_len < QUERY_EVENT_OVERHEAD) return; @@ -766,7 +785,8 @@ int Query_log_event::write_data(IO_CACHE* file) my_b_write(file, (byte*) query, q_len)) ? -1 : 0; } -Intvar_log_event::Intvar_log_event(const char* buf):Log_event(buf) +Intvar_log_event::Intvar_log_event(const char* buf, bool old_format): + Log_event(buf, old_format) { buf += LOG_EVENT_HEADER_LEN; type = buf[I_TYPE_OFFSET]; @@ -1003,8 +1023,9 @@ Load_log_event::Load_log_event(THD* thd, sql_exchange* ex, // the caller must do buf[event_len] = 0 before he starts using the // constructed event -Load_log_event::Load_log_event(const char* buf, int event_len): - Log_event(buf),num_fields(0),fields(0), +Load_log_event::Load_log_event(const char* buf, int event_len, + bool old_format): + Log_event(buf, old_format),num_fields(0),fields(0), field_lens(0),field_block_len(0), table_name(0),db(0),fname(0) { @@ -1237,7 +1258,7 @@ void Slave_log_event::init_from_mem_pool(int data_size) } Slave_log_event::Slave_log_event(const char* buf, int event_len): - Log_event(buf),mem_pool(0),master_host(0) + Log_event(buf,0),mem_pool(0),master_host(0) { event_len -= LOG_EVENT_HEADER_LEN; if(event_len < 0) @@ -1291,7 +1312,7 @@ int Create_file_log_event::write_base(IO_CACHE* file) } Create_file_log_event::Create_file_log_event(const char* buf, int len): - Load_log_event(buf,0),fake_base(0),block(0) + Load_log_event(buf,0,0),fake_base(0),block(0) { int block_offset; if (copy_log_event(buf,len)) @@ -1347,7 +1368,7 @@ Append_block_log_event::Append_block_log_event(THD* thd_arg, char* block_arg, #endif Append_block_log_event::Append_block_log_event(const char* buf, int len): - Log_event(buf),block(0) + Log_event(buf, 0),block(0) { if((uint)len < APPEND_BLOCK_EVENT_OVERHEAD) return; @@ -1399,7 +1420,7 @@ Delete_file_log_event::Delete_file_log_event(THD* thd_arg): #endif Delete_file_log_event::Delete_file_log_event(const char* buf, int len): - Log_event(buf),file_id(0) + Log_event(buf, 0),file_id(0) { if((uint)len < DELETE_FILE_EVENT_OVERHEAD) return; @@ -1446,7 +1467,7 @@ Execute_load_log_event::Execute_load_log_event(THD* thd_arg): #endif Execute_load_log_event::Execute_load_log_event(const char* buf,int len): - Log_event(buf),file_id(0) + Log_event(buf, 0),file_id(0) { if((uint)len < EXEC_LOAD_EVENT_OVERHEAD) return; @@ -1657,15 +1678,11 @@ int Load_log_event::exec_event(NET* net, struct st_master_info* mi) int Start_log_event::exec_event(struct st_master_info* mi) { -#ifdef TO_BE_DELETED - /* - We can't close temporary files or cleanup the tmpdir here, becasue - someone may have just rotated the logs on the master. - We should only do this cleanup when we know the master restarted. - */ - close_temporary_tables(thd); - cleanup_load_tmpdir(); -#endif + if (!mi->old_format) + { + close_temporary_tables(thd); + cleanup_load_tmpdir(); + } return Log_event::exec_event(mi); } @@ -1866,7 +1883,9 @@ int Execute_load_log_event::exec_event(struct st_master_info* mi) slave_print_error(my_errno, "Could not open file '%s'", fname); goto err; } - if (!(lev = (Load_log_event*)Log_event::read_log_event(&file,0)) + if (!(lev = (Load_log_event*)Log_event::read_log_event(&file, + (pthread_mutex_t*)0, + (bool)0)) || lev->get_type_code() != NEW_LOAD_EVENT) { slave_print_error(0, "File '%s' appears corrupted", fname); diff --git a/sql/log_event.h b/sql/log_event.h index 71f0f2a8575..9f9bb46d221 100644 --- a/sql/log_event.h +++ b/sql/log_event.h @@ -242,7 +242,7 @@ public: virtual Log_event_type get_type_code() = 0; virtual bool is_valid() = 0; virtual bool get_cache_stmt() { return 0; } - Log_event(const char* buf); + Log_event(const char* buf, bool old_format); #ifndef MYSQL_CLIENT Log_event(THD* thd_arg, uint16 flags_arg = 0); #endif @@ -268,12 +268,14 @@ public: #ifndef MYSQL_CLIENT // if mutex is 0, the read will proceed without mutex - static Log_event* read_log_event(IO_CACHE* file, pthread_mutex_t* log_lock); + static Log_event* read_log_event(IO_CACHE* file, + pthread_mutex_t* log_lock, + bool old_format); #else // avoid having to link mysqlbinlog against libpthread - static Log_event* read_log_event(IO_CACHE* file); + static Log_event* read_log_event(IO_CACHE* file, bool old_format); #endif static Log_event* read_log_event(const char* buf, int event_len, - const char **error); + const char **error, bool old_format); const char* get_type_str(); #ifndef MYSQL_CLIENT @@ -317,7 +319,7 @@ public: bool get_cache_stmt() { return cache_stmt; } #endif - Query_log_event(const char* buf, int event_len); + Query_log_event(const char* buf, int event_len, bool old_format); ~Query_log_event() { if (data_buf) @@ -411,7 +413,7 @@ public: int exec_event(NET* net, struct st_master_info* mi); #endif - Load_log_event(const char* buf, int event_len); + Load_log_event(const char* buf, int event_len, bool old_format); ~Load_log_event() { } @@ -451,7 +453,7 @@ public: memcpy(server_version, ::server_version, ST_SERVER_VER_LEN); } #endif - Start_log_event(const char* buf); + Start_log_event(const char* buf, bool old_format); ~Start_log_event() {} Log_event_type get_type_code() { return START_EVENT;} int write_data(IO_CACHE* file); @@ -479,7 +481,7 @@ public: :Log_event(thd_arg),val(val_arg),type(type_arg) {} #endif - Intvar_log_event(const char* buf); + Intvar_log_event(const char* buf, bool old_format); ~Intvar_log_event() {} Log_event_type get_type_code() { return INTVAR_EVENT;} const char* get_var_type_name(); @@ -503,7 +505,8 @@ public: Stop_log_event() :Log_event((THD*)0) {} #endif - Stop_log_event(const char* buf):Log_event(buf) + Stop_log_event(const char* buf, bool old_format):Log_event(buf, + old_format) { } ~Stop_log_event() {} @@ -534,7 +537,7 @@ public: alloced(0) {} #endif - Rotate_log_event(const char* buf, int event_len); + Rotate_log_event(const char* buf, int event_len, bool old_format); ~Rotate_log_event() { if (alloced) @@ -686,7 +689,6 @@ public: #endif }; - #endif diff --git a/sql/repl_failsafe.cc b/sql/repl_failsafe.cc index c836e6803ee..d846662947d 100644 --- a/sql/repl_failsafe.cc +++ b/sql/repl_failsafe.cc @@ -20,12 +20,21 @@ #include "repl_failsafe.h" #include "sql_repl.h" #include "slave.h" +#include "sql_acl.h" #include "mini_client.h" +#include "log_event.h" #include <mysql.h> +#include <thr_alarm.h> + +#define SLAVE_LIST_CHUNK 128 +#define SLAVE_ERRMSG_SIZE (FN_REFLEN+64) + RPL_STATUS rpl_status=RPL_NULL; pthread_mutex_t LOCK_rpl_status; pthread_cond_t COND_rpl_status; +HASH slave_list; +extern const char* any_db; const char *rpl_role_type[] = {"MASTER","SLAVE",NullS}; TYPELIB rpl_role_typelib = {array_elements(rpl_role_type)-1,"", @@ -37,6 +46,10 @@ const char* rpl_status_type[] = {"AUTH_MASTER","ACTIVE_SLAVE","IDLE_SLAVE", TYPELIB rpl_status_typelib= {array_elements(rpl_status_type)-1,"", rpl_status_type}; +static Slave_log_event* find_slave_event(IO_CACHE* log, + const char* log_file_name, + char* errmsg); + static int init_failsafe_rpl_thread(THD* thd) { DBUG_ENTER("init_failsafe_rpl_thread"); @@ -89,6 +102,333 @@ void change_rpl_status(RPL_STATUS from_status, RPL_STATUS to_status) pthread_mutex_unlock(&LOCK_rpl_status); } +#define get_object(p, obj) \ +{\ + uint len = (uint)*p++; \ + if (p + len > p_end || len >= sizeof(obj)) \ + goto err; \ + strmake(obj,(char*) p,len); \ + p+= len; \ +}\ + +static inline int cmp_master_pos(Slave_log_event* sev, LEX_MASTER_INFO* mi) +{ + return cmp_master_pos(sev->master_log, sev->master_pos, mi->log_file_name, + mi->pos); +} + +void unregister_slave(THD* thd, bool only_mine, bool need_mutex) +{ + if (need_mutex) + pthread_mutex_lock(&LOCK_slave_list); + if (thd->server_id) + { + SLAVE_INFO* old_si; + if ((old_si = (SLAVE_INFO*)hash_search(&slave_list, + (byte*)&thd->server_id, 4)) && + (!only_mine || old_si->thd == thd)) + hash_delete(&slave_list, (byte*)old_si); + } + if (need_mutex) + pthread_mutex_unlock(&LOCK_slave_list); +} + +int register_slave(THD* thd, uchar* packet, uint packet_length) +{ + SLAVE_INFO *si; + int res = 1; + uchar* p = packet, *p_end = packet + packet_length; + + if (check_access(thd, FILE_ACL, any_db)) + return 1; + + if (!(si = (SLAVE_INFO*)my_malloc(sizeof(SLAVE_INFO), MYF(MY_WME)))) + goto err; + + thd->server_id = si->server_id = uint4korr(p); + p += 4; + get_object(p,si->host); + get_object(p,si->user); + get_object(p,si->password); + si->port = uint2korr(p); + p += 2; + si->rpl_recovery_rank = uint4korr(p); + p += 4; + if (!(si->master_id = uint4korr(p))) + si->master_id = server_id; + si->thd = thd; + pthread_mutex_lock(&LOCK_slave_list); + + unregister_slave(thd,0,0); + res = hash_insert(&slave_list, (byte*) si); + pthread_mutex_unlock(&LOCK_slave_list); + return res; + +err: + if (si) + my_free((gptr) si, MYF(MY_WME)); + return res; +} + +static uint32* slave_list_key(SLAVE_INFO* si, uint* len, + my_bool not_used __attribute__((unused))) +{ + *len = 4; + return &si->server_id; +} + +static void slave_info_free(void *s) +{ + my_free((gptr) s, MYF(MY_WME)); +} + +void init_slave_list() +{ + hash_init(&slave_list, SLAVE_LIST_CHUNK, 0, 0, + (hash_get_key) slave_list_key, slave_info_free, 0); + pthread_mutex_init(&LOCK_slave_list, MY_MUTEX_INIT_FAST); +} + +void end_slave_list() +{ + pthread_mutex_lock(&LOCK_slave_list); + hash_free(&slave_list); + pthread_mutex_unlock(&LOCK_slave_list); + pthread_mutex_destroy(&LOCK_slave_list); +} + +static int find_target_pos(LEX_MASTER_INFO* mi, IO_CACHE* log, char* errmsg) +{ + uint32 log_seq = mi->last_log_seq; + uint32 target_server_id = mi->server_id; + + for (;;) + { + Log_event* ev; + if (!(ev = Log_event::read_log_event(log, (pthread_mutex_t*)0, + 0))) + { + if (log->error > 0) + strmov(errmsg, "Binary log truncated in the middle of event"); + else if (log->error < 0) + strmov(errmsg, "I/O error reading binary log"); + else + strmov(errmsg, "Could not find target event in the binary log"); + return 1; + } + + if (ev->log_seq == log_seq && ev->server_id == target_server_id) + { + delete ev; + mi->pos = my_b_tell(log); + return 0; + } + + delete ev; + } +} + + +int translate_master(THD* thd, LEX_MASTER_INFO* mi, char* errmsg) +{ + LOG_INFO linfo; + char search_file_name[FN_REFLEN],last_log_name[FN_REFLEN]; + IO_CACHE log; + File file = -1, last_file = -1; + pthread_mutex_t *log_lock; + const char* errmsg_p; + Slave_log_event* sev = 0; + my_off_t last_pos = 0; + int error = 1; + int cmp_res; + LINT_INIT(cmp_res); + + if (!mysql_bin_log.is_open()) + { + strmov(errmsg,"Binary log is not open"); + return 1; + } + + if (!server_id_supplied) + { + strmov(errmsg, "Misconfigured master - server id was not set"); + return 1; + } + + linfo.index_file_offset = 0; + + + search_file_name[0] = 0; + + if (mysql_bin_log.find_first_log(&linfo, search_file_name)) + { + strmov(errmsg,"Could not find first log"); + return 1; + } + thd->current_linfo = &linfo; + + bzero((char*) &log,sizeof(log)); + log_lock = mysql_bin_log.get_log_lock(); + pthread_mutex_lock(log_lock); + + for (;;) + { + if ((file=open_binlog(&log, linfo.log_file_name, &errmsg_p)) < 0) + { + strmov(errmsg, errmsg_p); + goto err; + } + + if (!(sev = find_slave_event(&log, linfo.log_file_name, errmsg))) + goto err; + + cmp_res = cmp_master_pos(sev, mi); + delete sev; + + if (!cmp_res) + { + /* Copy basename */ + fn_format(mi->log_file_name, linfo.log_file_name, "","",1); + mi->pos = my_b_tell(&log); + goto mi_inited; + } + else if (cmp_res > 0) + { + if (!last_pos) + { + strmov(errmsg, + "Slave event in first log points past the target position"); + goto err; + } + end_io_cache(&log); + (void) my_close(file, MYF(MY_WME)); + if (init_io_cache(&log, (file = last_file), IO_SIZE, READ_CACHE, 0, 0, + MYF(MY_WME))) + { + errmsg[0] = 0; + goto err; + } + break; + } + + strmov(last_log_name, linfo.log_file_name); + last_pos = my_b_tell(&log); + + switch (mysql_bin_log.find_next_log(&linfo)) { + case LOG_INFO_EOF: + if (last_file >= 0) + (void)my_close(last_file, MYF(MY_WME)); + last_file = -1; + goto found_log; + case 0: + break; + default: + strmov(errmsg, "Error reading log index"); + goto err; + } + + end_io_cache(&log); + if (last_file >= 0) + (void) my_close(last_file, MYF(MY_WME)); + last_file = file; + } + +found_log: + my_b_seek(&log, last_pos); + if (find_target_pos(mi,&log,errmsg)) + goto err; + fn_format(mi->log_file_name, last_log_name, "","",1); /* Copy basename */ + +mi_inited: + error = 0; +err: + pthread_mutex_unlock(log_lock); + end_io_cache(&log); + pthread_mutex_lock(&LOCK_thread_count); + thd->current_linfo = 0; + pthread_mutex_unlock(&LOCK_thread_count); + if (file >= 0) + (void) my_close(file, MYF(MY_WME)); + if (last_file >= 0 && last_file != file) + (void) my_close(last_file, MYF(MY_WME)); + + return error; +} + +// caller must delete result when done +static Slave_log_event* find_slave_event(IO_CACHE* log, + const char* log_file_name, + char* errmsg) +{ + Log_event* ev; + int i; + bool slave_event_found = 0; + LINT_INIT(ev); + + for (i = 0; i < 2; i++) + { + if (!(ev = Log_event::read_log_event(log, (pthread_mutex_t*)0, 0))) + { + my_snprintf(errmsg, SLAVE_ERRMSG_SIZE, + "Error reading event in log '%s'", + (char*)log_file_name); + return 0; + } + if (ev->get_type_code() == SLAVE_EVENT) + { + slave_event_found = 1; + break; + } + delete ev; + } + if (!slave_event_found) + { + my_snprintf(errmsg, SLAVE_ERRMSG_SIZE, + "Could not find slave event in log '%s'", + (char*)log_file_name); + delete ev; + return 0; + } + + return (Slave_log_event*)ev; +} + + +int show_new_master(THD* thd) +{ + DBUG_ENTER("show_new_master"); + List<Item> field_list; + char errmsg[SLAVE_ERRMSG_SIZE]; + LEX_MASTER_INFO* lex_mi = &thd->lex.mi; + + errmsg[0]=0; // Safety + if (translate_master(thd, lex_mi, errmsg)) + { + if (errmsg[0]) + net_printf(&thd->net, ER_ERROR_WHEN_EXECUTING_COMMAND, + "SHOW NEW MASTER", errmsg); + else + send_error(&thd->net, 0); + + DBUG_RETURN(1); + } + else + { + String* packet = &thd->packet; + field_list.push_back(new Item_empty_string("Log_name", 20)); + field_list.push_back(new Item_empty_string("Log_pos", 20)); + if (send_fields(thd, field_list, 1)) + DBUG_RETURN(-1); + packet->length(0); + net_store_data(packet, lex_mi->log_file_name); + net_store_data(packet, (longlong)lex_mi->pos); + if (my_net_write(&thd->net, packet->ptr(), packet->length())) + DBUG_RETURN(-1); + send_eof(&thd->net); + DBUG_RETURN(0); + } +} + int update_slave_list(MYSQL* mysql) { MYSQL_RES* res=0; @@ -216,6 +556,277 @@ err: DBUG_RETURN(0); } +int show_slave_hosts(THD* thd) +{ + List<Item> field_list; + NET* net = &thd->net; + String* packet = &thd->packet; + DBUG_ENTER("show_slave_hosts"); + + field_list.push_back(new Item_empty_string("Server_id", 20)); + field_list.push_back(new Item_empty_string("Host", 20)); + if (opt_show_slave_auth_info) + { + field_list.push_back(new Item_empty_string("User",20)); + field_list.push_back(new Item_empty_string("Password",20)); + } + field_list.push_back(new Item_empty_string("Port",20)); + field_list.push_back(new Item_empty_string("Rpl_recovery_rank", 20)); + field_list.push_back(new Item_empty_string("Master_id", 20)); + + if (send_fields(thd, field_list, 1)) + DBUG_RETURN(-1); + + pthread_mutex_lock(&LOCK_slave_list); + + for (uint i = 0; i < slave_list.records; ++i) + { + SLAVE_INFO* si = (SLAVE_INFO*) hash_element(&slave_list, i); + packet->length(0); + net_store_data(packet, si->server_id); + net_store_data(packet, si->host); + if (opt_show_slave_auth_info) + { + net_store_data(packet, si->user); + net_store_data(packet, si->password); + } + net_store_data(packet, (uint32) si->port); + net_store_data(packet, si->rpl_recovery_rank); + net_store_data(packet, si->master_id); + if (my_net_write(net, (char*)packet->ptr(), packet->length())) + { + pthread_mutex_unlock(&LOCK_slave_list); + DBUG_RETURN(-1); + } + } + pthread_mutex_unlock(&LOCK_slave_list); + send_eof(net); + DBUG_RETURN(0); +} + +int connect_to_master(THD *thd, MYSQL* mysql, MASTER_INFO* mi) +{ + if (!mc_mysql_connect(mysql, mi->host, mi->user, mi->password, 0, + mi->port, 0, 0)) + { + sql_print_error("Connection to master failed: %s", + mc_mysql_error(mysql)); + return 1; + } + return 0; +} + + +static inline void cleanup_mysql_results(MYSQL_RES* db_res, + MYSQL_RES** cur, MYSQL_RES** start) +{ + for( ; cur >= start; --cur) + { + if (*cur) + mc_mysql_free_result(*cur); + } + mc_mysql_free_result(db_res); +} + + +static inline int fetch_db_tables(THD* thd, MYSQL* mysql, const char* db, + MYSQL_RES* table_res) +{ + MYSQL_ROW row; + + for( row = mc_mysql_fetch_row(table_res); row; + row = mc_mysql_fetch_row(table_res)) + { + TABLE_LIST table; + const char* table_name = row[0]; + int error; + if (table_rules_on) + { + table.next = 0; + table.db = (char*)db; + table.real_name = (char*)table_name; + table.updating = 1; + if (!tables_ok(thd, &table)) + continue; + } + + if ((error = fetch_nx_table(thd, db, table_name, &glob_mi, mysql))) + return error; + } + + return 0; +} + + +int load_master_data(THD* thd) +{ + MYSQL mysql; + MYSQL_RES* master_status_res = 0; + bool slave_was_running = 0; + int error = 0; + + mc_mysql_init(&mysql); + + // we do not want anyone messing with the slave at all for the entire + // duration of the data load; + pthread_mutex_lock(&LOCK_slave); + + // first, kill the slave + if ((slave_was_running = slave_running)) + { + abort_slave = 1; + KICK_SLAVE; + thd->proc_info = "waiting for slave to die"; + while (slave_running) + pthread_cond_wait(&COND_slave_stopped, &LOCK_slave); // wait until done + } + + + if (connect_to_master(thd, &mysql, &glob_mi)) + { + net_printf(&thd->net, error = ER_CONNECT_TO_MASTER, + mc_mysql_error(&mysql)); + goto err; + } + + // now that we are connected, get all database and tables in each + { + MYSQL_RES *db_res, **table_res, **table_res_end, **cur_table_res; + uint num_dbs; + + if (mc_mysql_query(&mysql, "show databases", 0) || + !(db_res = mc_mysql_store_result(&mysql))) + { + net_printf(&thd->net, error = ER_QUERY_ON_MASTER, + mc_mysql_error(&mysql)); + goto err; + } + + if (!(num_dbs = (uint) mc_mysql_num_rows(db_res))) + goto err; + // in theory, the master could have no databases at all + // and run with skip-grant + + if (!(table_res = (MYSQL_RES**)thd->alloc(num_dbs * sizeof(MYSQL_RES*)))) + { + net_printf(&thd->net, error = ER_OUTOFMEMORY); + goto err; + } + + // this is a temporary solution until we have online backup + // capabilities - to be replaced once online backup is working + // we wait to issue FLUSH TABLES WITH READ LOCK for as long as we + // can to minimize the lock time + if (mc_mysql_query(&mysql, "FLUSH TABLES WITH READ LOCK", 0) || + mc_mysql_query(&mysql, "SHOW MASTER STATUS",0) || + !(master_status_res = mc_mysql_store_result(&mysql))) + { + net_printf(&thd->net, error = ER_QUERY_ON_MASTER, + mc_mysql_error(&mysql)); + goto err; + } + + // go through every table in every database, and if the replication + // rules allow replicating it, get it + + table_res_end = table_res + num_dbs; + + for(cur_table_res = table_res; cur_table_res < table_res_end; + cur_table_res++) + { + // since we know how many rows we have, this can never be NULL + MYSQL_ROW row = mc_mysql_fetch_row(db_res); + char* db = row[0]; + + /* + Do not replicate databases excluded by rules + also skip mysql database - in most cases the user will + mess up and not exclude mysql database with the rules when + he actually means to - in this case, he is up for a surprise if + his priv tables get dropped and downloaded from master + TO DO - add special option, not enabled + by default, to allow inclusion of mysql database into load + data from master + */ + + if (!db_ok(db, replicate_do_db, replicate_ignore_db) || + !strcmp(db,"mysql")) + { + *cur_table_res = 0; + continue; + } + + if (mysql_rm_db(thd, db, 1,1) || + mysql_create_db(thd, db, 0, 1)) + { + send_error(&thd->net, 0, 0); + cleanup_mysql_results(db_res, cur_table_res - 1, table_res); + goto err; + } + + if (mc_mysql_select_db(&mysql, db) || + mc_mysql_query(&mysql, "show tables", 0) || + !(*cur_table_res = mc_mysql_store_result(&mysql))) + { + net_printf(&thd->net, error = ER_QUERY_ON_MASTER, + mc_mysql_error(&mysql)); + cleanup_mysql_results(db_res, cur_table_res - 1, table_res); + goto err; + } + + if ((error = fetch_db_tables(thd, &mysql, db, *cur_table_res))) + { + // we do not report the error - fetch_db_tables handles it + cleanup_mysql_results(db_res, cur_table_res, table_res); + goto err; + } + } + + cleanup_mysql_results(db_res, cur_table_res - 1, table_res); + + // adjust position in the master + if (master_status_res) + { + MYSQL_ROW row = mc_mysql_fetch_row(master_status_res); + + /* + We need this check because the master may not be running with + log-bin, but it will still allow us to do all the steps + of LOAD DATA FROM MASTER - no reason to forbid it, really, + although it does not make much sense for the user to do it + */ + if (row[0] && row[1]) + { + strmake(glob_mi.log_file_name, row[0], sizeof(glob_mi.log_file_name)); + glob_mi.pos = atoi(row[1]); // atoi() is ok, since offset is <= 1GB + if (glob_mi.pos < 4) + glob_mi.pos = 4; // don't hit the magic number + glob_mi.pending = 0; + flush_master_info(&glob_mi); + } + + mc_mysql_free_result(master_status_res); + } + + if (mc_mysql_query(&mysql, "UNLOCK TABLES", 0)) + { + net_printf(&thd->net, error = ER_QUERY_ON_MASTER, + mc_mysql_error(&mysql)); + goto err; + } + } + +err: + pthread_mutex_unlock(&LOCK_slave); + if (slave_was_running) + start_slave(0, 0); + mc_mysql_close(&mysql); // safe to call since we always do mc_mysql_init() + if (!error) + send_ok(&thd->net); + + return error; +} + diff --git a/sql/repl_failsafe.h b/sql/repl_failsafe.h index b71dde1dc10..77bc03ce8c0 100644 --- a/sql/repl_failsafe.h +++ b/sql/repl_failsafe.h @@ -2,6 +2,8 @@ #define REPL_FAILSAFE_H #include "mysql.h" +#include "my_sys.h" +#include "slave.h" typedef enum {RPL_AUTH_MASTER=0,RPL_ACTIVE_SLAVE,RPL_IDLE_SLAVE, RPL_LOST_SOLDIER,RPL_TROOP_SOLDIER, @@ -19,4 +21,18 @@ pthread_handler_decl(handle_failsafe_rpl,arg); void change_rpl_status(RPL_STATUS from_status, RPL_STATUS to_status); int find_recovery_captain(THD* thd, MYSQL* mysql); int update_slave_list(MYSQL* mysql); + +extern HASH slave_list; + +int load_master_data(THD* thd); +int connect_to_master(THD *thd, MYSQL* mysql, MASTER_INFO* mi); + +int show_new_master(THD* thd); +int show_slave_hosts(THD* thd); +int translate_master(THD* thd, LEX_MASTER_INFO* mi, char* errmsg); +void init_slave_list(); +void end_slave_list(); +int register_slave(THD* thd, uchar* packet, uint packet_length); +void unregister_slave(THD* thd, bool only_mine, bool need_mutex); + #endif diff --git a/sql/slave.cc b/sql/slave.cc index 8075b5ad75b..b78cd0f0835 100644 --- a/sql/slave.cc +++ b/sql/slave.cc @@ -61,6 +61,8 @@ static int safe_sleep(THD* thd, int sec); static int request_table_dump(MYSQL* mysql, const char* db, const char* table); static int create_table_from_dump(THD* thd, NET* net, const char* db, const char* table_name); +static int check_master_version(MYSQL* mysql, MASTER_INFO* mi); + char* rewrite_db(char* db); static void free_table_ent(TABLE_RULE_ENT* e) @@ -333,6 +335,54 @@ static int init_intvar_from_file(int* var, IO_CACHE* f, int default_val) return 1; } +static int check_master_version(MYSQL* mysql, MASTER_INFO* mi) +{ + MYSQL_RES* res; + MYSQL_ROW row; + const char* version; + const char* errmsg = 0; + + if (mc_mysql_query(mysql, "SELECT VERSION()", 0) + || !(res = mc_mysql_store_result(mysql))) + { + sql_print_error("Error checking master version: %s", + mc_mysql_error(mysql)); + return 1; + } + if (!(row = mc_mysql_fetch_row(res))) + { + errmsg = "Master returned no rows for SELECT VERSION()"; + goto err; + } + if (!(version = row[0])) + { + errmsg = "Master reported NULL for the version"; + goto err; + } + + switch (*version) + { + case '3': + mi->old_format = 1; + break; + case '4': + mi->old_format = 0; + break; + default: + errmsg = "Master reported unrecognized MySQL version"; + goto err; + } +err: + if (res) + mc_mysql_free_result(res); + if (errmsg) + { + sql_print_error(errmsg); + return 1; + } + return 0; +} + static int create_table_from_dump(THD* thd, NET* net, const char* db, const char* table_name) @@ -580,7 +630,7 @@ int init_master_info(MASTER_INFO* mi) mi->inited = 1; // now change the cache from READ to WRITE - must do this // before flush_master_info - reinit_io_cache(&mi->file, WRITE_CACHE, 0L,0,1); + reinit_io_cache(&mi->file, WRITE_CACHE,0L,0,1); error=test(flush_master_info(mi)); pthread_mutex_unlock(&mi->lock); return error; @@ -943,12 +993,14 @@ static int exec_event(THD* thd, NET* net, MASTER_INFO* mi, int event_len) { const char *error_msg; Log_event * ev = Log_event::read_log_event((const char*)net->read_pos + 1, - event_len, &error_msg); + event_len, &error_msg, + mi->old_format); if (ev) { int type_code = ev->get_type_code(); int exec_res; - if (ev->server_id == ::server_id || slave_skip_counter) + if (ev->server_id == ::server_id || + (slave_skip_counter && ev->get_type_code() != ROTATE_EVENT)) { if(type_code == LOAD_EVENT) skip_load_data_infile(net); @@ -1070,9 +1122,17 @@ connected: // register ourselves with the master // if fails, this is not fatal - we just print the error message and go // on with life - thd->proc_info = "Registering slave on master"; - register_slave_on_master(mysql); - update_slave_list(mysql); + thd->proc_info = "Checking master version"; + if (check_master_version(mysql, &glob_mi)) + { + goto err; + } + if (!glob_mi.old_format) + { + thd->proc_info = "Registering slave on master"; + if (register_slave_on_master(mysql) || update_slave_list(mysql)) + goto err; + } while (!slave_killed(thd)) { diff --git a/sql/slave.h b/sql/slave.h index 5c6772147b6..a5bc4d61309 100644 --- a/sql/slave.h +++ b/sql/slave.h @@ -23,8 +23,10 @@ typedef struct st_master_info pthread_mutex_t lock; pthread_cond_t cond; bool inited; + bool old_format; /* master binlog is in 3.23 format */ - st_master_info():pending(0),fd(-1),last_log_seq(0),inited(0) + st_master_info():pending(0),fd(-1),last_log_seq(0),inited(0), + old_format(0) { host[0] = 0; user[0] = 0; password[0] = 0; pthread_mutex_init(&lock, MY_MUTEX_INIT_FAST); diff --git a/sql/sql_class.h b/sql/sql_class.h index 53e20b3b6d2..698a90c1a28 100644 --- a/sql/sql_class.h +++ b/sql/sql_class.h @@ -72,15 +72,18 @@ class MYSQL_LOG { // we should not try to rotate it or write any rotation events // the user should use FLUSH MASTER instead of FLUSH LOGS for // purging - + enum cache_type io_cache_type; + bool need_start_event; friend class Log_event; public: MYSQL_LOG(); ~MYSQL_LOG(); pthread_mutex_t* get_log_lock() { return &LOCK_log; } + void set_need_start_event() { need_start_event = 1; } void set_index_file_name(const char* index_file_name = 0); - void init(enum_log_type log_type_arg); + void init(enum_log_type log_type_arg, + enum cache_type io_cache_type_arg = WRITE_CACHE); void open(const char *log_name,enum_log_type log_type, const char *new_name=0); void new_file(bool inside_mutex = 0); diff --git a/sql/sql_parse.cc b/sql/sql_parse.cc index 3830db48613..20a645e52af 100644 --- a/sql/sql_parse.cc +++ b/sql/sql_parse.cc @@ -22,6 +22,7 @@ #include "mysql_priv.h" #include "sql_acl.h" #include "sql_repl.h" +#include "repl_failsafe.h" #include <m_ctype.h> #include <thr_alarm.h> #include <myisam.h> diff --git a/sql/sql_repl.cc b/sql/sql_repl.cc index 684c084ece3..3542b288087 100644 --- a/sql/sql_repl.cc +++ b/sql/sql_repl.cc @@ -24,58 +24,15 @@ #include <thr_alarm.h> #include <my_dir.h> -#define SLAVE_LIST_CHUNK 128 -#define SLAVE_ERRMSG_SIZE (FN_REFLEN+64) - extern const char* any_db; extern pthread_handler_decl(handle_slave,arg); -HASH slave_list; - #ifndef DBUG_OFF int max_binlog_dump_events = 0; // unlimited bool opt_sporadic_binlog_dump_fail = 0; static int binlog_dump_count = 0; #endif -#ifdef SIGNAL_WITH_VIO_CLOSE -#define KICK_SLAVE { slave_thd->close_active_vio(); \ - thr_alarm_kill(slave_real_id); } -#else -#define KICK_SLAVE thr_alarm_kill(slave_real_id); -#endif - -static Slave_log_event* find_slave_event(IO_CACHE* log, - const char* log_file_name, - char* errmsg); - -static uint32* slave_list_key(SLAVE_INFO* si, uint* len, - my_bool not_used __attribute__((unused))) -{ - *len = 4; - return &si->server_id; -} - -static void slave_info_free(void *s) -{ - my_free((gptr) s, MYF(MY_WME)); -} - -void init_slave_list() -{ - hash_init(&slave_list, SLAVE_LIST_CHUNK, 0, 0, - (hash_get_key) slave_list_key, slave_info_free, 0); - pthread_mutex_init(&LOCK_slave_list, MY_MUTEX_INIT_FAST); -} - -void end_slave_list() -{ - pthread_mutex_lock(&LOCK_slave_list); - hash_free(&slave_list); - pthread_mutex_unlock(&LOCK_slave_list); - pthread_mutex_destroy(&LOCK_slave_list); -} - static int fake_rotate_event(NET* net, String* packet, char* log_file_name, const char**errmsg) { @@ -104,69 +61,6 @@ static int fake_rotate_event(NET* net, String* packet, char* log_file_name, return 0; } -#define get_object(p, obj) \ -{\ - uint len = (uint)*p++; \ - if (p + len > p_end || len >= sizeof(obj)) \ - goto err; \ - strmake(obj,(char*) p,len); \ - p+= len; \ -}\ - -void unregister_slave(THD* thd, bool only_mine, bool need_mutex) -{ - if (need_mutex) - pthread_mutex_lock(&LOCK_slave_list); - if (thd->server_id) - { - SLAVE_INFO* old_si; - if ((old_si = (SLAVE_INFO*)hash_search(&slave_list, - (byte*)&thd->server_id, 4)) && - (!only_mine || old_si->thd == thd)) - hash_delete(&slave_list, (byte*)old_si); - } - if (need_mutex) - pthread_mutex_unlock(&LOCK_slave_list); -} - -int register_slave(THD* thd, uchar* packet, uint packet_length) -{ - SLAVE_INFO *si; - int res = 1; - uchar* p = packet, *p_end = packet + packet_length; - - if (check_access(thd, FILE_ACL, any_db)) - return 1; - - if (!(si = (SLAVE_INFO*)my_malloc(sizeof(SLAVE_INFO), MYF(MY_WME)))) - goto err; - - thd->server_id = si->server_id = uint4korr(p); - p += 4; - get_object(p,si->host); - get_object(p,si->user); - get_object(p,si->password); - si->port = uint2korr(p); - p += 2; - si->rpl_recovery_rank = uint4korr(p); - p += 4; - if (!(si->master_id = uint4korr(p))) - si->master_id = server_id; - si->thd = thd; - pthread_mutex_lock(&LOCK_slave_list); - - unregister_slave(thd,0,0); - res = hash_insert(&slave_list, (byte*) si); - pthread_mutex_unlock(&LOCK_slave_list); - return res; - -err: - if (si) - my_free((gptr) si, MYF(MY_WME)); - return res; -} - - static int send_file(THD *thd) { NET* net = &thd->net; @@ -252,6 +146,8 @@ File open_binlog(IO_CACHE *log, const char *log_file_name, if (my_b_read(log, (byte*) magic, sizeof(magic))) { *errmsg = "I/O error reading the header from the binary log"; + sql_print_error("%s, errno=%d, io cache code=%d", *errmsg, my_errno, + log->error); goto err; } if (memcmp(magic, BINLOG_MAGIC, sizeof(magic))) @@ -887,8 +783,13 @@ void reset_master() } LOG_INFO linfo; + pthread_mutex_t* log_lock = mysql_bin_log.get_log_lock(); + pthread_mutex_lock(log_lock); if (mysql_bin_log.find_first_log(&linfo, "")) + { + pthread_mutex_unlock(log_lock); return; + } for(;;) { @@ -898,7 +799,9 @@ void reset_master() } mysql_bin_log.close(1); // exiting close my_delete(mysql_bin_log.get_index_fname(), MYF(MY_WME)); + mysql_bin_log.set_need_start_event(); mysql_bin_log.open(opt_bin_logname,LOG_BIN); + pthread_mutex_unlock(log_lock); } @@ -915,242 +818,6 @@ int cmp_master_pos(const char* log_file_name1, ulonglong log_pos1, return -1; } - -static inline int cmp_master_pos(Slave_log_event* sev, LEX_MASTER_INFO* mi) -{ - return cmp_master_pos(sev->master_log, sev->master_pos, mi->log_file_name, - mi->pos); -} - -static int find_target_pos(LEX_MASTER_INFO* mi, IO_CACHE* log, char* errmsg) -{ - uint32 log_seq = mi->last_log_seq; - uint32 target_server_id = mi->server_id; - - for (;;) - { - Log_event* ev; - if (!(ev = Log_event::read_log_event(log, 0))) - { - if (log->error > 0) - strmov(errmsg, "Binary log truncated in the middle of event"); - else if (log->error < 0) - strmov(errmsg, "I/O error reading binary log"); - else - strmov(errmsg, "Could not find target event in the binary log"); - return 1; - } - - if (ev->log_seq == log_seq && ev->server_id == target_server_id) - { - delete ev; - mi->pos = my_b_tell(log); - return 0; - } - - delete ev; - } -} - - -int translate_master(THD* thd, LEX_MASTER_INFO* mi, char* errmsg) -{ - LOG_INFO linfo; - char search_file_name[FN_REFLEN],last_log_name[FN_REFLEN]; - IO_CACHE log; - File file = -1, last_file = -1; - pthread_mutex_t *log_lock; - const char* errmsg_p; - Slave_log_event* sev = 0; - my_off_t last_pos = 0; - int error = 1; - int cmp_res; - LINT_INIT(cmp_res); - - if (!mysql_bin_log.is_open()) - { - strmov(errmsg,"Binary log is not open"); - return 1; - } - - if (!server_id_supplied) - { - strmov(errmsg, "Misconfigured master - server id was not set"); - return 1; - } - - linfo.index_file_offset = 0; - - - search_file_name[0] = 0; - - if (mysql_bin_log.find_first_log(&linfo, search_file_name)) - { - strmov(errmsg,"Could not find first log"); - return 1; - } - thd->current_linfo = &linfo; - - bzero((char*) &log,sizeof(log)); - log_lock = mysql_bin_log.get_log_lock(); - pthread_mutex_lock(log_lock); - - for (;;) - { - if ((file=open_binlog(&log, linfo.log_file_name, &errmsg_p)) < 0) - { - strmov(errmsg, errmsg_p); - goto err; - } - - if (!(sev = find_slave_event(&log, linfo.log_file_name, errmsg))) - goto err; - - cmp_res = cmp_master_pos(sev, mi); - delete sev; - - if (!cmp_res) - { - /* Copy basename */ - fn_format(mi->log_file_name, linfo.log_file_name, "","",1); - mi->pos = my_b_tell(&log); - goto mi_inited; - } - else if (cmp_res > 0) - { - if (!last_pos) - { - strmov(errmsg, - "Slave event in first log points past the target position"); - goto err; - } - end_io_cache(&log); - (void) my_close(file, MYF(MY_WME)); - if (init_io_cache(&log, (file = last_file), IO_SIZE, READ_CACHE, 0, 0, - MYF(MY_WME))) - { - errmsg[0] = 0; - goto err; - } - break; - } - - strmov(last_log_name, linfo.log_file_name); - last_pos = my_b_tell(&log); - - switch (mysql_bin_log.find_next_log(&linfo)) { - case LOG_INFO_EOF: - if (last_file >= 0) - (void)my_close(last_file, MYF(MY_WME)); - last_file = -1; - goto found_log; - case 0: - break; - default: - strmov(errmsg, "Error reading log index"); - goto err; - } - - end_io_cache(&log); - if (last_file >= 0) - (void) my_close(last_file, MYF(MY_WME)); - last_file = file; - } - -found_log: - my_b_seek(&log, last_pos); - if (find_target_pos(mi,&log,errmsg)) - goto err; - fn_format(mi->log_file_name, last_log_name, "","",1); /* Copy basename */ - -mi_inited: - error = 0; -err: - pthread_mutex_unlock(log_lock); - end_io_cache(&log); - pthread_mutex_lock(&LOCK_thread_count); - thd->current_linfo = 0; - pthread_mutex_unlock(&LOCK_thread_count); - if (file >= 0) - (void) my_close(file, MYF(MY_WME)); - if (last_file >= 0 && last_file != file) - (void) my_close(last_file, MYF(MY_WME)); - - return error; -} - -// caller must delete result when done -static Slave_log_event* find_slave_event(IO_CACHE* log, - const char* log_file_name, - char* errmsg) -{ - Log_event* ev; - if (!(ev = Log_event::read_log_event(log, 0))) - { - my_snprintf(errmsg, SLAVE_ERRMSG_SIZE, - "Error reading start event in log '%s'", - (char*)log_file_name); - return 0; - } - delete ev; - - if (!(ev = Log_event::read_log_event(log, 0))) - { - my_snprintf(errmsg, SLAVE_ERRMSG_SIZE, - "Error reading slave event in log '%s'", - (char*)log_file_name); - return 0; - } - - if (ev->get_type_code() != SLAVE_EVENT) - { - my_snprintf(errmsg, SLAVE_ERRMSG_SIZE, - "Second event in log '%s' is not slave event", - (char*)log_file_name); - delete ev; - return 0; - } - - return (Slave_log_event*)ev; -} - - -int show_new_master(THD* thd) -{ - DBUG_ENTER("show_new_master"); - List<Item> field_list; - char errmsg[SLAVE_ERRMSG_SIZE]; - LEX_MASTER_INFO* lex_mi = &thd->lex.mi; - - errmsg[0]=0; // Safety - if (translate_master(thd, lex_mi, errmsg)) - { - if (errmsg[0]) - net_printf(&thd->net, ER_ERROR_WHEN_EXECUTING_COMMAND, - "SHOW NEW MASTER", errmsg); - else - send_error(&thd->net, 0); - - DBUG_RETURN(1); - } - else - { - String* packet = &thd->packet; - field_list.push_back(new Item_empty_string("Log_name", 20)); - field_list.push_back(new Item_empty_string("Log_pos", 20)); - if (send_fields(thd, field_list, 1)) - DBUG_RETURN(-1); - packet->length(0); - net_store_data(packet, lex_mi->log_file_name); - net_store_data(packet, (longlong)lex_mi->pos); - if (my_net_write(&thd->net, packet->ptr(), packet->length())) - DBUG_RETURN(-1); - send_eof(&thd->net); - DBUG_RETURN(0); - } - -} - int show_binlog_events(THD* thd) { DBUG_ENTER("show_binlog_events"); @@ -1202,7 +869,8 @@ int show_binlog_events(THD* thd) pthread_mutex_lock(mysql_bin_log.get_log_lock()); my_b_seek(&log, pos); - for (event_count = 0; (ev = Log_event::read_log_event(&log, 0)); ) + for (event_count = 0; + (ev = Log_event::read_log_event(&log,(pthread_mutex_t*)0,0)); ) { if (event_count >= limit_start && ev->net_send(thd, linfo.log_file_name, pos)) @@ -1247,56 +915,6 @@ err: DBUG_RETURN(0); } - -int show_slave_hosts(THD* thd) -{ - List<Item> field_list; - NET* net = &thd->net; - String* packet = &thd->packet; - DBUG_ENTER("show_slave_hosts"); - - field_list.push_back(new Item_empty_string("Server_id", 20)); - field_list.push_back(new Item_empty_string("Host", 20)); - if (opt_show_slave_auth_info) - { - field_list.push_back(new Item_empty_string("User",20)); - field_list.push_back(new Item_empty_string("Password",20)); - } - field_list.push_back(new Item_empty_string("Port",20)); - field_list.push_back(new Item_empty_string("Rpl_recovery_rank", 20)); - field_list.push_back(new Item_empty_string("Master_id", 20)); - - if (send_fields(thd, field_list, 1)) - DBUG_RETURN(-1); - - pthread_mutex_lock(&LOCK_slave_list); - - for (uint i = 0; i < slave_list.records; ++i) - { - SLAVE_INFO* si = (SLAVE_INFO*) hash_element(&slave_list, i); - packet->length(0); - net_store_data(packet, si->server_id); - net_store_data(packet, si->host); - if (opt_show_slave_auth_info) - { - net_store_data(packet, si->user); - net_store_data(packet, si->password); - } - net_store_data(packet, (uint32) si->port); - net_store_data(packet, si->rpl_recovery_rank); - net_store_data(packet, si->master_id); - if (my_net_write(net, (char*)packet->ptr(), packet->length())) - { - pthread_mutex_unlock(&LOCK_slave_list); - DBUG_RETURN(-1); - } - } - pthread_mutex_unlock(&LOCK_slave_list); - send_eof(net); - DBUG_RETURN(0); -} - - int show_binlog_info(THD* thd) { DBUG_ENTER("show_binlog_info"); @@ -1402,230 +1020,6 @@ err: return 1; } - -int connect_to_master(THD *thd, MYSQL* mysql, MASTER_INFO* mi) -{ - if (!mc_mysql_connect(mysql, mi->host, mi->user, mi->password, 0, - mi->port, 0, 0)) - { - sql_print_error("Connection to master failed: %s", - mc_mysql_error(mysql)); - return 1; - } - return 0; -} - - -static inline void cleanup_mysql_results(MYSQL_RES* db_res, - MYSQL_RES** cur, MYSQL_RES** start) -{ - for( ; cur >= start; --cur) - { - if (*cur) - mc_mysql_free_result(*cur); - } - mc_mysql_free_result(db_res); -} - - -static inline int fetch_db_tables(THD* thd, MYSQL* mysql, const char* db, - MYSQL_RES* table_res) -{ - MYSQL_ROW row; - - for( row = mc_mysql_fetch_row(table_res); row; - row = mc_mysql_fetch_row(table_res)) - { - TABLE_LIST table; - const char* table_name = row[0]; - int error; - if (table_rules_on) - { - table.next = 0; - table.db = (char*)db; - table.real_name = (char*)table_name; - table.updating = 1; - if (!tables_ok(thd, &table)) - continue; - } - - if ((error = fetch_nx_table(thd, db, table_name, &glob_mi, mysql))) - return error; - } - - return 0; -} - - -int load_master_data(THD* thd) -{ - MYSQL mysql; - MYSQL_RES* master_status_res = 0; - bool slave_was_running = 0; - int error = 0; - - mc_mysql_init(&mysql); - - // we do not want anyone messing with the slave at all for the entire - // duration of the data load; - pthread_mutex_lock(&LOCK_slave); - - // first, kill the slave - if ((slave_was_running = slave_running)) - { - abort_slave = 1; - KICK_SLAVE; - thd->proc_info = "waiting for slave to die"; - while (slave_running) - pthread_cond_wait(&COND_slave_stopped, &LOCK_slave); // wait until done - } - - - if (connect_to_master(thd, &mysql, &glob_mi)) - { - net_printf(&thd->net, error = ER_CONNECT_TO_MASTER, - mc_mysql_error(&mysql)); - goto err; - } - - // now that we are connected, get all database and tables in each - { - MYSQL_RES *db_res, **table_res, **table_res_end, **cur_table_res; - uint num_dbs; - - if (mc_mysql_query(&mysql, "show databases", 0) || - !(db_res = mc_mysql_store_result(&mysql))) - { - net_printf(&thd->net, error = ER_QUERY_ON_MASTER, - mc_mysql_error(&mysql)); - goto err; - } - - if (!(num_dbs = (uint) mc_mysql_num_rows(db_res))) - goto err; - // in theory, the master could have no databases at all - // and run with skip-grant - - if (!(table_res = (MYSQL_RES**)thd->alloc(num_dbs * sizeof(MYSQL_RES*)))) - { - net_printf(&thd->net, error = ER_OUTOFMEMORY); - goto err; - } - - // this is a temporary solution until we have online backup - // capabilities - to be replaced once online backup is working - // we wait to issue FLUSH TABLES WITH READ LOCK for as long as we - // can to minimize the lock time - if (mc_mysql_query(&mysql, "FLUSH TABLES WITH READ LOCK", 0) || - mc_mysql_query(&mysql, "SHOW MASTER STATUS",0) || - !(master_status_res = mc_mysql_store_result(&mysql))) - { - net_printf(&thd->net, error = ER_QUERY_ON_MASTER, - mc_mysql_error(&mysql)); - goto err; - } - - // go through every table in every database, and if the replication - // rules allow replicating it, get it - - table_res_end = table_res + num_dbs; - - for(cur_table_res = table_res; cur_table_res < table_res_end; - cur_table_res++) - { - // since we know how many rows we have, this can never be NULL - MYSQL_ROW row = mc_mysql_fetch_row(db_res); - char* db = row[0]; - - /* - Do not replicate databases excluded by rules - also skip mysql database - in most cases the user will - mess up and not exclude mysql database with the rules when - he actually means to - in this case, he is up for a surprise if - his priv tables get dropped and downloaded from master - TO DO - add special option, not enabled - by default, to allow inclusion of mysql database into load - data from master - */ - - if (!db_ok(db, replicate_do_db, replicate_ignore_db) || - !strcmp(db,"mysql")) - { - *cur_table_res = 0; - continue; - } - - if (mysql_rm_db(thd, db, 1,1) || - mysql_create_db(thd, db, 0, 1)) - { - send_error(&thd->net, 0, 0); - cleanup_mysql_results(db_res, cur_table_res - 1, table_res); - goto err; - } - - if (mc_mysql_select_db(&mysql, db) || - mc_mysql_query(&mysql, "show tables", 0) || - !(*cur_table_res = mc_mysql_store_result(&mysql))) - { - net_printf(&thd->net, error = ER_QUERY_ON_MASTER, - mc_mysql_error(&mysql)); - cleanup_mysql_results(db_res, cur_table_res - 1, table_res); - goto err; - } - - if ((error = fetch_db_tables(thd, &mysql, db, *cur_table_res))) - { - // we do not report the error - fetch_db_tables handles it - cleanup_mysql_results(db_res, cur_table_res, table_res); - goto err; - } - } - - cleanup_mysql_results(db_res, cur_table_res - 1, table_res); - - // adjust position in the master - if (master_status_res) - { - MYSQL_ROW row = mc_mysql_fetch_row(master_status_res); - - /* - We need this check because the master may not be running with - log-bin, but it will still allow us to do all the steps - of LOAD DATA FROM MASTER - no reason to forbid it, really, - although it does not make much sense for the user to do it - */ - if (row[0] && row[1]) - { - strmake(glob_mi.log_file_name, row[0], sizeof(glob_mi.log_file_name)); - glob_mi.pos = atoi(row[1]); // atoi() is ok, since offset is <= 1GB - if (glob_mi.pos < 4) - glob_mi.pos = 4; // don't hit the magic number - glob_mi.pending = 0; - flush_master_info(&glob_mi); - } - - mc_mysql_free_result(master_status_res); - } - - if (mc_mysql_query(&mysql, "UNLOCK TABLES", 0)) - { - net_printf(&thd->net, error = ER_QUERY_ON_MASTER, - mc_mysql_error(&mysql)); - goto err; - } - } - -err: - pthread_mutex_unlock(&LOCK_slave); - if (slave_was_running) - start_slave(0, 0); - mc_mysql_close(&mysql); // safe to call since we always do mc_mysql_init() - if (!error) - send_ok(&thd->net); - - return error; -} - int log_loaded_block(IO_CACHE* file) { LOAD_FILE_INFO* lf_info; diff --git a/sql/sql_repl.h b/sql/sql_repl.h index c12e0536058..4b9f741dde7 100644 --- a/sql/sql_repl.h +++ b/sql/sql_repl.h @@ -15,7 +15,6 @@ typedef struct st_slave_info } SLAVE_INFO; extern bool opt_show_slave_auth_info, opt_old_rpl_compat; -extern HASH slave_list; extern char* master_host; extern my_string opt_bin_logname, master_info_file; extern uint32 server_id; @@ -27,26 +26,24 @@ extern int max_binlog_dump_events; extern bool opt_sporadic_binlog_dump_fail; #endif +#ifdef SIGNAL_WITH_VIO_CLOSE +#define KICK_SLAVE { slave_thd->close_active_vio(); \ + thr_alarm_kill(slave_real_id); } +#else +#define KICK_SLAVE thr_alarm_kill(slave_real_id); +#endif + File open_binlog(IO_CACHE *log, const char *log_file_name, const char **errmsg); int start_slave(THD* thd = 0, bool net_report = 1); int stop_slave(THD* thd = 0, bool net_report = 1); -int load_master_data(THD* thd); -int connect_to_master(THD *thd, MYSQL* mysql, MASTER_INFO* mi); int change_master(THD* thd); -int show_new_master(THD* thd); -int show_slave_hosts(THD* thd); int show_binlog_events(THD* thd); -int translate_master(THD* thd, LEX_MASTER_INFO* mi, char* errmsg); int cmp_master_pos(const char* log_file_name1, ulonglong log_pos1, const char* log_file_name2, ulonglong log_pos2); void reset_slave(); void reset_master(); -void init_slave_list(); -void end_slave_list(); -int register_slave(THD* thd, uchar* packet, uint packet_length); -void unregister_slave(THD* thd, bool only_mine, bool need_mutex); int purge_master_logs(THD* thd, const char* to_log); bool log_in_use(const char* log_name); void adjust_linfo_offsets(my_off_t purge_offset); |