summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--client/mysqlbinlog.cc81
-rw-r--r--libmysql/Makefile.shared3
-rw-r--r--mysql-test/mysql-test-run.sh7
-rw-r--r--mysql-test/r/rpl000002.result2
-rw-r--r--mysql-test/r/rpl000016.result8
-rw-r--r--mysql-test/r/rpl_log.result28
-rw-r--r--mysql-test/t/rpl000016.test3
-rw-r--r--mysys/mf_iocache.c8
-rw-r--r--sql/log.cc24
-rw-r--r--sql/log_event.cc97
-rw-r--r--sql/log_event.h24
-rw-r--r--sql/repl_failsafe.cc611
-rw-r--r--sql/repl_failsafe.h16
-rw-r--r--sql/slave.cc72
-rw-r--r--sql/slave.h4
-rw-r--r--sql/sql_class.h7
-rw-r--r--sql/sql_parse.cc1
-rw-r--r--sql/sql_repl.cc628
-rw-r--r--sql/sql_repl.h17
19 files changed, 919 insertions, 722 deletions
diff --git a/client/mysqlbinlog.cc b/client/mysqlbinlog.cc
index 8cb1b82753e..c0e87b95a92 100644
--- a/client/mysqlbinlog.cc
+++ b/client/mysqlbinlog.cc
@@ -21,6 +21,8 @@
#include <time.h>
#include "log_event.h"
+#define PROBE_HEADER_LEN (4+EVENT_LEN_OFFSET)
+
#define CLIENT_CAPABILITIES (CLIENT_LONG_PASSWORD | CLIENT_LONG_FLAG | CLIENT_LOCAL_FILES)
char server_version[SERVER_VERSION_LENGTH];
@@ -288,6 +290,52 @@ static void dump_remote_table(NET* net, const char* db, const char* table)
}
}
+static int check_master_version(MYSQL* mysql)
+{
+ MYSQL_RES* res = 0;
+ MYSQL_ROW row;
+ const char* version;
+ int old_format = 0;
+
+ if (mysql_query(mysql, "SELECT VERSION()")
+ || !(res = mysql_store_result(mysql)))
+ {
+ mysql_close(mysql);
+ die("Error checking master version: %s",
+ mysql_error(mysql));
+ }
+ if (!(row = mysql_fetch_row(res)))
+ {
+ mysql_free_result(res);
+ mysql_close(mysql);
+ die("Master returned no rows for SELECT VERSION()");
+ return 1;
+ }
+ if (!(version = row[0]))
+ {
+ mysql_free_result(res);
+ mysql_close(mysql);
+ die("Master reported NULL for the version");
+ }
+
+ switch (*version)
+ {
+ case '3':
+ old_format = 1;
+ break;
+ case '4':
+ old_format = 0;
+ break;
+ default:
+ sql_print_error("Master reported unrecognized MySQL version '%s'",
+ version);
+ mysql_free_result(res);
+ mysql_close(mysql);
+ return 1;
+ }
+ mysql_free_result(res);
+ return old_format;
+}
static void dump_remote_log_entries(const char* logname)
{
@@ -295,6 +343,9 @@ static void dump_remote_log_entries(const char* logname)
char last_db[FN_REFLEN+1] = "";
uint len;
NET* net = &mysql->net;
+ int old_format;
+ old_format = check_master_version(mysql);
+
if(!position) position = 4; // protect the innocent from spam
if (position < 4)
{
@@ -307,7 +358,7 @@ static void dump_remote_log_entries(const char* logname)
len = (uint) strlen(logname);
int4store(buf + 6, 0);
memcpy(buf + 10, logname,len);
- if(simple_command(mysql, COM_BINLOG_DUMP, buf, len + 10, 1))
+ if (simple_command(mysql, COM_BINLOG_DUMP, buf, len + 10, 1))
die("Error sending the log dump command");
for(;;)
@@ -322,7 +373,7 @@ static void dump_remote_log_entries(const char* logname)
len, net->read_pos[5]));
Log_event * ev = Log_event::read_log_event(
(const char*) net->read_pos + 1 ,
- len - 1, &error);
+ len - 1, &error, old_format);
if (ev)
{
ev->print(result_file, short_form, last_db);
@@ -335,12 +386,34 @@ static void dump_remote_log_entries(const char* logname)
}
}
+static int check_header (IO_CACHE* file)
+{
+ char buf[PROBE_HEADER_LEN];
+ int old_format;
+
+ my_off_t pos = my_b_tell(file);
+ my_b_seek(file, (my_off_t)0);
+ if (my_b_read(file, buf, sizeof(buf)))
+ die("Failed reading header");
+ if (buf[EVENT_TYPE_OFFSET+4] == START_EVENT)
+ {
+ uint event_len;
+ event_len = uint4korr(buf + EVENT_LEN_OFFSET + 4);
+ old_format = (event_len < LOG_EVENT_HEADER_LEN + START_HEADER_LEN);
+ }
+ else
+ old_format = 0;
+ my_b_seek(file, pos);
+ return old_format;
+}
+
static void dump_local_log_entries(const char* logname)
{
File fd = -1;
IO_CACHE cache,*file= &cache;
ulonglong rec_count = 0;
char last_db[FN_REFLEN+1] = "";
+ bool old_format = 0;
if (logname && logname[0] != '-')
{
@@ -349,12 +422,14 @@ static void dump_local_log_entries(const char* logname)
if (init_io_cache(file, fd, 0, READ_CACHE, (my_off_t) position, 0,
MYF(MY_WME | MY_NABP)))
exit(1);
+ old_format = check_header(file);
}
else
{
if (init_io_cache(file, fileno(result_file), 0, READ_CACHE, (my_off_t) 0,
0, MYF(MY_WME | MY_NABP | MY_DONT_CHECK_FILESIZE)))
exit(1);
+ old_format = check_header(file);
if (position)
{
/* skip 'position' characters from stdout */
@@ -385,7 +460,7 @@ static void dump_local_log_entries(const char* logname)
char llbuff[21];
my_off_t old_off = my_b_tell(file);
- Log_event* ev = Log_event::read_log_event(file);
+ Log_event* ev = Log_event::read_log_event(file, old_format);
if (!ev)
{
if (file->error)
diff --git a/libmysql/Makefile.shared b/libmysql/Makefile.shared
index e00a961cabe..d661d69a339 100644
--- a/libmysql/Makefile.shared
+++ b/libmysql/Makefile.shared
@@ -55,7 +55,8 @@ mysysobjects1 = my_init.lo my_static.lo my_malloc.lo my_realloc.lo \
mf_loadpath.lo my_pthread.lo my_thr_init.lo \
thr_mutex.lo mulalloc.lo string.lo default.lo \
my_compress.lo array.lo my_once.lo list.lo my_net.lo \
- charset.lo hash.lo mf_iocache.lo my_seek.lo \
+ charset.lo hash.lo mf_iocache.lo \
+ mf_iocache2.lo my_seek.lo \
my_pread.lo mf_cache.lo my_vsnprintf.lo md5.lo
# Not needed in the minimum library
diff --git a/mysql-test/mysql-test-run.sh b/mysql-test/mysql-test-run.sh
index af595e80244..ed7414effd7 100644
--- a/mysql-test/mysql-test-run.sh
+++ b/mysql-test/mysql-test-run.sh
@@ -168,6 +168,9 @@ while test $# -gt 0; do
USE_MANAGER=1
USE_RUNNING_SERVER=
;;
+ --start-and-exit)
+ START_AND_EXIT=1
+ ;;
--skip-innobase)
EXTRA_MASTER_MYSQLD_OPT="$EXTRA_MASTER_MYSQLD_OPT --skip-innobase"
EXTRA_SLAVE_MYSQLD_OPT="$EXTRA_SLAVE_MYSQLD_OPT --skip-innobase" ;;
@@ -1091,6 +1094,10 @@ then
mysql_loadstd
fi
+if [ "x$START_AND_EXIT" = "x1" ] ; then
+ echo "Servers started, exiting"
+ exit
+fi
$ECHO "Starting Tests"
diff --git a/mysql-test/r/rpl000002.result b/mysql-test/r/rpl000002.result
index e321d82750c..88228321897 100644
--- a/mysql-test/r/rpl000002.result
+++ b/mysql-test/r/rpl000002.result
@@ -16,7 +16,7 @@ n
2002
show slave hosts;
Server_id Host Port Rpl_recovery_rank Master_id
-2 127.0.0.1 9307 2 1
+2 127.0.0.1 $SLAVE_MYPORT 2 1
drop table t1;
slave stop;
drop table if exists t2;
diff --git a/mysql-test/r/rpl000016.result b/mysql-test/r/rpl000016.result
index 3fb94c5b189..e15d22ea3ee 100644
--- a/mysql-test/r/rpl000016.result
+++ b/mysql-test/r/rpl000016.result
@@ -15,7 +15,7 @@ create table t1 (s text);
insert into t1 values('Could not break slave'),('Tried hard');
show slave status;
Master_Host Master_User Master_Port Connect_retry Log_File Pos Slave_Running Replicate_do_db Replicate_ignore_db Last_errno Last_error Skip_counter Last_log_seq
-127.0.0.1 root 9999 60 master-bin.001 234 Yes 0 0 3
+127.0.0.1 root $MASTER_MYPORT 60 master-bin.001 234 Yes 0 0 3
select * from t1;
s
Could not break slave
@@ -42,7 +42,7 @@ master-bin.003
insert into t2 values (65);
show slave status;
Master_Host Master_User Master_Port Connect_retry Log_File Pos Slave_Running Replicate_do_db Replicate_ignore_db Last_errno Last_error Skip_counter Last_log_seq
-127.0.0.1 root 9999 60 master-bin.003 202 Yes 0 0 3
+127.0.0.1 root $MASTER_MYPORT 60 master-bin.003 127 Yes 0 0 2
select * from t2;
m
34
@@ -60,12 +60,12 @@ master-bin.005
master-bin.006
show master status;
File Position Binlog_do_db Binlog_ignore_db
-master-bin.006 710
+master-bin.006 382
slave stop;
slave start;
show slave status;
Master_Host Master_User Master_Port Connect_retry Log_File Pos Slave_Running Replicate_do_db Replicate_ignore_db Last_errno Last_error Skip_counter Last_log_seq
-127.0.0.1 root 9999 60 master-bin.006 710 Yes 0 0 11
+127.0.0.1 root $MASTER_MYPORT 60 master-bin.006 382 Yes 0 0 6
lock tables t3 read;
select count(*) from t3 where n >= 4;
count(*)
diff --git a/mysql-test/r/rpl_log.result b/mysql-test/r/rpl_log.result
index 37b2474f9e6..43108b880b4 100644
--- a/mysql-test/r/rpl_log.result
+++ b/mysql-test/r/rpl_log.result
@@ -16,7 +16,7 @@ load data infile '../../std_data/words.dat' into table t1;
drop table t1;
show binlog events;
Log_name Pos Event_type Server_id Log_seq Info
-master-bin.001 4 Start 1 1 Server ver: $VERSION, Binlog ver: 2
+master-bin.001 4 Start 1 1 Server ver: 4.0.1-alpha-debug-log, Binlog ver: 2
master-bin.001 79 Query 1 2 use test; create table t1(n int not null auto_increment primary key)
master-bin.001 172 Intvar 1 3 INSERT_ID=1
master-bin.001 200 Query 1 4 use test; insert into t1 values (NULL)
@@ -41,7 +41,7 @@ insert into t1 values (1);
drop table t1;
show binlog events;
Log_name Pos Event_type Server_id Log_seq Info
-master-bin.001 4 Start 1 1 Server ver: $VERSION, Binlog ver: 2
+master-bin.001 4 Start 1 1 Server ver: 4.0.1-alpha-debug-log, Binlog ver: 2
master-bin.001 79 Query 1 2 use test; create table t1(n int not null auto_increment primary key)
master-bin.001 172 Intvar 1 3 INSERT_ID=1
master-bin.001 200 Query 1 4 use test; insert into t1 values (NULL)
@@ -54,10 +54,9 @@ master-bin.001 627 Rotate 1 10 master-bin.002;pos=4
master-bin.001 668 Stop 1 11
show binlog events in 'master-bin.002';
Log_name Pos Event_type Server_id Log_seq Info
-master-bin.002 4 Start 1 1 Server ver: $VERSION, Binlog ver: 2
-master-bin.002 79 Query 1 2 use test; create table t1 (n int)
-master-bin.002 137 Query 1 3 use test; insert into t1 values (1)
-master-bin.002 197 Query 1 4 use test; drop table t1
+master-bin.002 4 Query 1 1 use test; create table t1 (n int)
+master-bin.002 62 Query 1 2 use test; insert into t1 values (1)
+master-bin.002 122 Query 1 3 use test; drop table t1
show master logs;
Log_name
master-bin.001
@@ -69,7 +68,7 @@ slave-bin.001
slave-bin.002
show binlog events in 'slave-bin.001' from 4;
Log_name Pos Event_type Server_id Log_seq Info
-slave-bin.001 4 Start 2 1 Server ver: $VERSION, Binlog ver: 2
+slave-bin.001 4 Start 2 1 Server ver: 4.0.1-alpha-debug-log, Binlog ver: 2
slave-bin.001 79 Slave 2 3 host=127.0.0.1,port=$MASTER_MYPORT,log=master-bin.001,pos=4
slave-bin.001 132 Query 1 2 use test; create table t1(n int not null auto_increment primary key)
slave-bin.001 225 Intvar 1 3 INSERT_ID=1
@@ -83,14 +82,13 @@ slave-bin.001 689 Rotate 1 4 slave-bin.002;pos=4; forced by master
slave-bin.001 729 Stop 2 5
show binlog events in 'slave-bin.002' from 4;
Log_name Pos Event_type Server_id Log_seq Info
-slave-bin.002 4 Start 2 1 Server ver: $VERSION, Binlog ver: 2
-slave-bin.002 79 Slave 2 10 host=127.0.0.1,port=$MASTER_MYPORT,log=master-bin.002,pos=4
-slave-bin.002 132 Query 1 2 use test; create table t1 (n int)
-slave-bin.002 190 Query 1 3 use test; insert into t1 values (1)
-slave-bin.002 250 Query 1 4 use test; drop table t1
+slave-bin.002 4 Slave 2 10 host=127.0.0.1,port=$MASTER_MYPORT,log=master-bin.002,pos=4
+slave-bin.002 57 Query 1 1 use test; create table t1 (n int)
+slave-bin.002 115 Query 1 2 use test; insert into t1 values (1)
+slave-bin.002 175 Query 1 3 use test; drop table t1
show slave status;
Master_Host Master_User Master_Port Connect_retry Log_File Pos Slave_Running Replicate_do_db Replicate_ignore_db Last_errno Last_error Skip_counter Last_log_seq
-127.0.0.1 root $MASTER_MYPORT 1 master-bin.002 245 Yes 0 0 4
+127.0.0.1 root $MASTER_MYPORT 1 master-bin.002 170 Yes 0 0 3
show new master for slave with master_log_file='master-bin.001' and
master_log_pos=4 and master_log_seq=1 and master_server_id=1;
Log_name Log_pos
@@ -106,8 +104,8 @@ slave-bin.001 439
show new master for slave with master_log_file='master-bin.002' and
master_log_pos=4 and master_log_seq=1 and master_server_id=1;
Log_name Log_pos
-slave-bin.002 132
+slave-bin.002 57
show new master for slave with master_log_file='master-bin.002' and
master_log_pos=137 and master_log_seq=3 and master_server_id=1;
Log_name Log_pos
-slave-bin.002 250
+slave-bin.002 223
diff --git a/mysql-test/t/rpl000016.test b/mysql-test/t/rpl000016.test
index 714c51f99a6..76ce580da1f 100644
--- a/mysql-test/t/rpl000016.test
+++ b/mysql-test/t/rpl000016.test
@@ -23,7 +23,6 @@ insert into t1 values('Could not break slave'),('Tried hard');
save_master_pos;
connection slave;
sync_with_master;
---replace_result 9306 9999 3334 9999 3335 9999
show slave status;
select * from t1;
connection master;
@@ -70,7 +69,6 @@ insert into t2 values (65);
save_master_pos;
connection slave;
sync_with_master;
---replace_result 9306 9999 3334 9999 3335 9999
show slave status;
select * from t2;
connection master;
@@ -92,7 +90,6 @@ connection slave;
slave stop;
slave start;
sync_with_master;
---replace_result 9306 9999 3334 9999 3335 9999
show slave status;
# because of concurrent insert, the table may not be up to date
# if we do not lock
diff --git a/mysys/mf_iocache.c b/mysys/mf_iocache.c
index f9b668ce09f..f29df74b651 100644
--- a/mysys/mf_iocache.c
+++ b/mysys/mf_iocache.c
@@ -166,7 +166,8 @@ int init_io_cache(IO_CACHE *info, File file, uint cachesize,
info->seek_not_done= test(file >= 0 && type != READ_FIFO &&
type != READ_NET);
info->myflags=cache_myflags & ~(MY_NABP | MY_FNABP);
- info->rc_request_pos=info->rc_pos=info->buffer;
+ info->rc_request_pos=info->rc_pos= info->write_pos = info->buffer;
+ info->write_pos = info->write_end = 0;
if (type == SEQ_READ_APPEND)
{
info->append_read_pos = info->write_pos = info->append_buffer;
@@ -308,6 +309,11 @@ my_bool reinit_io_cache(IO_CACHE *info, enum cache_type type,
{
info->append_read_pos = info->write_pos = info->append_buffer;
}
+ if (!info->write_pos)
+ info->write_pos = info->buffer;
+ if (!info->write_end)
+ info->write_end = info->buffer+info->buffer_length-
+ (seek_offset & (IO_SIZE-1));
info->type=type;
info->error=0;
init_read_function(info,type);
diff --git a/sql/log.cc b/sql/log.cc
index b55d514058f..916e7d20d32 100644
--- a/sql/log.cc
+++ b/sql/log.cc
@@ -81,7 +81,8 @@ static int find_uniq_filename(char *name)
MYSQL_LOG::MYSQL_LOG(): last_time(0), query_start(0),index_file(-1),
name(0), log_type(LOG_CLOSED),write_error(0),
- inited(0), log_seq(1), file_id(1),no_rotate(0)
+ inited(0), log_seq(1), file_id(1),no_rotate(0),
+ need_start_event(1)
{
/*
We don't want to intialize LOCK_Log here as the thread system may
@@ -136,9 +137,11 @@ bool MYSQL_LOG::open_index( int options)
MYF(MY_WME))) < 0);
}
-void MYSQL_LOG::init(enum_log_type log_type_arg)
+void MYSQL_LOG::init(enum_log_type log_type_arg,
+ enum cache_type io_cache_type_arg)
{
log_type = log_type_arg;
+ io_cache_type = io_cache_type_arg;
if (!inited)
{
inited=1;
@@ -184,7 +187,7 @@ void MYSQL_LOG::open(const char *log_name, enum_log_type log_type_arg,
if ((file=my_open(log_file_name,O_CREAT | O_APPEND | O_WRONLY | O_BINARY,
MYF(MY_WME | ME_WAITTANG))) < 0 ||
- init_io_cache(&log_file, file, IO_SIZE, WRITE_CACHE,
+ init_io_cache(&log_file, file, IO_SIZE, io_cache_type,
my_tell(file,MYF(MY_WME)), 0, MYF(MY_WME | MY_NABP)))
goto err;
@@ -220,6 +223,7 @@ void MYSQL_LOG::open(const char *log_name, enum_log_type log_type_arg,
}
else if (log_type == LOG_BIN)
{
+ bool error;
/*
Explanation of the boolean black magic:
if we are supposed to write magic number try write
@@ -232,10 +236,13 @@ void MYSQL_LOG::open(const char *log_name, enum_log_type log_type_arg,
goto err;
log_seq = 1;
- Start_log_event s;
- bool error;
- s.set_log_seq(0, this);
- s.write(&log_file);
+ if (need_start_event)
+ {
+ Start_log_event s;
+ s.set_log_seq(0, this);
+ s.write(&log_file);
+ need_start_event=0;
+ }
flush_io_cache(&log_file);
pthread_mutex_lock(&LOCK_index);
error=(my_write(index_file, (byte*) log_file_name, strlen(log_file_name),
@@ -715,7 +722,8 @@ bool MYSQL_LOG::write(Log_event* event_info)
file == &log_file && flush_io_cache(file))
goto err;
error=0;
- should_rotate = (file == &log_file && my_b_tell(file) >= max_binlog_size);
+ should_rotate = (file == &log_file &&
+ (uint)my_b_tell(file) >= max_binlog_size);
err:
if (error)
{
diff --git a/sql/log_event.cc b/sql/log_event.cc
index 6db0c3ef9f7..d954d69ea71 100644
--- a/sql/log_event.cc
+++ b/sql/log_event.cc
@@ -149,12 +149,21 @@ static void cleanup_load_tmpdir()
#endif
-Log_event::Log_event(const char* buf):cached_event_len(0),temp_buf(0)
+Log_event::Log_event(const char* buf, bool old_format):
+ cached_event_len(0),temp_buf(0)
{
when = uint4korr(buf);
server_id = uint4korr(buf + SERVER_ID_OFFSET);
- log_seq = uint4korr(buf + LOG_SEQ_OFFSET);
- flags = uint2korr(buf + FLAGS_OFFSET);
+ if (old_format)
+ {
+ log_seq=0;
+ flags=0;
+ }
+ else
+ {
+ log_seq = uint4korr(buf + LOG_SEQ_OFFSET);
+ flags = uint2korr(buf + FLAGS_OFFSET);
+ }
#ifndef MYSQL_CLIENT
thd = 0;
#endif
@@ -441,17 +450,24 @@ int Log_event::read_log_event(IO_CACHE* file, String* packet,
#define UNLOCK_MUTEX
#endif
+#ifndef MYSQL_CLIENT
+#define LOCK_MUTEX if(log_lock) pthread_mutex_lock(log_lock);
+#else
+#define LOCK_MUTEX
+#endif
+
+
// allocates memory - the caller is responsible for clean-up
#ifndef MYSQL_CLIENT
-Log_event* Log_event::read_log_event(IO_CACHE* file, pthread_mutex_t* log_lock)
+Log_event* Log_event::read_log_event(IO_CACHE* file,
+ pthread_mutex_t* log_lock,
+ bool old_format)
#else
-Log_event* Log_event::read_log_event(IO_CACHE* file)
+Log_event* Log_event::read_log_event(IO_CACHE* file, bool old_format)
#endif
{
char head[LOG_EVENT_HEADER_LEN];
-#ifndef MYSQL_CLIENT
- if(log_lock) pthread_mutex_lock(log_lock);
-#endif
+ LOCK_MUTEX;
if (my_b_read(file, (byte *) head, sizeof(head)))
{
UNLOCK_MUTEX;
@@ -489,7 +505,7 @@ Log_event* Log_event::read_log_event(IO_CACHE* file)
error = "read error";
goto err;
}
- if ((res = read_log_event(buf, data_len, &error)))
+ if ((res = read_log_event(buf, data_len, &error, old_format)))
res->register_temp_buf(buf);
err:
UNLOCK_MUTEX;
@@ -502,7 +518,7 @@ err:
}
Log_event* Log_event::read_log_event(const char* buf, int event_len,
- const char **error)
+ const char **error, bool old_format)
{
if (event_len < EVENT_LEN_OFFSET ||
(uint)event_len != uint4korr(buf+EVENT_LEN_OFFSET))
@@ -513,14 +529,14 @@ Log_event* Log_event::read_log_event(const char* buf, int event_len,
switch(buf[EVENT_TYPE_OFFSET])
{
case QUERY_EVENT:
- ev = new Query_log_event(buf, event_len);
+ ev = new Query_log_event(buf, event_len, old_format);
break;
case LOAD_EVENT:
case NEW_LOAD_EVENT:
- ev = new Load_log_event(buf, event_len);
+ ev = new Load_log_event(buf, event_len, old_format);
break;
case ROTATE_EVENT:
- ev = new Rotate_log_event(buf, event_len);
+ ev = new Rotate_log_event(buf, event_len, old_format);
break;
case SLAVE_EVENT:
ev = new Slave_log_event(buf, event_len);
@@ -538,13 +554,13 @@ Log_event* Log_event::read_log_event(const char* buf, int event_len,
ev = new Execute_load_log_event(buf, event_len);
break;
case START_EVENT:
- ev = new Start_log_event(buf);
+ ev = new Start_log_event(buf, old_format);
break;
case STOP_EVENT:
- ev = new Stop_log_event(buf);
+ ev = new Stop_log_event(buf, old_format);
break;
case INTVAR_EVENT:
- ev = new Intvar_log_event(buf);
+ ev = new Intvar_log_event(buf, old_format);
break;
default:
break;
@@ -634,7 +650,8 @@ void Rotate_log_event::print(FILE* file, bool short_form, char* last_db)
#endif /* #ifdef MYSQL_CLIENT */
-Start_log_event::Start_log_event(const char* buf) :Log_event(buf)
+Start_log_event::Start_log_event(const char* buf,
+ bool old_format) :Log_event(buf, old_format)
{
binlog_version = uint2korr(buf + LOG_EVENT_HEADER_LEN +
ST_BINLOG_VER_OFFSET);
@@ -652,8 +669,9 @@ int Start_log_event::write_data(IO_CACHE* file)
return (my_b_write(file, (byte*) buff, sizeof(buff)) ? -1 : 0);
}
-Rotate_log_event::Rotate_log_event(const char* buf, int event_len):
- Log_event(buf),new_log_ident(NULL),alloced(0)
+Rotate_log_event::Rotate_log_event(const char* buf, int event_len,
+ bool old_format):
+ Log_event(buf, old_format),new_log_ident(NULL),alloced(0)
{
// the caller will ensure that event_len is what we have at
// EVENT_LEN_OFFSET
@@ -695,8 +713,9 @@ Query_log_event::Query_log_event(THD* thd_arg, const char* query_arg,
}
#endif
-Query_log_event::Query_log_event(const char* buf, int event_len):
- Log_event(buf),data_buf(0), query(NULL), db(NULL)
+Query_log_event::Query_log_event(const char* buf, int event_len,
+ bool old_format):
+ Log_event(buf, old_format),data_buf(0), query(NULL), db(NULL)
{
if ((uint)event_len < QUERY_EVENT_OVERHEAD)
return;
@@ -766,7 +785,8 @@ int Query_log_event::write_data(IO_CACHE* file)
my_b_write(file, (byte*) query, q_len)) ? -1 : 0;
}
-Intvar_log_event::Intvar_log_event(const char* buf):Log_event(buf)
+Intvar_log_event::Intvar_log_event(const char* buf, bool old_format):
+ Log_event(buf, old_format)
{
buf += LOG_EVENT_HEADER_LEN;
type = buf[I_TYPE_OFFSET];
@@ -1003,8 +1023,9 @@ Load_log_event::Load_log_event(THD* thd, sql_exchange* ex,
// the caller must do buf[event_len] = 0 before he starts using the
// constructed event
-Load_log_event::Load_log_event(const char* buf, int event_len):
- Log_event(buf),num_fields(0),fields(0),
+Load_log_event::Load_log_event(const char* buf, int event_len,
+ bool old_format):
+ Log_event(buf, old_format),num_fields(0),fields(0),
field_lens(0),field_block_len(0),
table_name(0),db(0),fname(0)
{
@@ -1237,7 +1258,7 @@ void Slave_log_event::init_from_mem_pool(int data_size)
}
Slave_log_event::Slave_log_event(const char* buf, int event_len):
- Log_event(buf),mem_pool(0),master_host(0)
+ Log_event(buf,0),mem_pool(0),master_host(0)
{
event_len -= LOG_EVENT_HEADER_LEN;
if(event_len < 0)
@@ -1291,7 +1312,7 @@ int Create_file_log_event::write_base(IO_CACHE* file)
}
Create_file_log_event::Create_file_log_event(const char* buf, int len):
- Load_log_event(buf,0),fake_base(0),block(0)
+ Load_log_event(buf,0,0),fake_base(0),block(0)
{
int block_offset;
if (copy_log_event(buf,len))
@@ -1347,7 +1368,7 @@ Append_block_log_event::Append_block_log_event(THD* thd_arg, char* block_arg,
#endif
Append_block_log_event::Append_block_log_event(const char* buf, int len):
- Log_event(buf),block(0)
+ Log_event(buf, 0),block(0)
{
if((uint)len < APPEND_BLOCK_EVENT_OVERHEAD)
return;
@@ -1399,7 +1420,7 @@ Delete_file_log_event::Delete_file_log_event(THD* thd_arg):
#endif
Delete_file_log_event::Delete_file_log_event(const char* buf, int len):
- Log_event(buf),file_id(0)
+ Log_event(buf, 0),file_id(0)
{
if((uint)len < DELETE_FILE_EVENT_OVERHEAD)
return;
@@ -1446,7 +1467,7 @@ Execute_load_log_event::Execute_load_log_event(THD* thd_arg):
#endif
Execute_load_log_event::Execute_load_log_event(const char* buf,int len):
- Log_event(buf),file_id(0)
+ Log_event(buf, 0),file_id(0)
{
if((uint)len < EXEC_LOAD_EVENT_OVERHEAD)
return;
@@ -1657,15 +1678,11 @@ int Load_log_event::exec_event(NET* net, struct st_master_info* mi)
int Start_log_event::exec_event(struct st_master_info* mi)
{
-#ifdef TO_BE_DELETED
- /*
- We can't close temporary files or cleanup the tmpdir here, becasue
- someone may have just rotated the logs on the master.
- We should only do this cleanup when we know the master restarted.
- */
- close_temporary_tables(thd);
- cleanup_load_tmpdir();
-#endif
+ if (!mi->old_format)
+ {
+ close_temporary_tables(thd);
+ cleanup_load_tmpdir();
+ }
return Log_event::exec_event(mi);
}
@@ -1866,7 +1883,9 @@ int Execute_load_log_event::exec_event(struct st_master_info* mi)
slave_print_error(my_errno, "Could not open file '%s'", fname);
goto err;
}
- if (!(lev = (Load_log_event*)Log_event::read_log_event(&file,0))
+ if (!(lev = (Load_log_event*)Log_event::read_log_event(&file,
+ (pthread_mutex_t*)0,
+ (bool)0))
|| lev->get_type_code() != NEW_LOAD_EVENT)
{
slave_print_error(0, "File '%s' appears corrupted", fname);
diff --git a/sql/log_event.h b/sql/log_event.h
index 71f0f2a8575..9f9bb46d221 100644
--- a/sql/log_event.h
+++ b/sql/log_event.h
@@ -242,7 +242,7 @@ public:
virtual Log_event_type get_type_code() = 0;
virtual bool is_valid() = 0;
virtual bool get_cache_stmt() { return 0; }
- Log_event(const char* buf);
+ Log_event(const char* buf, bool old_format);
#ifndef MYSQL_CLIENT
Log_event(THD* thd_arg, uint16 flags_arg = 0);
#endif
@@ -268,12 +268,14 @@ public:
#ifndef MYSQL_CLIENT
// if mutex is 0, the read will proceed without mutex
- static Log_event* read_log_event(IO_CACHE* file, pthread_mutex_t* log_lock);
+ static Log_event* read_log_event(IO_CACHE* file,
+ pthread_mutex_t* log_lock,
+ bool old_format);
#else // avoid having to link mysqlbinlog against libpthread
- static Log_event* read_log_event(IO_CACHE* file);
+ static Log_event* read_log_event(IO_CACHE* file, bool old_format);
#endif
static Log_event* read_log_event(const char* buf, int event_len,
- const char **error);
+ const char **error, bool old_format);
const char* get_type_str();
#ifndef MYSQL_CLIENT
@@ -317,7 +319,7 @@ public:
bool get_cache_stmt() { return cache_stmt; }
#endif
- Query_log_event(const char* buf, int event_len);
+ Query_log_event(const char* buf, int event_len, bool old_format);
~Query_log_event()
{
if (data_buf)
@@ -411,7 +413,7 @@ public:
int exec_event(NET* net, struct st_master_info* mi);
#endif
- Load_log_event(const char* buf, int event_len);
+ Load_log_event(const char* buf, int event_len, bool old_format);
~Load_log_event()
{
}
@@ -451,7 +453,7 @@ public:
memcpy(server_version, ::server_version, ST_SERVER_VER_LEN);
}
#endif
- Start_log_event(const char* buf);
+ Start_log_event(const char* buf, bool old_format);
~Start_log_event() {}
Log_event_type get_type_code() { return START_EVENT;}
int write_data(IO_CACHE* file);
@@ -479,7 +481,7 @@ public:
:Log_event(thd_arg),val(val_arg),type(type_arg)
{}
#endif
- Intvar_log_event(const char* buf);
+ Intvar_log_event(const char* buf, bool old_format);
~Intvar_log_event() {}
Log_event_type get_type_code() { return INTVAR_EVENT;}
const char* get_var_type_name();
@@ -503,7 +505,8 @@ public:
Stop_log_event() :Log_event((THD*)0)
{}
#endif
- Stop_log_event(const char* buf):Log_event(buf)
+ Stop_log_event(const char* buf, bool old_format):Log_event(buf,
+ old_format)
{
}
~Stop_log_event() {}
@@ -534,7 +537,7 @@ public:
alloced(0)
{}
#endif
- Rotate_log_event(const char* buf, int event_len);
+ Rotate_log_event(const char* buf, int event_len, bool old_format);
~Rotate_log_event()
{
if (alloced)
@@ -686,7 +689,6 @@ public:
#endif
};
-
#endif
diff --git a/sql/repl_failsafe.cc b/sql/repl_failsafe.cc
index c836e6803ee..d846662947d 100644
--- a/sql/repl_failsafe.cc
+++ b/sql/repl_failsafe.cc
@@ -20,12 +20,21 @@
#include "repl_failsafe.h"
#include "sql_repl.h"
#include "slave.h"
+#include "sql_acl.h"
#include "mini_client.h"
+#include "log_event.h"
#include <mysql.h>
+#include <thr_alarm.h>
+
+#define SLAVE_LIST_CHUNK 128
+#define SLAVE_ERRMSG_SIZE (FN_REFLEN+64)
+
RPL_STATUS rpl_status=RPL_NULL;
pthread_mutex_t LOCK_rpl_status;
pthread_cond_t COND_rpl_status;
+HASH slave_list;
+extern const char* any_db;
const char *rpl_role_type[] = {"MASTER","SLAVE",NullS};
TYPELIB rpl_role_typelib = {array_elements(rpl_role_type)-1,"",
@@ -37,6 +46,10 @@ const char* rpl_status_type[] = {"AUTH_MASTER","ACTIVE_SLAVE","IDLE_SLAVE",
TYPELIB rpl_status_typelib= {array_elements(rpl_status_type)-1,"",
rpl_status_type};
+static Slave_log_event* find_slave_event(IO_CACHE* log,
+ const char* log_file_name,
+ char* errmsg);
+
static int init_failsafe_rpl_thread(THD* thd)
{
DBUG_ENTER("init_failsafe_rpl_thread");
@@ -89,6 +102,333 @@ void change_rpl_status(RPL_STATUS from_status, RPL_STATUS to_status)
pthread_mutex_unlock(&LOCK_rpl_status);
}
+#define get_object(p, obj) \
+{\
+ uint len = (uint)*p++; \
+ if (p + len > p_end || len >= sizeof(obj)) \
+ goto err; \
+ strmake(obj,(char*) p,len); \
+ p+= len; \
+}\
+
+static inline int cmp_master_pos(Slave_log_event* sev, LEX_MASTER_INFO* mi)
+{
+ return cmp_master_pos(sev->master_log, sev->master_pos, mi->log_file_name,
+ mi->pos);
+}
+
+void unregister_slave(THD* thd, bool only_mine, bool need_mutex)
+{
+ if (need_mutex)
+ pthread_mutex_lock(&LOCK_slave_list);
+ if (thd->server_id)
+ {
+ SLAVE_INFO* old_si;
+ if ((old_si = (SLAVE_INFO*)hash_search(&slave_list,
+ (byte*)&thd->server_id, 4)) &&
+ (!only_mine || old_si->thd == thd))
+ hash_delete(&slave_list, (byte*)old_si);
+ }
+ if (need_mutex)
+ pthread_mutex_unlock(&LOCK_slave_list);
+}
+
+int register_slave(THD* thd, uchar* packet, uint packet_length)
+{
+ SLAVE_INFO *si;
+ int res = 1;
+ uchar* p = packet, *p_end = packet + packet_length;
+
+ if (check_access(thd, FILE_ACL, any_db))
+ return 1;
+
+ if (!(si = (SLAVE_INFO*)my_malloc(sizeof(SLAVE_INFO), MYF(MY_WME))))
+ goto err;
+
+ thd->server_id = si->server_id = uint4korr(p);
+ p += 4;
+ get_object(p,si->host);
+ get_object(p,si->user);
+ get_object(p,si->password);
+ si->port = uint2korr(p);
+ p += 2;
+ si->rpl_recovery_rank = uint4korr(p);
+ p += 4;
+ if (!(si->master_id = uint4korr(p)))
+ si->master_id = server_id;
+ si->thd = thd;
+ pthread_mutex_lock(&LOCK_slave_list);
+
+ unregister_slave(thd,0,0);
+ res = hash_insert(&slave_list, (byte*) si);
+ pthread_mutex_unlock(&LOCK_slave_list);
+ return res;
+
+err:
+ if (si)
+ my_free((gptr) si, MYF(MY_WME));
+ return res;
+}
+
+static uint32* slave_list_key(SLAVE_INFO* si, uint* len,
+ my_bool not_used __attribute__((unused)))
+{
+ *len = 4;
+ return &si->server_id;
+}
+
+static void slave_info_free(void *s)
+{
+ my_free((gptr) s, MYF(MY_WME));
+}
+
+void init_slave_list()
+{
+ hash_init(&slave_list, SLAVE_LIST_CHUNK, 0, 0,
+ (hash_get_key) slave_list_key, slave_info_free, 0);
+ pthread_mutex_init(&LOCK_slave_list, MY_MUTEX_INIT_FAST);
+}
+
+void end_slave_list()
+{
+ pthread_mutex_lock(&LOCK_slave_list);
+ hash_free(&slave_list);
+ pthread_mutex_unlock(&LOCK_slave_list);
+ pthread_mutex_destroy(&LOCK_slave_list);
+}
+
+static int find_target_pos(LEX_MASTER_INFO* mi, IO_CACHE* log, char* errmsg)
+{
+ uint32 log_seq = mi->last_log_seq;
+ uint32 target_server_id = mi->server_id;
+
+ for (;;)
+ {
+ Log_event* ev;
+ if (!(ev = Log_event::read_log_event(log, (pthread_mutex_t*)0,
+ 0)))
+ {
+ if (log->error > 0)
+ strmov(errmsg, "Binary log truncated in the middle of event");
+ else if (log->error < 0)
+ strmov(errmsg, "I/O error reading binary log");
+ else
+ strmov(errmsg, "Could not find target event in the binary log");
+ return 1;
+ }
+
+ if (ev->log_seq == log_seq && ev->server_id == target_server_id)
+ {
+ delete ev;
+ mi->pos = my_b_tell(log);
+ return 0;
+ }
+
+ delete ev;
+ }
+}
+
+
+int translate_master(THD* thd, LEX_MASTER_INFO* mi, char* errmsg)
+{
+ LOG_INFO linfo;
+ char search_file_name[FN_REFLEN],last_log_name[FN_REFLEN];
+ IO_CACHE log;
+ File file = -1, last_file = -1;
+ pthread_mutex_t *log_lock;
+ const char* errmsg_p;
+ Slave_log_event* sev = 0;
+ my_off_t last_pos = 0;
+ int error = 1;
+ int cmp_res;
+ LINT_INIT(cmp_res);
+
+ if (!mysql_bin_log.is_open())
+ {
+ strmov(errmsg,"Binary log is not open");
+ return 1;
+ }
+
+ if (!server_id_supplied)
+ {
+ strmov(errmsg, "Misconfigured master - server id was not set");
+ return 1;
+ }
+
+ linfo.index_file_offset = 0;
+
+
+ search_file_name[0] = 0;
+
+ if (mysql_bin_log.find_first_log(&linfo, search_file_name))
+ {
+ strmov(errmsg,"Could not find first log");
+ return 1;
+ }
+ thd->current_linfo = &linfo;
+
+ bzero((char*) &log,sizeof(log));
+ log_lock = mysql_bin_log.get_log_lock();
+ pthread_mutex_lock(log_lock);
+
+ for (;;)
+ {
+ if ((file=open_binlog(&log, linfo.log_file_name, &errmsg_p)) < 0)
+ {
+ strmov(errmsg, errmsg_p);
+ goto err;
+ }
+
+ if (!(sev = find_slave_event(&log, linfo.log_file_name, errmsg)))
+ goto err;
+
+ cmp_res = cmp_master_pos(sev, mi);
+ delete sev;
+
+ if (!cmp_res)
+ {
+ /* Copy basename */
+ fn_format(mi->log_file_name, linfo.log_file_name, "","",1);
+ mi->pos = my_b_tell(&log);
+ goto mi_inited;
+ }
+ else if (cmp_res > 0)
+ {
+ if (!last_pos)
+ {
+ strmov(errmsg,
+ "Slave event in first log points past the target position");
+ goto err;
+ }
+ end_io_cache(&log);
+ (void) my_close(file, MYF(MY_WME));
+ if (init_io_cache(&log, (file = last_file), IO_SIZE, READ_CACHE, 0, 0,
+ MYF(MY_WME)))
+ {
+ errmsg[0] = 0;
+ goto err;
+ }
+ break;
+ }
+
+ strmov(last_log_name, linfo.log_file_name);
+ last_pos = my_b_tell(&log);
+
+ switch (mysql_bin_log.find_next_log(&linfo)) {
+ case LOG_INFO_EOF:
+ if (last_file >= 0)
+ (void)my_close(last_file, MYF(MY_WME));
+ last_file = -1;
+ goto found_log;
+ case 0:
+ break;
+ default:
+ strmov(errmsg, "Error reading log index");
+ goto err;
+ }
+
+ end_io_cache(&log);
+ if (last_file >= 0)
+ (void) my_close(last_file, MYF(MY_WME));
+ last_file = file;
+ }
+
+found_log:
+ my_b_seek(&log, last_pos);
+ if (find_target_pos(mi,&log,errmsg))
+ goto err;
+ fn_format(mi->log_file_name, last_log_name, "","",1); /* Copy basename */
+
+mi_inited:
+ error = 0;
+err:
+ pthread_mutex_unlock(log_lock);
+ end_io_cache(&log);
+ pthread_mutex_lock(&LOCK_thread_count);
+ thd->current_linfo = 0;
+ pthread_mutex_unlock(&LOCK_thread_count);
+ if (file >= 0)
+ (void) my_close(file, MYF(MY_WME));
+ if (last_file >= 0 && last_file != file)
+ (void) my_close(last_file, MYF(MY_WME));
+
+ return error;
+}
+
+// caller must delete result when done
+static Slave_log_event* find_slave_event(IO_CACHE* log,
+ const char* log_file_name,
+ char* errmsg)
+{
+ Log_event* ev;
+ int i;
+ bool slave_event_found = 0;
+ LINT_INIT(ev);
+
+ for (i = 0; i < 2; i++)
+ {
+ if (!(ev = Log_event::read_log_event(log, (pthread_mutex_t*)0, 0)))
+ {
+ my_snprintf(errmsg, SLAVE_ERRMSG_SIZE,
+ "Error reading event in log '%s'",
+ (char*)log_file_name);
+ return 0;
+ }
+ if (ev->get_type_code() == SLAVE_EVENT)
+ {
+ slave_event_found = 1;
+ break;
+ }
+ delete ev;
+ }
+ if (!slave_event_found)
+ {
+ my_snprintf(errmsg, SLAVE_ERRMSG_SIZE,
+ "Could not find slave event in log '%s'",
+ (char*)log_file_name);
+ delete ev;
+ return 0;
+ }
+
+ return (Slave_log_event*)ev;
+}
+
+
+int show_new_master(THD* thd)
+{
+ DBUG_ENTER("show_new_master");
+ List<Item> field_list;
+ char errmsg[SLAVE_ERRMSG_SIZE];
+ LEX_MASTER_INFO* lex_mi = &thd->lex.mi;
+
+ errmsg[0]=0; // Safety
+ if (translate_master(thd, lex_mi, errmsg))
+ {
+ if (errmsg[0])
+ net_printf(&thd->net, ER_ERROR_WHEN_EXECUTING_COMMAND,
+ "SHOW NEW MASTER", errmsg);
+ else
+ send_error(&thd->net, 0);
+
+ DBUG_RETURN(1);
+ }
+ else
+ {
+ String* packet = &thd->packet;
+ field_list.push_back(new Item_empty_string("Log_name", 20));
+ field_list.push_back(new Item_empty_string("Log_pos", 20));
+ if (send_fields(thd, field_list, 1))
+ DBUG_RETURN(-1);
+ packet->length(0);
+ net_store_data(packet, lex_mi->log_file_name);
+ net_store_data(packet, (longlong)lex_mi->pos);
+ if (my_net_write(&thd->net, packet->ptr(), packet->length()))
+ DBUG_RETURN(-1);
+ send_eof(&thd->net);
+ DBUG_RETURN(0);
+ }
+}
+
int update_slave_list(MYSQL* mysql)
{
MYSQL_RES* res=0;
@@ -216,6 +556,277 @@ err:
DBUG_RETURN(0);
}
+int show_slave_hosts(THD* thd)
+{
+ List<Item> field_list;
+ NET* net = &thd->net;
+ String* packet = &thd->packet;
+ DBUG_ENTER("show_slave_hosts");
+
+ field_list.push_back(new Item_empty_string("Server_id", 20));
+ field_list.push_back(new Item_empty_string("Host", 20));
+ if (opt_show_slave_auth_info)
+ {
+ field_list.push_back(new Item_empty_string("User",20));
+ field_list.push_back(new Item_empty_string("Password",20));
+ }
+ field_list.push_back(new Item_empty_string("Port",20));
+ field_list.push_back(new Item_empty_string("Rpl_recovery_rank", 20));
+ field_list.push_back(new Item_empty_string("Master_id", 20));
+
+ if (send_fields(thd, field_list, 1))
+ DBUG_RETURN(-1);
+
+ pthread_mutex_lock(&LOCK_slave_list);
+
+ for (uint i = 0; i < slave_list.records; ++i)
+ {
+ SLAVE_INFO* si = (SLAVE_INFO*) hash_element(&slave_list, i);
+ packet->length(0);
+ net_store_data(packet, si->server_id);
+ net_store_data(packet, si->host);
+ if (opt_show_slave_auth_info)
+ {
+ net_store_data(packet, si->user);
+ net_store_data(packet, si->password);
+ }
+ net_store_data(packet, (uint32) si->port);
+ net_store_data(packet, si->rpl_recovery_rank);
+ net_store_data(packet, si->master_id);
+ if (my_net_write(net, (char*)packet->ptr(), packet->length()))
+ {
+ pthread_mutex_unlock(&LOCK_slave_list);
+ DBUG_RETURN(-1);
+ }
+ }
+ pthread_mutex_unlock(&LOCK_slave_list);
+ send_eof(net);
+ DBUG_RETURN(0);
+}
+
+int connect_to_master(THD *thd, MYSQL* mysql, MASTER_INFO* mi)
+{
+ if (!mc_mysql_connect(mysql, mi->host, mi->user, mi->password, 0,
+ mi->port, 0, 0))
+ {
+ sql_print_error("Connection to master failed: %s",
+ mc_mysql_error(mysql));
+ return 1;
+ }
+ return 0;
+}
+
+
+static inline void cleanup_mysql_results(MYSQL_RES* db_res,
+ MYSQL_RES** cur, MYSQL_RES** start)
+{
+ for( ; cur >= start; --cur)
+ {
+ if (*cur)
+ mc_mysql_free_result(*cur);
+ }
+ mc_mysql_free_result(db_res);
+}
+
+
+static inline int fetch_db_tables(THD* thd, MYSQL* mysql, const char* db,
+ MYSQL_RES* table_res)
+{
+ MYSQL_ROW row;
+
+ for( row = mc_mysql_fetch_row(table_res); row;
+ row = mc_mysql_fetch_row(table_res))
+ {
+ TABLE_LIST table;
+ const char* table_name = row[0];
+ int error;
+ if (table_rules_on)
+ {
+ table.next = 0;
+ table.db = (char*)db;
+ table.real_name = (char*)table_name;
+ table.updating = 1;
+ if (!tables_ok(thd, &table))
+ continue;
+ }
+
+ if ((error = fetch_nx_table(thd, db, table_name, &glob_mi, mysql)))
+ return error;
+ }
+
+ return 0;
+}
+
+
+int load_master_data(THD* thd)
+{
+ MYSQL mysql;
+ MYSQL_RES* master_status_res = 0;
+ bool slave_was_running = 0;
+ int error = 0;
+
+ mc_mysql_init(&mysql);
+
+ // we do not want anyone messing with the slave at all for the entire
+ // duration of the data load;
+ pthread_mutex_lock(&LOCK_slave);
+
+ // first, kill the slave
+ if ((slave_was_running = slave_running))
+ {
+ abort_slave = 1;
+ KICK_SLAVE;
+ thd->proc_info = "waiting for slave to die";
+ while (slave_running)
+ pthread_cond_wait(&COND_slave_stopped, &LOCK_slave); // wait until done
+ }
+
+
+ if (connect_to_master(thd, &mysql, &glob_mi))
+ {
+ net_printf(&thd->net, error = ER_CONNECT_TO_MASTER,
+ mc_mysql_error(&mysql));
+ goto err;
+ }
+
+ // now that we are connected, get all database and tables in each
+ {
+ MYSQL_RES *db_res, **table_res, **table_res_end, **cur_table_res;
+ uint num_dbs;
+
+ if (mc_mysql_query(&mysql, "show databases", 0) ||
+ !(db_res = mc_mysql_store_result(&mysql)))
+ {
+ net_printf(&thd->net, error = ER_QUERY_ON_MASTER,
+ mc_mysql_error(&mysql));
+ goto err;
+ }
+
+ if (!(num_dbs = (uint) mc_mysql_num_rows(db_res)))
+ goto err;
+ // in theory, the master could have no databases at all
+ // and run with skip-grant
+
+ if (!(table_res = (MYSQL_RES**)thd->alloc(num_dbs * sizeof(MYSQL_RES*))))
+ {
+ net_printf(&thd->net, error = ER_OUTOFMEMORY);
+ goto err;
+ }
+
+ // this is a temporary solution until we have online backup
+ // capabilities - to be replaced once online backup is working
+ // we wait to issue FLUSH TABLES WITH READ LOCK for as long as we
+ // can to minimize the lock time
+ if (mc_mysql_query(&mysql, "FLUSH TABLES WITH READ LOCK", 0) ||
+ mc_mysql_query(&mysql, "SHOW MASTER STATUS",0) ||
+ !(master_status_res = mc_mysql_store_result(&mysql)))
+ {
+ net_printf(&thd->net, error = ER_QUERY_ON_MASTER,
+ mc_mysql_error(&mysql));
+ goto err;
+ }
+
+ // go through every table in every database, and if the replication
+ // rules allow replicating it, get it
+
+ table_res_end = table_res + num_dbs;
+
+ for(cur_table_res = table_res; cur_table_res < table_res_end;
+ cur_table_res++)
+ {
+ // since we know how many rows we have, this can never be NULL
+ MYSQL_ROW row = mc_mysql_fetch_row(db_res);
+ char* db = row[0];
+
+ /*
+ Do not replicate databases excluded by rules
+ also skip mysql database - in most cases the user will
+ mess up and not exclude mysql database with the rules when
+ he actually means to - in this case, he is up for a surprise if
+ his priv tables get dropped and downloaded from master
+ TO DO - add special option, not enabled
+ by default, to allow inclusion of mysql database into load
+ data from master
+ */
+
+ if (!db_ok(db, replicate_do_db, replicate_ignore_db) ||
+ !strcmp(db,"mysql"))
+ {
+ *cur_table_res = 0;
+ continue;
+ }
+
+ if (mysql_rm_db(thd, db, 1,1) ||
+ mysql_create_db(thd, db, 0, 1))
+ {
+ send_error(&thd->net, 0, 0);
+ cleanup_mysql_results(db_res, cur_table_res - 1, table_res);
+ goto err;
+ }
+
+ if (mc_mysql_select_db(&mysql, db) ||
+ mc_mysql_query(&mysql, "show tables", 0) ||
+ !(*cur_table_res = mc_mysql_store_result(&mysql)))
+ {
+ net_printf(&thd->net, error = ER_QUERY_ON_MASTER,
+ mc_mysql_error(&mysql));
+ cleanup_mysql_results(db_res, cur_table_res - 1, table_res);
+ goto err;
+ }
+
+ if ((error = fetch_db_tables(thd, &mysql, db, *cur_table_res)))
+ {
+ // we do not report the error - fetch_db_tables handles it
+ cleanup_mysql_results(db_res, cur_table_res, table_res);
+ goto err;
+ }
+ }
+
+ cleanup_mysql_results(db_res, cur_table_res - 1, table_res);
+
+ // adjust position in the master
+ if (master_status_res)
+ {
+ MYSQL_ROW row = mc_mysql_fetch_row(master_status_res);
+
+ /*
+ We need this check because the master may not be running with
+ log-bin, but it will still allow us to do all the steps
+ of LOAD DATA FROM MASTER - no reason to forbid it, really,
+ although it does not make much sense for the user to do it
+ */
+ if (row[0] && row[1])
+ {
+ strmake(glob_mi.log_file_name, row[0], sizeof(glob_mi.log_file_name));
+ glob_mi.pos = atoi(row[1]); // atoi() is ok, since offset is <= 1GB
+ if (glob_mi.pos < 4)
+ glob_mi.pos = 4; // don't hit the magic number
+ glob_mi.pending = 0;
+ flush_master_info(&glob_mi);
+ }
+
+ mc_mysql_free_result(master_status_res);
+ }
+
+ if (mc_mysql_query(&mysql, "UNLOCK TABLES", 0))
+ {
+ net_printf(&thd->net, error = ER_QUERY_ON_MASTER,
+ mc_mysql_error(&mysql));
+ goto err;
+ }
+ }
+
+err:
+ pthread_mutex_unlock(&LOCK_slave);
+ if (slave_was_running)
+ start_slave(0, 0);
+ mc_mysql_close(&mysql); // safe to call since we always do mc_mysql_init()
+ if (!error)
+ send_ok(&thd->net);
+
+ return error;
+}
+
diff --git a/sql/repl_failsafe.h b/sql/repl_failsafe.h
index b71dde1dc10..77bc03ce8c0 100644
--- a/sql/repl_failsafe.h
+++ b/sql/repl_failsafe.h
@@ -2,6 +2,8 @@
#define REPL_FAILSAFE_H
#include "mysql.h"
+#include "my_sys.h"
+#include "slave.h"
typedef enum {RPL_AUTH_MASTER=0,RPL_ACTIVE_SLAVE,RPL_IDLE_SLAVE,
RPL_LOST_SOLDIER,RPL_TROOP_SOLDIER,
@@ -19,4 +21,18 @@ pthread_handler_decl(handle_failsafe_rpl,arg);
void change_rpl_status(RPL_STATUS from_status, RPL_STATUS to_status);
int find_recovery_captain(THD* thd, MYSQL* mysql);
int update_slave_list(MYSQL* mysql);
+
+extern HASH slave_list;
+
+int load_master_data(THD* thd);
+int connect_to_master(THD *thd, MYSQL* mysql, MASTER_INFO* mi);
+
+int show_new_master(THD* thd);
+int show_slave_hosts(THD* thd);
+int translate_master(THD* thd, LEX_MASTER_INFO* mi, char* errmsg);
+void init_slave_list();
+void end_slave_list();
+int register_slave(THD* thd, uchar* packet, uint packet_length);
+void unregister_slave(THD* thd, bool only_mine, bool need_mutex);
+
#endif
diff --git a/sql/slave.cc b/sql/slave.cc
index 8075b5ad75b..b78cd0f0835 100644
--- a/sql/slave.cc
+++ b/sql/slave.cc
@@ -61,6 +61,8 @@ static int safe_sleep(THD* thd, int sec);
static int request_table_dump(MYSQL* mysql, const char* db, const char* table);
static int create_table_from_dump(THD* thd, NET* net, const char* db,
const char* table_name);
+static int check_master_version(MYSQL* mysql, MASTER_INFO* mi);
+
char* rewrite_db(char* db);
static void free_table_ent(TABLE_RULE_ENT* e)
@@ -333,6 +335,54 @@ static int init_intvar_from_file(int* var, IO_CACHE* f, int default_val)
return 1;
}
+static int check_master_version(MYSQL* mysql, MASTER_INFO* mi)
+{
+ MYSQL_RES* res;
+ MYSQL_ROW row;
+ const char* version;
+ const char* errmsg = 0;
+
+ if (mc_mysql_query(mysql, "SELECT VERSION()", 0)
+ || !(res = mc_mysql_store_result(mysql)))
+ {
+ sql_print_error("Error checking master version: %s",
+ mc_mysql_error(mysql));
+ return 1;
+ }
+ if (!(row = mc_mysql_fetch_row(res)))
+ {
+ errmsg = "Master returned no rows for SELECT VERSION()";
+ goto err;
+ }
+ if (!(version = row[0]))
+ {
+ errmsg = "Master reported NULL for the version";
+ goto err;
+ }
+
+ switch (*version)
+ {
+ case '3':
+ mi->old_format = 1;
+ break;
+ case '4':
+ mi->old_format = 0;
+ break;
+ default:
+ errmsg = "Master reported unrecognized MySQL version";
+ goto err;
+ }
+err:
+ if (res)
+ mc_mysql_free_result(res);
+ if (errmsg)
+ {
+ sql_print_error(errmsg);
+ return 1;
+ }
+ return 0;
+}
+
static int create_table_from_dump(THD* thd, NET* net, const char* db,
const char* table_name)
@@ -580,7 +630,7 @@ int init_master_info(MASTER_INFO* mi)
mi->inited = 1;
// now change the cache from READ to WRITE - must do this
// before flush_master_info
- reinit_io_cache(&mi->file, WRITE_CACHE, 0L,0,1);
+ reinit_io_cache(&mi->file, WRITE_CACHE,0L,0,1);
error=test(flush_master_info(mi));
pthread_mutex_unlock(&mi->lock);
return error;
@@ -943,12 +993,14 @@ static int exec_event(THD* thd, NET* net, MASTER_INFO* mi, int event_len)
{
const char *error_msg;
Log_event * ev = Log_event::read_log_event((const char*)net->read_pos + 1,
- event_len, &error_msg);
+ event_len, &error_msg,
+ mi->old_format);
if (ev)
{
int type_code = ev->get_type_code();
int exec_res;
- if (ev->server_id == ::server_id || slave_skip_counter)
+ if (ev->server_id == ::server_id ||
+ (slave_skip_counter && ev->get_type_code() != ROTATE_EVENT))
{
if(type_code == LOAD_EVENT)
skip_load_data_infile(net);
@@ -1070,9 +1122,17 @@ connected:
// register ourselves with the master
// if fails, this is not fatal - we just print the error message and go
// on with life
- thd->proc_info = "Registering slave on master";
- register_slave_on_master(mysql);
- update_slave_list(mysql);
+ thd->proc_info = "Checking master version";
+ if (check_master_version(mysql, &glob_mi))
+ {
+ goto err;
+ }
+ if (!glob_mi.old_format)
+ {
+ thd->proc_info = "Registering slave on master";
+ if (register_slave_on_master(mysql) || update_slave_list(mysql))
+ goto err;
+ }
while (!slave_killed(thd))
{
diff --git a/sql/slave.h b/sql/slave.h
index 5c6772147b6..a5bc4d61309 100644
--- a/sql/slave.h
+++ b/sql/slave.h
@@ -23,8 +23,10 @@ typedef struct st_master_info
pthread_mutex_t lock;
pthread_cond_t cond;
bool inited;
+ bool old_format; /* master binlog is in 3.23 format */
- st_master_info():pending(0),fd(-1),last_log_seq(0),inited(0)
+ st_master_info():pending(0),fd(-1),last_log_seq(0),inited(0),
+ old_format(0)
{
host[0] = 0; user[0] = 0; password[0] = 0;
pthread_mutex_init(&lock, MY_MUTEX_INIT_FAST);
diff --git a/sql/sql_class.h b/sql/sql_class.h
index 53e20b3b6d2..698a90c1a28 100644
--- a/sql/sql_class.h
+++ b/sql/sql_class.h
@@ -72,15 +72,18 @@ class MYSQL_LOG {
// we should not try to rotate it or write any rotation events
// the user should use FLUSH MASTER instead of FLUSH LOGS for
// purging
-
+ enum cache_type io_cache_type;
+ bool need_start_event;
friend class Log_event;
public:
MYSQL_LOG();
~MYSQL_LOG();
pthread_mutex_t* get_log_lock() { return &LOCK_log; }
+ void set_need_start_event() { need_start_event = 1; }
void set_index_file_name(const char* index_file_name = 0);
- void init(enum_log_type log_type_arg);
+ void init(enum_log_type log_type_arg,
+ enum cache_type io_cache_type_arg = WRITE_CACHE);
void open(const char *log_name,enum_log_type log_type,
const char *new_name=0);
void new_file(bool inside_mutex = 0);
diff --git a/sql/sql_parse.cc b/sql/sql_parse.cc
index 3830db48613..20a645e52af 100644
--- a/sql/sql_parse.cc
+++ b/sql/sql_parse.cc
@@ -22,6 +22,7 @@
#include "mysql_priv.h"
#include "sql_acl.h"
#include "sql_repl.h"
+#include "repl_failsafe.h"
#include <m_ctype.h>
#include <thr_alarm.h>
#include <myisam.h>
diff --git a/sql/sql_repl.cc b/sql/sql_repl.cc
index 684c084ece3..3542b288087 100644
--- a/sql/sql_repl.cc
+++ b/sql/sql_repl.cc
@@ -24,58 +24,15 @@
#include <thr_alarm.h>
#include <my_dir.h>
-#define SLAVE_LIST_CHUNK 128
-#define SLAVE_ERRMSG_SIZE (FN_REFLEN+64)
-
extern const char* any_db;
extern pthread_handler_decl(handle_slave,arg);
-HASH slave_list;
-
#ifndef DBUG_OFF
int max_binlog_dump_events = 0; // unlimited
bool opt_sporadic_binlog_dump_fail = 0;
static int binlog_dump_count = 0;
#endif
-#ifdef SIGNAL_WITH_VIO_CLOSE
-#define KICK_SLAVE { slave_thd->close_active_vio(); \
- thr_alarm_kill(slave_real_id); }
-#else
-#define KICK_SLAVE thr_alarm_kill(slave_real_id);
-#endif
-
-static Slave_log_event* find_slave_event(IO_CACHE* log,
- const char* log_file_name,
- char* errmsg);
-
-static uint32* slave_list_key(SLAVE_INFO* si, uint* len,
- my_bool not_used __attribute__((unused)))
-{
- *len = 4;
- return &si->server_id;
-}
-
-static void slave_info_free(void *s)
-{
- my_free((gptr) s, MYF(MY_WME));
-}
-
-void init_slave_list()
-{
- hash_init(&slave_list, SLAVE_LIST_CHUNK, 0, 0,
- (hash_get_key) slave_list_key, slave_info_free, 0);
- pthread_mutex_init(&LOCK_slave_list, MY_MUTEX_INIT_FAST);
-}
-
-void end_slave_list()
-{
- pthread_mutex_lock(&LOCK_slave_list);
- hash_free(&slave_list);
- pthread_mutex_unlock(&LOCK_slave_list);
- pthread_mutex_destroy(&LOCK_slave_list);
-}
-
static int fake_rotate_event(NET* net, String* packet, char* log_file_name,
const char**errmsg)
{
@@ -104,69 +61,6 @@ static int fake_rotate_event(NET* net, String* packet, char* log_file_name,
return 0;
}
-#define get_object(p, obj) \
-{\
- uint len = (uint)*p++; \
- if (p + len > p_end || len >= sizeof(obj)) \
- goto err; \
- strmake(obj,(char*) p,len); \
- p+= len; \
-}\
-
-void unregister_slave(THD* thd, bool only_mine, bool need_mutex)
-{
- if (need_mutex)
- pthread_mutex_lock(&LOCK_slave_list);
- if (thd->server_id)
- {
- SLAVE_INFO* old_si;
- if ((old_si = (SLAVE_INFO*)hash_search(&slave_list,
- (byte*)&thd->server_id, 4)) &&
- (!only_mine || old_si->thd == thd))
- hash_delete(&slave_list, (byte*)old_si);
- }
- if (need_mutex)
- pthread_mutex_unlock(&LOCK_slave_list);
-}
-
-int register_slave(THD* thd, uchar* packet, uint packet_length)
-{
- SLAVE_INFO *si;
- int res = 1;
- uchar* p = packet, *p_end = packet + packet_length;
-
- if (check_access(thd, FILE_ACL, any_db))
- return 1;
-
- if (!(si = (SLAVE_INFO*)my_malloc(sizeof(SLAVE_INFO), MYF(MY_WME))))
- goto err;
-
- thd->server_id = si->server_id = uint4korr(p);
- p += 4;
- get_object(p,si->host);
- get_object(p,si->user);
- get_object(p,si->password);
- si->port = uint2korr(p);
- p += 2;
- si->rpl_recovery_rank = uint4korr(p);
- p += 4;
- if (!(si->master_id = uint4korr(p)))
- si->master_id = server_id;
- si->thd = thd;
- pthread_mutex_lock(&LOCK_slave_list);
-
- unregister_slave(thd,0,0);
- res = hash_insert(&slave_list, (byte*) si);
- pthread_mutex_unlock(&LOCK_slave_list);
- return res;
-
-err:
- if (si)
- my_free((gptr) si, MYF(MY_WME));
- return res;
-}
-
-
static int send_file(THD *thd)
{
NET* net = &thd->net;
@@ -252,6 +146,8 @@ File open_binlog(IO_CACHE *log, const char *log_file_name,
if (my_b_read(log, (byte*) magic, sizeof(magic)))
{
*errmsg = "I/O error reading the header from the binary log";
+ sql_print_error("%s, errno=%d, io cache code=%d", *errmsg, my_errno,
+ log->error);
goto err;
}
if (memcmp(magic, BINLOG_MAGIC, sizeof(magic)))
@@ -887,8 +783,13 @@ void reset_master()
}
LOG_INFO linfo;
+ pthread_mutex_t* log_lock = mysql_bin_log.get_log_lock();
+ pthread_mutex_lock(log_lock);
if (mysql_bin_log.find_first_log(&linfo, ""))
+ {
+ pthread_mutex_unlock(log_lock);
return;
+ }
for(;;)
{
@@ -898,7 +799,9 @@ void reset_master()
}
mysql_bin_log.close(1); // exiting close
my_delete(mysql_bin_log.get_index_fname(), MYF(MY_WME));
+ mysql_bin_log.set_need_start_event();
mysql_bin_log.open(opt_bin_logname,LOG_BIN);
+ pthread_mutex_unlock(log_lock);
}
@@ -915,242 +818,6 @@ int cmp_master_pos(const char* log_file_name1, ulonglong log_pos1,
return -1;
}
-
-static inline int cmp_master_pos(Slave_log_event* sev, LEX_MASTER_INFO* mi)
-{
- return cmp_master_pos(sev->master_log, sev->master_pos, mi->log_file_name,
- mi->pos);
-}
-
-static int find_target_pos(LEX_MASTER_INFO* mi, IO_CACHE* log, char* errmsg)
-{
- uint32 log_seq = mi->last_log_seq;
- uint32 target_server_id = mi->server_id;
-
- for (;;)
- {
- Log_event* ev;
- if (!(ev = Log_event::read_log_event(log, 0)))
- {
- if (log->error > 0)
- strmov(errmsg, "Binary log truncated in the middle of event");
- else if (log->error < 0)
- strmov(errmsg, "I/O error reading binary log");
- else
- strmov(errmsg, "Could not find target event in the binary log");
- return 1;
- }
-
- if (ev->log_seq == log_seq && ev->server_id == target_server_id)
- {
- delete ev;
- mi->pos = my_b_tell(log);
- return 0;
- }
-
- delete ev;
- }
-}
-
-
-int translate_master(THD* thd, LEX_MASTER_INFO* mi, char* errmsg)
-{
- LOG_INFO linfo;
- char search_file_name[FN_REFLEN],last_log_name[FN_REFLEN];
- IO_CACHE log;
- File file = -1, last_file = -1;
- pthread_mutex_t *log_lock;
- const char* errmsg_p;
- Slave_log_event* sev = 0;
- my_off_t last_pos = 0;
- int error = 1;
- int cmp_res;
- LINT_INIT(cmp_res);
-
- if (!mysql_bin_log.is_open())
- {
- strmov(errmsg,"Binary log is not open");
- return 1;
- }
-
- if (!server_id_supplied)
- {
- strmov(errmsg, "Misconfigured master - server id was not set");
- return 1;
- }
-
- linfo.index_file_offset = 0;
-
-
- search_file_name[0] = 0;
-
- if (mysql_bin_log.find_first_log(&linfo, search_file_name))
- {
- strmov(errmsg,"Could not find first log");
- return 1;
- }
- thd->current_linfo = &linfo;
-
- bzero((char*) &log,sizeof(log));
- log_lock = mysql_bin_log.get_log_lock();
- pthread_mutex_lock(log_lock);
-
- for (;;)
- {
- if ((file=open_binlog(&log, linfo.log_file_name, &errmsg_p)) < 0)
- {
- strmov(errmsg, errmsg_p);
- goto err;
- }
-
- if (!(sev = find_slave_event(&log, linfo.log_file_name, errmsg)))
- goto err;
-
- cmp_res = cmp_master_pos(sev, mi);
- delete sev;
-
- if (!cmp_res)
- {
- /* Copy basename */
- fn_format(mi->log_file_name, linfo.log_file_name, "","",1);
- mi->pos = my_b_tell(&log);
- goto mi_inited;
- }
- else if (cmp_res > 0)
- {
- if (!last_pos)
- {
- strmov(errmsg,
- "Slave event in first log points past the target position");
- goto err;
- }
- end_io_cache(&log);
- (void) my_close(file, MYF(MY_WME));
- if (init_io_cache(&log, (file = last_file), IO_SIZE, READ_CACHE, 0, 0,
- MYF(MY_WME)))
- {
- errmsg[0] = 0;
- goto err;
- }
- break;
- }
-
- strmov(last_log_name, linfo.log_file_name);
- last_pos = my_b_tell(&log);
-
- switch (mysql_bin_log.find_next_log(&linfo)) {
- case LOG_INFO_EOF:
- if (last_file >= 0)
- (void)my_close(last_file, MYF(MY_WME));
- last_file = -1;
- goto found_log;
- case 0:
- break;
- default:
- strmov(errmsg, "Error reading log index");
- goto err;
- }
-
- end_io_cache(&log);
- if (last_file >= 0)
- (void) my_close(last_file, MYF(MY_WME));
- last_file = file;
- }
-
-found_log:
- my_b_seek(&log, last_pos);
- if (find_target_pos(mi,&log,errmsg))
- goto err;
- fn_format(mi->log_file_name, last_log_name, "","",1); /* Copy basename */
-
-mi_inited:
- error = 0;
-err:
- pthread_mutex_unlock(log_lock);
- end_io_cache(&log);
- pthread_mutex_lock(&LOCK_thread_count);
- thd->current_linfo = 0;
- pthread_mutex_unlock(&LOCK_thread_count);
- if (file >= 0)
- (void) my_close(file, MYF(MY_WME));
- if (last_file >= 0 && last_file != file)
- (void) my_close(last_file, MYF(MY_WME));
-
- return error;
-}
-
-// caller must delete result when done
-static Slave_log_event* find_slave_event(IO_CACHE* log,
- const char* log_file_name,
- char* errmsg)
-{
- Log_event* ev;
- if (!(ev = Log_event::read_log_event(log, 0)))
- {
- my_snprintf(errmsg, SLAVE_ERRMSG_SIZE,
- "Error reading start event in log '%s'",
- (char*)log_file_name);
- return 0;
- }
- delete ev;
-
- if (!(ev = Log_event::read_log_event(log, 0)))
- {
- my_snprintf(errmsg, SLAVE_ERRMSG_SIZE,
- "Error reading slave event in log '%s'",
- (char*)log_file_name);
- return 0;
- }
-
- if (ev->get_type_code() != SLAVE_EVENT)
- {
- my_snprintf(errmsg, SLAVE_ERRMSG_SIZE,
- "Second event in log '%s' is not slave event",
- (char*)log_file_name);
- delete ev;
- return 0;
- }
-
- return (Slave_log_event*)ev;
-}
-
-
-int show_new_master(THD* thd)
-{
- DBUG_ENTER("show_new_master");
- List<Item> field_list;
- char errmsg[SLAVE_ERRMSG_SIZE];
- LEX_MASTER_INFO* lex_mi = &thd->lex.mi;
-
- errmsg[0]=0; // Safety
- if (translate_master(thd, lex_mi, errmsg))
- {
- if (errmsg[0])
- net_printf(&thd->net, ER_ERROR_WHEN_EXECUTING_COMMAND,
- "SHOW NEW MASTER", errmsg);
- else
- send_error(&thd->net, 0);
-
- DBUG_RETURN(1);
- }
- else
- {
- String* packet = &thd->packet;
- field_list.push_back(new Item_empty_string("Log_name", 20));
- field_list.push_back(new Item_empty_string("Log_pos", 20));
- if (send_fields(thd, field_list, 1))
- DBUG_RETURN(-1);
- packet->length(0);
- net_store_data(packet, lex_mi->log_file_name);
- net_store_data(packet, (longlong)lex_mi->pos);
- if (my_net_write(&thd->net, packet->ptr(), packet->length()))
- DBUG_RETURN(-1);
- send_eof(&thd->net);
- DBUG_RETURN(0);
- }
-
-}
-
int show_binlog_events(THD* thd)
{
DBUG_ENTER("show_binlog_events");
@@ -1202,7 +869,8 @@ int show_binlog_events(THD* thd)
pthread_mutex_lock(mysql_bin_log.get_log_lock());
my_b_seek(&log, pos);
- for (event_count = 0; (ev = Log_event::read_log_event(&log, 0)); )
+ for (event_count = 0;
+ (ev = Log_event::read_log_event(&log,(pthread_mutex_t*)0,0)); )
{
if (event_count >= limit_start &&
ev->net_send(thd, linfo.log_file_name, pos))
@@ -1247,56 +915,6 @@ err:
DBUG_RETURN(0);
}
-
-int show_slave_hosts(THD* thd)
-{
- List<Item> field_list;
- NET* net = &thd->net;
- String* packet = &thd->packet;
- DBUG_ENTER("show_slave_hosts");
-
- field_list.push_back(new Item_empty_string("Server_id", 20));
- field_list.push_back(new Item_empty_string("Host", 20));
- if (opt_show_slave_auth_info)
- {
- field_list.push_back(new Item_empty_string("User",20));
- field_list.push_back(new Item_empty_string("Password",20));
- }
- field_list.push_back(new Item_empty_string("Port",20));
- field_list.push_back(new Item_empty_string("Rpl_recovery_rank", 20));
- field_list.push_back(new Item_empty_string("Master_id", 20));
-
- if (send_fields(thd, field_list, 1))
- DBUG_RETURN(-1);
-
- pthread_mutex_lock(&LOCK_slave_list);
-
- for (uint i = 0; i < slave_list.records; ++i)
- {
- SLAVE_INFO* si = (SLAVE_INFO*) hash_element(&slave_list, i);
- packet->length(0);
- net_store_data(packet, si->server_id);
- net_store_data(packet, si->host);
- if (opt_show_slave_auth_info)
- {
- net_store_data(packet, si->user);
- net_store_data(packet, si->password);
- }
- net_store_data(packet, (uint32) si->port);
- net_store_data(packet, si->rpl_recovery_rank);
- net_store_data(packet, si->master_id);
- if (my_net_write(net, (char*)packet->ptr(), packet->length()))
- {
- pthread_mutex_unlock(&LOCK_slave_list);
- DBUG_RETURN(-1);
- }
- }
- pthread_mutex_unlock(&LOCK_slave_list);
- send_eof(net);
- DBUG_RETURN(0);
-}
-
-
int show_binlog_info(THD* thd)
{
DBUG_ENTER("show_binlog_info");
@@ -1402,230 +1020,6 @@ err:
return 1;
}
-
-int connect_to_master(THD *thd, MYSQL* mysql, MASTER_INFO* mi)
-{
- if (!mc_mysql_connect(mysql, mi->host, mi->user, mi->password, 0,
- mi->port, 0, 0))
- {
- sql_print_error("Connection to master failed: %s",
- mc_mysql_error(mysql));
- return 1;
- }
- return 0;
-}
-
-
-static inline void cleanup_mysql_results(MYSQL_RES* db_res,
- MYSQL_RES** cur, MYSQL_RES** start)
-{
- for( ; cur >= start; --cur)
- {
- if (*cur)
- mc_mysql_free_result(*cur);
- }
- mc_mysql_free_result(db_res);
-}
-
-
-static inline int fetch_db_tables(THD* thd, MYSQL* mysql, const char* db,
- MYSQL_RES* table_res)
-{
- MYSQL_ROW row;
-
- for( row = mc_mysql_fetch_row(table_res); row;
- row = mc_mysql_fetch_row(table_res))
- {
- TABLE_LIST table;
- const char* table_name = row[0];
- int error;
- if (table_rules_on)
- {
- table.next = 0;
- table.db = (char*)db;
- table.real_name = (char*)table_name;
- table.updating = 1;
- if (!tables_ok(thd, &table))
- continue;
- }
-
- if ((error = fetch_nx_table(thd, db, table_name, &glob_mi, mysql)))
- return error;
- }
-
- return 0;
-}
-
-
-int load_master_data(THD* thd)
-{
- MYSQL mysql;
- MYSQL_RES* master_status_res = 0;
- bool slave_was_running = 0;
- int error = 0;
-
- mc_mysql_init(&mysql);
-
- // we do not want anyone messing with the slave at all for the entire
- // duration of the data load;
- pthread_mutex_lock(&LOCK_slave);
-
- // first, kill the slave
- if ((slave_was_running = slave_running))
- {
- abort_slave = 1;
- KICK_SLAVE;
- thd->proc_info = "waiting for slave to die";
- while (slave_running)
- pthread_cond_wait(&COND_slave_stopped, &LOCK_slave); // wait until done
- }
-
-
- if (connect_to_master(thd, &mysql, &glob_mi))
- {
- net_printf(&thd->net, error = ER_CONNECT_TO_MASTER,
- mc_mysql_error(&mysql));
- goto err;
- }
-
- // now that we are connected, get all database and tables in each
- {
- MYSQL_RES *db_res, **table_res, **table_res_end, **cur_table_res;
- uint num_dbs;
-
- if (mc_mysql_query(&mysql, "show databases", 0) ||
- !(db_res = mc_mysql_store_result(&mysql)))
- {
- net_printf(&thd->net, error = ER_QUERY_ON_MASTER,
- mc_mysql_error(&mysql));
- goto err;
- }
-
- if (!(num_dbs = (uint) mc_mysql_num_rows(db_res)))
- goto err;
- // in theory, the master could have no databases at all
- // and run with skip-grant
-
- if (!(table_res = (MYSQL_RES**)thd->alloc(num_dbs * sizeof(MYSQL_RES*))))
- {
- net_printf(&thd->net, error = ER_OUTOFMEMORY);
- goto err;
- }
-
- // this is a temporary solution until we have online backup
- // capabilities - to be replaced once online backup is working
- // we wait to issue FLUSH TABLES WITH READ LOCK for as long as we
- // can to minimize the lock time
- if (mc_mysql_query(&mysql, "FLUSH TABLES WITH READ LOCK", 0) ||
- mc_mysql_query(&mysql, "SHOW MASTER STATUS",0) ||
- !(master_status_res = mc_mysql_store_result(&mysql)))
- {
- net_printf(&thd->net, error = ER_QUERY_ON_MASTER,
- mc_mysql_error(&mysql));
- goto err;
- }
-
- // go through every table in every database, and if the replication
- // rules allow replicating it, get it
-
- table_res_end = table_res + num_dbs;
-
- for(cur_table_res = table_res; cur_table_res < table_res_end;
- cur_table_res++)
- {
- // since we know how many rows we have, this can never be NULL
- MYSQL_ROW row = mc_mysql_fetch_row(db_res);
- char* db = row[0];
-
- /*
- Do not replicate databases excluded by rules
- also skip mysql database - in most cases the user will
- mess up and not exclude mysql database with the rules when
- he actually means to - in this case, he is up for a surprise if
- his priv tables get dropped and downloaded from master
- TO DO - add special option, not enabled
- by default, to allow inclusion of mysql database into load
- data from master
- */
-
- if (!db_ok(db, replicate_do_db, replicate_ignore_db) ||
- !strcmp(db,"mysql"))
- {
- *cur_table_res = 0;
- continue;
- }
-
- if (mysql_rm_db(thd, db, 1,1) ||
- mysql_create_db(thd, db, 0, 1))
- {
- send_error(&thd->net, 0, 0);
- cleanup_mysql_results(db_res, cur_table_res - 1, table_res);
- goto err;
- }
-
- if (mc_mysql_select_db(&mysql, db) ||
- mc_mysql_query(&mysql, "show tables", 0) ||
- !(*cur_table_res = mc_mysql_store_result(&mysql)))
- {
- net_printf(&thd->net, error = ER_QUERY_ON_MASTER,
- mc_mysql_error(&mysql));
- cleanup_mysql_results(db_res, cur_table_res - 1, table_res);
- goto err;
- }
-
- if ((error = fetch_db_tables(thd, &mysql, db, *cur_table_res)))
- {
- // we do not report the error - fetch_db_tables handles it
- cleanup_mysql_results(db_res, cur_table_res, table_res);
- goto err;
- }
- }
-
- cleanup_mysql_results(db_res, cur_table_res - 1, table_res);
-
- // adjust position in the master
- if (master_status_res)
- {
- MYSQL_ROW row = mc_mysql_fetch_row(master_status_res);
-
- /*
- We need this check because the master may not be running with
- log-bin, but it will still allow us to do all the steps
- of LOAD DATA FROM MASTER - no reason to forbid it, really,
- although it does not make much sense for the user to do it
- */
- if (row[0] && row[1])
- {
- strmake(glob_mi.log_file_name, row[0], sizeof(glob_mi.log_file_name));
- glob_mi.pos = atoi(row[1]); // atoi() is ok, since offset is <= 1GB
- if (glob_mi.pos < 4)
- glob_mi.pos = 4; // don't hit the magic number
- glob_mi.pending = 0;
- flush_master_info(&glob_mi);
- }
-
- mc_mysql_free_result(master_status_res);
- }
-
- if (mc_mysql_query(&mysql, "UNLOCK TABLES", 0))
- {
- net_printf(&thd->net, error = ER_QUERY_ON_MASTER,
- mc_mysql_error(&mysql));
- goto err;
- }
- }
-
-err:
- pthread_mutex_unlock(&LOCK_slave);
- if (slave_was_running)
- start_slave(0, 0);
- mc_mysql_close(&mysql); // safe to call since we always do mc_mysql_init()
- if (!error)
- send_ok(&thd->net);
-
- return error;
-}
-
int log_loaded_block(IO_CACHE* file)
{
LOAD_FILE_INFO* lf_info;
diff --git a/sql/sql_repl.h b/sql/sql_repl.h
index c12e0536058..4b9f741dde7 100644
--- a/sql/sql_repl.h
+++ b/sql/sql_repl.h
@@ -15,7 +15,6 @@ typedef struct st_slave_info
} SLAVE_INFO;
extern bool opt_show_slave_auth_info, opt_old_rpl_compat;
-extern HASH slave_list;
extern char* master_host;
extern my_string opt_bin_logname, master_info_file;
extern uint32 server_id;
@@ -27,26 +26,24 @@ extern int max_binlog_dump_events;
extern bool opt_sporadic_binlog_dump_fail;
#endif
+#ifdef SIGNAL_WITH_VIO_CLOSE
+#define KICK_SLAVE { slave_thd->close_active_vio(); \
+ thr_alarm_kill(slave_real_id); }
+#else
+#define KICK_SLAVE thr_alarm_kill(slave_real_id);
+#endif
+
File open_binlog(IO_CACHE *log, const char *log_file_name,
const char **errmsg);
int start_slave(THD* thd = 0, bool net_report = 1);
int stop_slave(THD* thd = 0, bool net_report = 1);
-int load_master_data(THD* thd);
-int connect_to_master(THD *thd, MYSQL* mysql, MASTER_INFO* mi);
int change_master(THD* thd);
-int show_new_master(THD* thd);
-int show_slave_hosts(THD* thd);
int show_binlog_events(THD* thd);
-int translate_master(THD* thd, LEX_MASTER_INFO* mi, char* errmsg);
int cmp_master_pos(const char* log_file_name1, ulonglong log_pos1,
const char* log_file_name2, ulonglong log_pos2);
void reset_slave();
void reset_master();
-void init_slave_list();
-void end_slave_list();
-int register_slave(THD* thd, uchar* packet, uint packet_length);
-void unregister_slave(THD* thd, bool only_mine, bool need_mutex);
int purge_master_logs(THD* thd, const char* to_log);
bool log_in_use(const char* log_name);
void adjust_linfo_offsets(my_off_t purge_offset);