summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorunknown <sasha@mysql.sashanet.com>2002-01-19 19:16:52 -0700
committerunknown <sasha@mysql.sashanet.com>2002-01-19 19:16:52 -0700
commit5df61c3cdc4499197e420a76b25b942dce0f3ccc (patch)
tree87da2fd65f79c28f4b97c4619f95b07797107d82
parent0831ce1c616296196eff82065da469156b4def82 (diff)
downloadmariadb-git-5df61c3cdc4499197e420a76b25b942dce0f3ccc.tar.gz
Here comes a nasty patch, although I am not ready to push it yet. I will
first pull, merge,test, and get it to work. The main change is the new replication code - now we have two slave threads SQL thread and I/O thread. I have also re-written a lot of the code to prepare for multi-master implementation. I also documented IO_CACHE quite extensively and to some extend, THD class. Makefile.am: moved tags target script into a separate file include/my_sys.h: fixes in IO_CACHE for SEQ_READ_APPEND + some documentation libmysqld/lib_sql.cc: updated replication locks, but now I see I did it wrong and it won't compile. Will fix before the push. mysql-test/r/rpl000014.result: test result update mysql-test/r/rpl000015.result: test result update mysql-test/r/rpl000016.result: test result update mysql-test/r/rpl_log.result: test result update mysql-test/t/rpl000016-slave.sh: remove relay logs mysql-test/t/rpl000017-slave.sh: remove relay logs mysql-test/t/rpl_log.test: updated test mysys/mf_iocache.c: IO_CACHE updates to make replication work mysys/mf_iocache2.c: IO_CACHE update to make replication work mysys/thr_mutex.c: cosmetic change sql/item_func.cc: new replication code sql/lex.h: new replication sql/log.cc: new replication sql/log_event.cc: new replication sql/log_event.h: new replication sql/mini_client.cc: new replication sql/mini_client.h: new replication sql/mysql_priv.h: new replication sql/mysqld.cc: new replication sql/repl_failsafe.cc: new replication sql/slave.cc: new replication sql/slave.h: new replication sql/sql_class.cc: new replication sql/sql_class.h: new replication sql/sql_lex.h: new replication sql/sql_parse.cc: new replication sql/sql_repl.cc: new replication sql/sql_repl.h: new replication sql/sql_show.cc: new replication sql/sql_yacc.yy: new replication sql/stacktrace.c: more robust stack tracing sql/structs.h: new replication code BitKeeper/etc/ignore: Added mysql-test/r/rpl000002.eval mysql-test/r/rpl000014.eval mysql-test/r/rpl000015.eval mysql-test/r/rpl000016.eval mysql-test/r/slave-running.eval mysql-test/r/slave-stopped.eval to the ignore list
-rw-r--r--.bzrignore6
-rw-r--r--Makefile.am7
-rw-r--r--include/my_sys.h80
-rw-r--r--libmysqld/lib_sql.cc11
-rw-r--r--mysql-test/r/rpl000014.result16
-rw-r--r--mysql-test/r/rpl000015.result16
-rw-r--r--mysql-test/r/rpl000016.result12
-rw-r--r--mysql-test/r/rpl_log.result116
-rwxr-xr-xmysql-test/resolve-stack8
-rw-r--r--mysql-test/t/rpl000016-slave.opt1
-rwxr-xr-xmysql-test/t/rpl000016-slave.sh1
-rwxr-xr-xmysql-test/t/rpl000017-slave.sh1
-rw-r--r--mysql-test/t/rpl_log.test10
-rw-r--r--mysys/mf_iocache.c57
-rw-r--r--mysys/mf_iocache2.c18
-rw-r--r--mysys/thr_mutex.c3
-rw-r--r--sql/item_func.cc4
-rw-r--r--sql/lex.h1
-rw-r--r--sql/log.cc296
-rw-r--r--sql/log_event.cc194
-rw-r--r--sql/log_event.h40
-rw-r--r--sql/mini_client.cc43
-rw-r--r--sql/mini_client.h54
-rw-r--r--sql/mysql_priv.h10
-rw-r--r--sql/mysqld.cc84
-rw-r--r--sql/repl_failsafe.cc89
-rw-r--r--sql/slave.cc1416
-rw-r--r--sql/slave.h343
-rw-r--r--sql/sql_class.cc11
-rw-r--r--sql/sql_class.h108
-rw-r--r--sql/sql_lex.h1
-rw-r--r--sql/sql_parse.cc51
-rw-r--r--sql/sql_repl.cc333
-rw-r--r--sql/sql_repl.h18
-rw-r--r--sql/sql_show.cc9
-rw-r--r--sql/sql_yacc.yy21
-rw-r--r--sql/stacktrace.c2
-rw-r--r--sql/structs.h2
-rwxr-xr-xsupport-files/build-tags9
39 files changed, 2467 insertions, 1035 deletions
diff --git a/.bzrignore b/.bzrignore
index b741024bb95..41f886b0102 100644
--- a/.bzrignore
+++ b/.bzrignore
@@ -446,3 +446,9 @@ vio/test-sslclient
vio/test-sslserver
vio/viotest-ssl
sql-bench/test-transactions
+mysql-test/r/rpl000002.eval
+mysql-test/r/rpl000014.eval
+mysql-test/r/rpl000015.eval
+mysql-test/r/rpl000016.eval
+mysql-test/r/slave-running.eval
+mysql-test/r/slave-stopped.eval
diff --git a/Makefile.am b/Makefile.am
index 7257617b8e6..9b8381e63d5 100644
--- a/Makefile.am
+++ b/Makefile.am
@@ -74,14 +74,11 @@ bin-dist: all
$(top_builddir)/scripts/make_binary_distribution
tags:
- rm -f TAGS
- find -not -path \*SCCS\* -and \
- \( -name \*.cc -or -name \*.h -or -name \*.yy -or -name \*.c \) \
- -print -exec etags -o TAGS --append {} \;
-
+ support-files/build-tags
.PHONY: init-db bin-dist
# Test installation
test:
cd mysql-test ; ./mysql-test-run
+
diff --git a/include/my_sys.h b/include/my_sys.h
index 2a5dfb4d184..3fd939154d2 100644
--- a/include/my_sys.h
+++ b/include/my_sys.h
@@ -296,35 +296,105 @@ typedef int (*IO_CACHE_CALLBACK)(struct st_io_cache*);
typedef struct st_io_cache /* Used when cacheing files */
{
+ /* pos_in_file is offset in file corresponding to the first byte of
+ byte* buffer. end_of_file is the offset of end of file for READ_CACHE
+ and WRITE_CACHE. For SEQ_READ_APPEND it the maximum of the actual
+ end of file and the position represented by read_end.
+ */
my_off_t pos_in_file,end_of_file;
+ /* read_pos points to current read position in the buffer
+ read_end is the non-inclusive boundary in the buffer for the currently
+ valid read area
+ buffer is the read buffer
+ not sure about request_pos except that it is used in async_io
+ */
byte *read_pos,*read_end,*buffer,*request_pos;
+ /* write_buffer is used only in WRITE caches and in SEQ_READ_APPEND to
+ buffer writes
+ append_read_pos is only used in SEQ_READ_APPEND, and points to the
+ current read position in the write buffer. Note that reads in
+ SEQ_READ_APPEND caches can happen from both read buffer (byte* buffer),
+ and write buffer (byte* write_buffer).
+ write_pos points to current write position in the write buffer and
+ write_end is the non-inclusive boundary of the valid write area
+ */
byte *write_buffer, *append_read_pos, *write_pos, *write_end;
+ /* current_pos and current_end are convenience variables used by
+ my_b_tell() and other routines that need to know the current offset
+ current_pos points to &write_pos, and current_end to &write_end in a
+ WRITE_CACHE, and &read_pos and &read_end respectively otherwise
+ */
byte **current_pos, **current_end;
-/* The lock is for append buffer used in READ_APPEND cache */
+/* The lock is for append buffer used in SEQ_READ_APPEND cache */
#ifdef THREAD
pthread_mutex_t append_buffer_lock;
/* need mutex copying from append buffer to read buffer */
-#endif
+#endif
+ /* a caller will use my_b_read() macro to read from the cache
+ if the data is already in cache, it will be simply copied with
+ memcpy() and internal variables will be accordinging updated with
+ no functions invoked. However, if the data is not fully in the cache,
+ my_b_read() will call read_function to fetch the data. read_function
+ must never be invoked directly
+ */
int (*read_function)(struct st_io_cache *,byte *,uint);
+ /* same idea as in the case of read_function, except my_b_write() needs to
+ be replaced with my_b_append() for a SEQ_READ_APPEND cache
+ */
int (*write_function)(struct st_io_cache *,const byte *,uint);
+ /* specifies the type of the cache. Depending on the type of the cache
+ certain operations might not be available and yield unpredicatable
+ results. Details to be documented later
+ */
enum cache_type type;
- /* callbacks when the actual read I/O happens */
+ /* callbacks when the actual read I/O happens. These were added and
+ are currently used for binary logging of LOAD DATA INFILE - when a
+ block is read from the file, we create a block create/append event, and
+ when IO_CACHE is closed, we create an end event. These functions could,
+ of course be used for other things
+ */
IO_CACHE_CALLBACK pre_read;
IO_CACHE_CALLBACK post_read;
IO_CACHE_CALLBACK pre_close;
void* arg; /* for use by pre/post_read */
char *file_name; /* if used with 'open_cached_file' */
char *dir,*prefix;
- File file;
+ File file; /* file descriptor */
+ /* seek_not_done is set by my_b_seek() to inform the upcoming read/write
+ operation that a seek needs to be preformed prior to the actual I/O
+ error is 0 if the cache operation was successful, -1 if there was a
+ "hard" error, and the actual number of I/O-ed bytes if the read/write was
+ partial
+ */
int seek_not_done,error;
+ /* buffer_length is the size of memory allocated for buffer or write_buffer
+ read_length is the same as buffer_length except when we use async io
+ not sure why we need it
+ */
uint buffer_length,read_length;
myf myflags; /* Flags used to my_read/my_write */
/*
+ alloced_buffer is 1 if the buffer was allocated by init_io_cache() and
+ 0 if it was supplied by the user
Currently READ_NET is the only one that will use a buffer allocated
somewhere else
*/
my_bool alloced_buffer;
+ /* init_count is incremented every time we call init_io_cache()
+ It is not reset in end_io_cache(). This variable
+ was introduced for slave relay logs - RELAY_LOG_INFO stores a pointer
+ to IO_CACHE that could in some cases refer to the IO_CACHE of the
+ currently active relay log. The IO_CACHE then could be closed,
+ re-opened and start pointing to a different log file. In that case,
+ we could not know reliably if this happened without init_count
+ one must be careful with bzero() prior to the subsequent init_io_cache()
+ call
+ */
+ int init_count;
#ifdef HAVE_AIOWAIT
+ /* as inidicated by ifdef, this is for async I/O, we will have
+ Sinisa comment this some time
+ */
uint inited;
my_off_t aio_read_pos;
my_aio_result aio_result;
@@ -369,6 +439,8 @@ typedef int (*qsort2_cmp)(const void *, const void *, const void *);
#define my_b_tell(info) ((info)->pos_in_file + \
(uint) (*(info)->current_pos - (info)->request_pos))
+#define my_b_append_tell(info) ((info)->end_of_file + \
+ (uint) ((info)->write_pos - (info)->write_buffer))
#define my_b_bytes_in_cache(info) (uint) (*(info)->current_end - \
*(info)->current_pos)
diff --git a/libmysqld/lib_sql.cc b/libmysqld/lib_sql.cc
index d241ceaada0..f22cc1b6101 100644
--- a/libmysqld/lib_sql.cc
+++ b/libmysqld/lib_sql.cc
@@ -396,8 +396,8 @@ int STDCALL mysql_server_init(int argc, char **argv, char **groups)
(void) pthread_mutex_init(&LOCK_bytes_sent,MY_MUTEX_INIT_FAST);
(void) pthread_mutex_init(&LOCK_bytes_received,MY_MUTEX_INIT_FAST);
(void) pthread_mutex_init(&LOCK_timezone,MY_MUTEX_INIT_FAST);
- (void) pthread_mutex_init(&LOCK_binlog_update, MY_MUTEX_INIT_FAST); // QQ NOT USED
- (void) pthread_mutex_init(&LOCK_slave, MY_MUTEX_INIT_FAST);
+ (void) pthread_mutex_init(&LOCK_slave_io, MY_MUTEX_INIT_FAST);
+ (void) pthread_mutex_init(&LOCK_slave_sql, MY_MUTEX_INIT_FAST);
(void) pthread_mutex_init(&LOCK_server_id, MY_MUTEX_INIT_FAST);
(void) pthread_mutex_init(&LOCK_user_conn, MY_MUTEX_INIT_FAST);
(void) pthread_cond_init(&COND_thread_count,NULL);
@@ -406,8 +406,11 @@ int STDCALL mysql_server_init(int argc, char **argv, char **groups)
(void) pthread_cond_init(&COND_flush_thread_cache,NULL);
(void) pthread_cond_init(&COND_manager,NULL);
(void) pthread_cond_init(&COND_binlog_update, NULL);
- (void) pthread_cond_init(&COND_slave_stopped, NULL);
- (void) pthread_cond_init(&COND_slave_start, NULL);
+ (void) pthread_cond_init(&COND_slave_log_update, NULL);
+ (void) pthread_cond_init(&COND_slave_sql_stop, NULL);
+ (void) pthread_cond_init(&COND_slave_sql_start, NULL);
+ (void) pthread_cond_init(&COND_slave_sql_stop, NULL);
+ (void) pthread_cond_init(&COND_slave_sql_start, NULL);
if (set_default_charset_by_name(default_charset, MYF(MY_WME)))
{
diff --git a/mysql-test/r/rpl000014.result b/mysql-test/r/rpl000014.result
index e5130792d78..a0646b9c39f 100644
--- a/mysql-test/r/rpl000014.result
+++ b/mysql-test/r/rpl000014.result
@@ -7,22 +7,22 @@ show master status;
File Position Binlog_do_db Binlog_ignore_db
master-bin.001 79
show slave status;
-Master_Host Master_User Master_Port Connect_retry Log_File Pos Slave_Running Replicate_do_db Replicate_ignore_db Last_errno Last_error Skip_counter Last_log_seq
-127.0.0.1 root $MASTER_MYPORT 1 master-bin.001 79 Yes 0 0 1
+Master_Host Master_User Master_Port Connect_retry Master_Log_File Read_Master_Log_Pos Relay_Log_File Relay_Log_Pos Relay_Master_Log_File Slave_IO_Running Slave_SQL_Running Replicate_do_db Replicate_ignore_db Last_errno Last_error Skip_counter Exec_master_log_pos
+127.0.0.1 root 9306 1 master-bin.001 79 mysql-relay-bin.002 120 master-bin.001 Yes Yes 0 0 79
change master to master_log_pos=73;
slave stop;
change master to master_log_pos=73;
show slave status;
-Master_Host Master_User Master_Port Connect_retry Log_File Pos Slave_Running Replicate_do_db Replicate_ignore_db Last_errno Last_error Skip_counter Last_log_seq
-127.0.0.1 root $MASTER_MYPORT 1 master-bin.001 73 No 0 0 1
+Master_Host Master_User Master_Port Connect_retry Master_Log_File Read_Master_Log_Pos Relay_Log_File Relay_Log_Pos Relay_Master_Log_File Slave_IO_Running Slave_SQL_Running Replicate_do_db Replicate_ignore_db Last_errno Last_error Skip_counter Exec_master_log_pos
+127.0.0.1 root 9306 1 master-bin.001 73 mysql-relay-bin.001 4 master-bin.001 No No 0 0 73
slave start;
show slave status;
-Master_Host Master_User Master_Port Connect_retry Log_File Pos Slave_Running Replicate_do_db Replicate_ignore_db Last_errno Last_error Skip_counter Last_log_seq
-127.0.0.1 root $MASTER_MYPORT 1 master-bin.001 73 Yes 0 0 1
+Master_Host Master_User Master_Port Connect_retry Master_Log_File Read_Master_Log_Pos Relay_Log_File Relay_Log_Pos Relay_Master_Log_File Slave_IO_Running Slave_SQL_Running Replicate_do_db Replicate_ignore_db Last_errno Last_error Skip_counter Exec_master_log_pos
+127.0.0.1 root 9306 1 master-bin.001 73 mysql-relay-bin.001 4 master-bin.001 Yes Yes 0 0 73
change master to master_log_pos=173;
show slave status;
-Master_Host Master_User Master_Port Connect_retry Log_File Pos Slave_Running Replicate_do_db Replicate_ignore_db Last_errno Last_error Skip_counter Last_log_seq
-127.0.0.1 root $MASTER_MYPORT 1 master-bin.001 173 Yes 0 0 1
+Master_Host Master_User Master_Port Connect_retry Master_Log_File Read_Master_Log_Pos Relay_Log_File Relay_Log_Pos Relay_Master_Log_File Slave_IO_Running Slave_SQL_Running Replicate_do_db Replicate_ignore_db Last_errno Last_error Skip_counter Exec_master_log_pos
+127.0.0.1 root 9306 1 master-bin.001 173 mysql-relay-bin.001 4 master-bin.001 Yes Yes 0 0 173
show master status;
File Position Binlog_do_db Binlog_ignore_db
master-bin.001 79
diff --git a/mysql-test/r/rpl000015.result b/mysql-test/r/rpl000015.result
index e3bde4a5abd..571c4c0b1e3 100644
--- a/mysql-test/r/rpl000015.result
+++ b/mysql-test/r/rpl000015.result
@@ -4,21 +4,21 @@ File Position Binlog_do_db Binlog_ignore_db
master-bin.001 79
reset slave;
show slave status;
-Master_Host Master_User Master_Port Connect_retry Log_File Pos Slave_Running Replicate_do_db Replicate_ignore_db Last_errno Last_error Skip_counter Last_log_seq
- 0 0 0 No 0 0 0
+Master_Host Master_User Master_Port Connect_retry Master_Log_File Read_Master_Log_Pos Relay_Log_File Relay_Log_Pos Relay_Master_Log_File Slave_IO_Running Slave_SQL_Running Replicate_do_db Replicate_ignore_db Last_errno Last_error Skip_counter Exec_master_log_pos
+ 0 0 0 0 No No 0 0 0
change master to master_host='127.0.0.1';
show slave status;
-Master_Host Master_User Master_Port Connect_retry Log_File Pos Slave_Running Replicate_do_db Replicate_ignore_db Last_errno Last_error Skip_counter Last_log_seq
-127.0.0.1 test 3306 60 4 No 0 0 0
+Master_Host Master_User Master_Port Connect_retry Master_Log_File Read_Master_Log_Pos Relay_Log_File Relay_Log_Pos Relay_Master_Log_File Slave_IO_Running Slave_SQL_Running Replicate_do_db Replicate_ignore_db Last_errno Last_error Skip_counter Exec_master_log_pos
+127.0.0.1 test 3306 60 4 mysql-relay-bin.001 4 No No 0 0 0
change master to master_host='127.0.0.1',master_user='root',
master_password='',master_port=9306;
show slave status;
-Master_Host Master_User Master_Port Connect_retry Log_File Pos Slave_Running Replicate_do_db Replicate_ignore_db Last_errno Last_error Skip_counter Last_log_seq
-127.0.0.1 root 9306 60 4 No 0 0 0
+Master_Host Master_User Master_Port Connect_retry Master_Log_File Read_Master_Log_Pos Relay_Log_File Relay_Log_Pos Relay_Master_Log_File Slave_IO_Running Slave_SQL_Running Replicate_do_db Replicate_ignore_db Last_errno Last_error Skip_counter Exec_master_log_pos
+127.0.0.1 root 9306 60 4 mysql-relay-bin.001 4 No No 0 0 0
slave start;
show slave status;
-Master_Host Master_User Master_Port Connect_retry Log_File Pos Slave_Running Replicate_do_db Replicate_ignore_db Last_errno Last_error Skip_counter Last_log_seq
-127.0.0.1 root 9306 60 master-bin.001 79 Yes 0 0 1
+Master_Host Master_User Master_Port Connect_retry Master_Log_File Read_Master_Log_Pos Relay_Log_File Relay_Log_Pos Relay_Master_Log_File Slave_IO_Running Slave_SQL_Running Replicate_do_db Replicate_ignore_db Last_errno Last_error Skip_counter Exec_master_log_pos
+127.0.0.1 root 9306 60 master-bin.001 79 mysql-relay-bin.001 120 master-bin.001 Yes Yes 0 0 79
drop table if exists t1;
create table t1 (n int);
insert into t1 values (10),(45),(90);
diff --git a/mysql-test/r/rpl000016.result b/mysql-test/r/rpl000016.result
index 405e0302161..e3510ec2d94 100644
--- a/mysql-test/r/rpl000016.result
+++ b/mysql-test/r/rpl000016.result
@@ -14,8 +14,8 @@ drop table if exists t1;
create table t1 (s text);
insert into t1 values('Could not break slave'),('Tried hard');
show slave status;
-Master_Host Master_User Master_Port Connect_retry Log_File Pos Slave_Running Replicate_do_db Replicate_ignore_db Last_errno Last_error Skip_counter Last_log_seq
-127.0.0.1 root 9306 60 master-bin.001 234 Yes 0 0 3
+Master_Host Master_User Master_Port Connect_retry Master_Log_File Read_Master_Log_Pos Relay_Log_File Relay_Log_Pos Relay_Master_Log_File Slave_IO_Running Slave_SQL_Running Replicate_do_db Replicate_ignore_db Last_errno Last_error Skip_counter Exec_master_log_pos
+127.0.0.1 root 9306 60 master-bin.001 234 mysql-relay-bin.001 275 master-bin.001 Yes Yes 0 0 234
select * from t1;
s
Could not break slave
@@ -42,8 +42,8 @@ Log_name
master-bin.003
insert into t2 values (65);
show slave status;
-Master_Host Master_User Master_Port Connect_retry Log_File Pos Slave_Running Replicate_do_db Replicate_ignore_db Last_errno Last_error Skip_counter Last_log_seq
-127.0.0.1 root 9306 60 master-bin.003 155 Yes 0 0 3
+Master_Host Master_User Master_Port Connect_retry Master_Log_File Read_Master_Log_Pos Relay_Log_File Relay_Log_Pos Relay_Master_Log_File Slave_IO_Running Slave_SQL_Running Replicate_do_db Replicate_ignore_db Last_errno Last_error Skip_counter Exec_master_log_pos
+127.0.0.1 root 9306 60 master-bin.003 155 mysql-relay-bin.001 793 master-bin.003 Yes Yes 0 0 155
select * from t2;
m
34
@@ -65,8 +65,8 @@ master-bin.006 445
slave stop;
slave start;
show slave status;
-Master_Host Master_User Master_Port Connect_retry Log_File Pos Slave_Running Replicate_do_db Replicate_ignore_db Last_errno Last_error Skip_counter Last_log_seq
-127.0.0.1 root 9306 60 master-bin.006 445 Yes 0 0 7
+Master_Host Master_User Master_Port Connect_retry Master_Log_File Read_Master_Log_Pos Relay_Log_File Relay_Log_Pos Relay_Master_Log_File Slave_IO_Running Slave_SQL_Running Replicate_do_db Replicate_ignore_db Last_errno Last_error Skip_counter Exec_master_log_pos
+127.0.0.1 root 9306 60 master-bin.006 445 mysql-relay-bin.004 1376 master-bin.006 Yes Yes 0 0 445
lock tables t3 read;
select count(*) from t3 where n >= 4;
count(*)
diff --git a/mysql-test/r/rpl_log.result b/mysql-test/r/rpl_log.result
index 27a19f49874..4e6588d39f2 100644
--- a/mysql-test/r/rpl_log.result
+++ b/mysql-test/r/rpl_log.result
@@ -15,48 +15,48 @@ create table t1 (word char(20) not null);
load data infile '../../std_data/words.dat' into table t1;
drop table t1;
show binlog events;
-Log_name Pos Event_type Server_id Log_seq Info
-master-bin.001 4 Start 1 1 Server ver: $VERSION, Binlog ver: 2
-master-bin.001 79 Query 1 2 use test; create table t1(n int not null auto_increment primary key)
-master-bin.001 172 Intvar 1 3 INSERT_ID=1
-master-bin.001 200 Query 1 4 use test; insert into t1 values (NULL)
-master-bin.001 263 Query 1 5 use test; drop table t1
-master-bin.001 311 Query 1 6 use test; create table t1 (word char(20) not null)
-master-bin.001 386 Create_file 1 7 db=test;table=t1;file_id=1;block_len=81
-master-bin.001 556 Exec_load 1 8 ;file_id=1
-master-bin.001 579 Query 1 9 use test; drop table t1
+Log_name Pos Event_type Server_id Orig_log_pos Info
+master-bin.001 4 Start 1 4 Server ver: 4.0.1-alpha-debug-log, Binlog ver: 3
+master-bin.001 79 Query 1 79 use test; create table t1(n int not null auto_increment primary key)
+master-bin.001 172 Intvar 1 172 INSERT_ID=1
+master-bin.001 200 Query 1 200 use test; insert into t1 values (NULL)
+master-bin.001 263 Query 1 263 use test; drop table t1
+master-bin.001 311 Query 1 311 use test; create table t1 (word char(20) not null)
+master-bin.001 386 Create_file 1 386 db=test;table=t1;file_id=1;block_len=81
+master-bin.001 556 Exec_load 1 556 ;file_id=1
+master-bin.001 579 Query 1 579 use test; drop table t1
show binlog events from 79 limit 1;
-Log_name Pos Event_type Server_id Log_seq Info
-master-bin.001 79 Query 1 2 use test; create table t1(n int not null auto_increment primary key)
+Log_name Pos Event_type Server_id Orig_log_pos Info
+master-bin.001 79 Query 1 79 use test; create table t1(n int not null auto_increment primary key)
show binlog events from 79 limit 2;
-Log_name Pos Event_type Server_id Log_seq Info
-master-bin.001 79 Query 1 2 use test; create table t1(n int not null auto_increment primary key)
-master-bin.001 172 Intvar 1 3 INSERT_ID=1
+Log_name Pos Event_type Server_id Orig_log_pos Info
+master-bin.001 79 Query 1 79 use test; create table t1(n int not null auto_increment primary key)
+master-bin.001 172 Intvar 1 172 INSERT_ID=1
show binlog events from 79 limit 2,1;
-Log_name Pos Event_type Server_id Log_seq Info
-master-bin.001 200 Query 1 4 use test; insert into t1 values (NULL)
+Log_name Pos Event_type Server_id Orig_log_pos Info
+master-bin.001 200 Query 1 200 use test; insert into t1 values (NULL)
flush logs;
create table t1 (n int);
insert into t1 values (1);
drop table t1;
show binlog events;
-Log_name Pos Event_type Server_id Log_seq Info
-master-bin.001 4 Start 1 1 Server ver: $VERSION, Binlog ver: 2
-master-bin.001 79 Query 1 2 use test; create table t1(n int not null auto_increment primary key)
-master-bin.001 172 Intvar 1 3 INSERT_ID=1
-master-bin.001 200 Query 1 4 use test; insert into t1 values (NULL)
-master-bin.001 263 Query 1 5 use test; drop table t1
-master-bin.001 311 Query 1 6 use test; create table t1 (word char(20) not null)
-master-bin.001 386 Create_file 1 7 db=test;table=t1;file_id=1;block_len=81
-master-bin.001 556 Exec_load 1 8 ;file_id=1
-master-bin.001 579 Query 1 9 use test; drop table t1
-master-bin.001 627 Rotate 1 10 master-bin.002;pos=4
-master-bin.001 668 Stop 1 11
+Log_name Pos Event_type Server_id Orig_log_pos Info
+master-bin.001 4 Start 1 4 Server ver: 4.0.1-alpha-debug-log, Binlog ver: 3
+master-bin.001 79 Query 1 79 use test; create table t1(n int not null auto_increment primary key)
+master-bin.001 172 Intvar 1 172 INSERT_ID=1
+master-bin.001 200 Query 1 200 use test; insert into t1 values (NULL)
+master-bin.001 263 Query 1 263 use test; drop table t1
+master-bin.001 311 Query 1 311 use test; create table t1 (word char(20) not null)
+master-bin.001 386 Create_file 1 386 db=test;table=t1;file_id=1;block_len=81
+master-bin.001 556 Exec_load 1 556 ;file_id=1
+master-bin.001 579 Query 1 579 use test; drop table t1
+master-bin.001 627 Rotate 1 627 master-bin.002;pos=4
+master-bin.001 668 Stop 1 668
show binlog events in 'master-bin.002';
-Log_name Pos Event_type Server_id Log_seq Info
-master-bin.002 4 Query 1 1 use test; create table t1 (n int)
-master-bin.002 62 Query 1 2 use test; insert into t1 values (1)
-master-bin.002 122 Query 1 3 use test; drop table t1
+Log_name Pos Event_type Server_id Orig_log_pos Info
+master-bin.002 4 Query 1 4 use test; create table t1 (n int)
+master-bin.002 62 Query 1 62 use test; insert into t1 values (1)
+master-bin.002 122 Query 1 122 use test; drop table t1
show master logs;
Log_name
master-bin.001
@@ -67,45 +67,45 @@ Log_name
slave-bin.001
slave-bin.002
show binlog events in 'slave-bin.001' from 4;
-Log_name Pos Event_type Server_id Log_seq Info
-slave-bin.001 4 Start 2 1 Server ver: $VERSION, Binlog ver: 2
-slave-bin.001 79 Slave 2 3 host=127.0.0.1,port=$MASTER_MYPORT,log=master-bin.001,pos=4
-slave-bin.001 132 Query 1 2 use test; create table t1(n int not null auto_increment primary key)
-slave-bin.001 225 Intvar 1 3 INSERT_ID=1
-slave-bin.001 253 Query 1 4 use test; insert into t1 values (NULL)
-slave-bin.001 316 Query 1 5 use test; drop table t1
-slave-bin.001 364 Query 1 6 use test; create table t1 (word char(20) not null)
-slave-bin.001 439 Create_file 1 7 db=test;table=t1;file_id=1;block_len=81
-slave-bin.001 618 Exec_load 1 8 ;file_id=1
-slave-bin.001 641 Query 1 9 use test; drop table t1
-slave-bin.001 689 Rotate 1 4 slave-bin.002;pos=4; forced by master
-slave-bin.001 729 Stop 2 5
+Log_name Pos Event_type Server_id Orig_log_pos Info
+slave-bin.001 4 Start 2 4 Server ver: 4.0.1-alpha-debug-log, Binlog ver: 3
+slave-bin.001 79 Slave 2 79 host=127.0.0.1,port=9306,log=master-bin.001,pos=4
+slave-bin.001 132 Query 1 79 use test; create table t1(n int not null auto_increment primary key)
+slave-bin.001 225 Intvar 1 200 INSERT_ID=1
+slave-bin.001 253 Query 1 200 use test; insert into t1 values (NULL)
+slave-bin.001 316 Query 1 263 use test; drop table t1
+slave-bin.001 364 Query 1 311 use test; create table t1 (word char(20) not null)
+slave-bin.001 439 Create_file 1 386 db=test;table=t1;file_id=1;block_len=81
+slave-bin.001 618 Exec_load 1 556 ;file_id=1
+slave-bin.001 641 Query 1 579 use test; drop table t1
+slave-bin.001 689 Rotate 1 627 slave-bin.002;pos=4; forced by master
+slave-bin.001 729 Stop 2 729
show binlog events in 'slave-bin.002' from 4;
-Log_name Pos Event_type Server_id Log_seq Info
-slave-bin.002 4 Slave 2 10 host=127.0.0.1,port=$MASTER_MYPORT,log=master-bin.002,pos=4
-slave-bin.002 57 Query 1 1 use test; create table t1 (n int)
-slave-bin.002 115 Query 1 2 use test; insert into t1 values (1)
-slave-bin.002 175 Query 1 3 use test; drop table t1
+Log_name Pos Event_type Server_id Orig_log_pos Info
+slave-bin.002 4 Slave 2 627 host=127.0.0.1,port=9306,log=master-bin.002,pos=4
+slave-bin.002 57 Query 1 4 use test; create table t1 (n int)
+slave-bin.002 115 Query 1 62 use test; insert into t1 values (1)
+slave-bin.002 175 Query 1 122 use test; drop table t1
show slave status;
-Master_Host Master_User Master_Port Connect_retry Log_File Pos Slave_Running Replicate_do_db Replicate_ignore_db Last_errno Last_error Skip_counter Last_log_seq
-127.0.0.1 root $MASTER_MYPORT 1 master-bin.002 170 Yes 0 0 3
+Master_Host Master_User Master_Port Connect_retry Master_Log_File Read_Master_Log_Pos Relay_Log_File Relay_Log_Pos Relay_Master_Log_File Slave_IO_Running Slave_SQL_Running Replicate_do_db Replicate_ignore_db Last_errno Last_error Skip_counter Exec_master_log_pos
+127.0.0.1 root 9306 1 master-bin.002 170 mysql-relay-bin.002 935 master-bin.002 Yes Yes 0 0 170
show new master for slave with master_log_file='master-bin.001' and
-master_log_pos=4 and master_log_seq=1 and master_server_id=1;
+master_log_pos=4 and master_server_id=1;
Log_name Log_pos
slave-bin.001 132
show new master for slave with master_log_file='master-bin.001' and
-master_log_pos=79 and master_log_seq=2 and master_server_id=1;
+master_log_pos=79 and master_server_id=1;
Log_name Log_pos
slave-bin.001 225
show new master for slave with master_log_file='master-bin.001' and
-master_log_pos=311 and master_log_seq=6 and master_server_id=1;
+master_log_pos=311 and master_server_id=1;
Log_name Log_pos
slave-bin.001 439
show new master for slave with master_log_file='master-bin.002' and
-master_log_pos=4 and master_log_seq=1 and master_server_id=1;
+master_log_pos=4 and master_server_id=1;
Log_name Log_pos
slave-bin.002 57
show new master for slave with master_log_file='master-bin.002' and
-master_log_pos=137 and master_log_seq=3 and master_server_id=1;
+master_log_pos=122 and master_server_id=1;
Log_name Log_pos
slave-bin.002 223
diff --git a/mysql-test/resolve-stack b/mysql-test/resolve-stack
new file mode 100755
index 00000000000..cdbe362c752
--- /dev/null
+++ b/mysql-test/resolve-stack
@@ -0,0 +1,8 @@
+#! /bin/sh
+# A shortcut for resolving stacks when debugging when
+# we cannot duplicate the crash in a debugger and have to
+# resort to using stack traces
+
+nm --numeric-sort ../sql/mysqld > var/tmp/mysqld.sym
+echo "Please type or paste the numeric stack trace,Ctrl-C to quit:"
+../extra/resolve_stack_dump -s var/tmp/mysqld.sym
diff --git a/mysql-test/t/rpl000016-slave.opt b/mysql-test/t/rpl000016-slave.opt
new file mode 100644
index 00000000000..f27601e0d7d
--- /dev/null
+++ b/mysql-test/t/rpl000016-slave.opt
@@ -0,0 +1 @@
+-O max_binlog_size=2048
diff --git a/mysql-test/t/rpl000016-slave.sh b/mysql-test/t/rpl000016-slave.sh
index 62748605af1..9259f593e54 100755
--- a/mysql-test/t/rpl000016-slave.sh
+++ b/mysql-test/t/rpl000016-slave.sh
@@ -1 +1,2 @@
rm -f $MYSQL_TEST_DIR/var/slave-data/master.info
+rm -f $MYSQL_TEST_DIR/var/slave-data/*relay*
diff --git a/mysql-test/t/rpl000017-slave.sh b/mysql-test/t/rpl000017-slave.sh
index c717500ae4a..555427a2376 100755
--- a/mysql-test/t/rpl000017-slave.sh
+++ b/mysql-test/t/rpl000017-slave.sh
@@ -1,3 +1,4 @@
+rm -f $MYSQL_TEST_DIR/var/slave-data/*relay*
cat > $MYSQL_TEST_DIR/var/slave-data/master.info <<EOF
master-bin.001
4
diff --git a/mysql-test/t/rpl_log.test b/mysql-test/t/rpl_log.test
index 841524d57e6..0a5c62ed8d5 100644
--- a/mysql-test/t/rpl_log.test
+++ b/mysql-test/t/rpl_log.test
@@ -37,13 +37,13 @@ show binlog events in 'slave-bin.001' from 4;
show binlog events in 'slave-bin.002' from 4;
show slave status;
show new master for slave with master_log_file='master-bin.001' and
- master_log_pos=4 and master_log_seq=1 and master_server_id=1;
+ master_log_pos=4 and master_server_id=1;
show new master for slave with master_log_file='master-bin.001' and
- master_log_pos=79 and master_log_seq=2 and master_server_id=1;
+ master_log_pos=79 and master_server_id=1;
show new master for slave with master_log_file='master-bin.001' and
- master_log_pos=311 and master_log_seq=6 and master_server_id=1;
+ master_log_pos=311 and master_server_id=1;
show new master for slave with master_log_file='master-bin.002' and
- master_log_pos=4 and master_log_seq=1 and master_server_id=1;
+ master_log_pos=4 and master_server_id=1;
show new master for slave with master_log_file='master-bin.002' and
- master_log_pos=137 and master_log_seq=3 and master_server_id=1;
+ master_log_pos=122 and master_server_id=1;
diff --git a/mysys/mf_iocache.c b/mysys/mf_iocache.c
index b0868851177..c492a295cc1 100644
--- a/mysys/mf_iocache.c
+++ b/mysys/mf_iocache.c
@@ -123,6 +123,8 @@ int init_io_cache(IO_CACHE *info, File file, uint cachesize,
info->pos_in_file= seek_offset;
info->pre_close = info->pre_read = info->post_read = 0;
info->arg = 0;
+ info->init_count++; /* we assume the user had set it to 0 prior to
+ first call */
info->alloced_buffer = 0;
info->buffer=0;
info->seek_not_done= test(file >= 0);
@@ -447,11 +449,13 @@ int _my_b_seq_read(register IO_CACHE *info, byte *Buffer, uint Count)
info->end_of_file)
goto read_append_buffer;
- if (info->seek_not_done)
- { /* File touched, do seek */
- VOID(my_seek(info->file,pos_in_file,MY_SEEK_SET,MYF(0)));
- info->seek_not_done=0;
- }
+ /*
+ With read-append cache we must always do a seek before we read,
+ because the write could have moved the file pointer astray
+ */
+ VOID(my_seek(info->file,pos_in_file,MY_SEEK_SET,MYF(0)));
+ info->seek_not_done=0;
+
diff_length=(uint) (pos_in_file & (IO_SIZE-1));
/* now the second stage begins - read from file descriptor */
@@ -507,6 +511,13 @@ int _my_b_seq_read(register IO_CACHE *info, byte *Buffer, uint Count)
memcpy(Buffer,info->buffer,(size_t) length);
Count -= length;
Buffer += length;
+
+ /*
+ added the line below to make
+ DBUG_ASSERT(pos_in_file==info->end_of_file) pass.
+ otherwise this does not appear to be needed
+ */
+ pos_in_file += length;
goto read_append_buffer;
}
}
@@ -528,10 +539,13 @@ read_append_buffer:
/* First copy the data to Count */
uint len_in_buff = (uint) (info->write_pos - info->append_read_pos);
uint copy_len;
+ uint transfer_len;
DBUG_ASSERT(info->append_read_pos <= info->write_pos);
- DBUG_ASSERT(pos_in_file == info->end_of_file);
-
+ /*
+ TODO: figure out if the below assert is needed or correct.
+ */
+ DBUG_ASSERT(pos_in_file == info->end_of_file);
copy_len=min(Count, len_in_buff);
memcpy(Buffer, info->append_read_pos, copy_len);
info->append_read_pos += copy_len;
@@ -541,11 +555,12 @@ read_append_buffer:
/* Fill read buffer with data from write buffer */
memcpy(info->buffer, info->append_read_pos,
- (size_t) (len_in_buff - copy_len));
+ (size_t) (transfer_len=len_in_buff - copy_len));
info->read_pos= info->buffer;
- info->read_end= info->buffer+(len_in_buff - copy_len);
+ info->read_end= info->buffer+transfer_len;
info->append_read_pos=info->write_pos;
- info->pos_in_file+=len_in_buff;
+ info->pos_in_file=pos_in_file+copy_len;
+ info->end_of_file+=len_in_buff;
}
unlock_append_buffer(info);
return Count ? 1 : 0;
@@ -887,12 +902,10 @@ int flush_io_cache(IO_CACHE *info)
if ((length=(uint) (info->write_pos - info->write_buffer)))
{
pos_in_file=info->pos_in_file;
- if (append_cache)
- {
- pos_in_file=info->end_of_file;
- info->seek_not_done=1;
- }
- if (info->seek_not_done)
+ /* if we have append cache, we always open the file with
+ O_APPEND which moves the pos to EOF automatically on every write
+ */
+ if (!append_cache && info->seek_not_done)
{ /* File touched, do seek */
if (my_seek(info->file,pos_in_file,MY_SEEK_SET,MYF(0)) ==
MY_FILEPOS_ERROR)
@@ -902,20 +915,24 @@ int flush_io_cache(IO_CACHE *info)
if (!append_cache)
info->seek_not_done=0;
}
- info->write_pos= info->write_buffer;
if (!append_cache)
info->pos_in_file+=length;
info->write_end= (info->write_buffer+info->buffer_length-
((pos_in_file+length) & (IO_SIZE-1)));
- /* Set this to be used if we are using SEQ_READ_APPEND */
- info->append_read_pos = info->write_buffer;
if (my_write(info->file,info->write_buffer,length,
info->myflags | MY_NABP))
info->error= -1;
else
info->error= 0;
- set_if_bigger(info->end_of_file,(pos_in_file+length));
+ if (!append_cache)
+ {
+ set_if_bigger(info->end_of_file,(pos_in_file+length));
+ }
+ else
+ info->end_of_file+=(info->write_pos-info->append_read_pos);
+
+ info->append_read_pos=info->write_pos=info->write_buffer;
DBUG_RETURN(info->error);
}
}
diff --git a/mysys/mf_iocache2.c b/mysys/mf_iocache2.c
index a2f10ca3b9f..269e9892308 100644
--- a/mysys/mf_iocache2.c
+++ b/mysys/mf_iocache2.c
@@ -32,12 +32,26 @@
void my_b_seek(IO_CACHE *info,my_off_t pos)
{
- my_off_t offset = (pos - info->pos_in_file);
+ my_off_t offset;
DBUG_ENTER("my_b_seek");
DBUG_PRINT("enter",("pos: %lu", (ulong) pos));
- if (info->type == READ_CACHE)
+ /*
+ TODO: verify that it is OK to do seek in the non-append
+ area in SEQ_READ_APPEND cache
+ */
+ /* TODO:
+ a) see if this always works
+ b) see if there is a better way to make it work
+ */
+ if (info->type == SEQ_READ_APPEND)
+ flush_io_cache(info);
+
+ offset=(pos - info->pos_in_file);
+
+ if (info->type == READ_CACHE || info->type == SEQ_READ_APPEND)
{
+ /* TODO: explain why this works if pos < info->pos_in_file */
if ((ulonglong) offset < (ulonglong) (info->read_end - info->buffer))
{
/* The read is in the current buffer; Reuse it */
diff --git a/mysys/thr_mutex.c b/mysys/thr_mutex.c
index 340f1461823..f3d17e6a5fd 100644
--- a/mysys/thr_mutex.c
+++ b/mysys/thr_mutex.c
@@ -70,7 +70,8 @@ int safe_mutex_lock(safe_mutex_t *mp,const char *file, uint line)
}
if (mp->count++)
{
- fprintf(stderr,"safe_mutex: Error in thread libray: Got mutex at %s, line %d more than 1 time\n", file,line);
+ fprintf(stderr,"safe_mutex: Error in thread libray: Got mutex at %s, \
+line %d more than 1 time\n", file,line);
fflush(stderr);
abort();
}
diff --git a/sql/item_func.cc b/sql/item_func.cc
index fe68d8f47c2..ff6a102fe1f 100644
--- a/sql/item_func.cc
+++ b/sql/item_func.cc
@@ -1453,11 +1453,13 @@ longlong Item_master_pos_wait::val_int()
return 0;
}
ulong pos = (ulong)args[1]->val_int();
- if ((event_count = glob_mi.wait_for_pos(thd, log_name, pos)) == -1)
+ LOCK_ACTIVE_MI;
+ if ((event_count = active_mi->rli.wait_for_pos(thd, log_name, pos)) == -1)
{
null_value = 1;
event_count=0;
}
+ UNLOCK_ACTIVE_MI;
return event_count;
}
diff --git a/sql/lex.h b/sql/lex.h
index 37fe38b76a1..894fdfb5362 100644
--- a/sql/lex.h
+++ b/sql/lex.h
@@ -218,7 +218,6 @@ static SYMBOL symbols[] = {
{ "MASTER_HOST", SYM(MASTER_HOST_SYM),0,0},
{ "MASTER_LOG_FILE", SYM(MASTER_LOG_FILE_SYM),0,0},
{ "MASTER_LOG_POS", SYM(MASTER_LOG_POS_SYM),0,0},
- { "MASTER_LOG_SEQ", SYM(MASTER_LOG_SEQ_SYM),0,0},
{ "MASTER_PASSWORD", SYM(MASTER_PASSWORD_SYM),0,0},
{ "MASTER_PORT", SYM(MASTER_PORT_SYM),0,0},
{ "MASTER_SERVER_ID", SYM(MASTER_SERVER_ID_SYM),0,0},
diff --git a/sql/log.cc b/sql/log.cc
index fa45d938b24..ec7396bda3c 100644
--- a/sql/log.cc
+++ b/sql/log.cc
@@ -29,6 +29,7 @@
#include <my_dir.h>
#include <stdarg.h>
#include <m_ctype.h> // For test_if_number
+#include <assert.h>
MYSQL_LOG mysql_log,mysql_update_log,mysql_slow_log,mysql_bin_log;
extern I_List<i_string> binlog_do_db, binlog_ignore_db;
@@ -81,7 +82,7 @@ static int find_uniq_filename(char *name)
MYSQL_LOG::MYSQL_LOG(): last_time(0), query_start(0),index_file(-1),
name(0), log_type(LOG_CLOSED),write_error(0),
- inited(0), log_seq(1), file_id(1),no_rotate(0),
+ inited(0), file_id(1),no_rotate(0),
need_start_event(1)
{
/*
@@ -138,15 +139,18 @@ bool MYSQL_LOG::open_index( int options)
}
void MYSQL_LOG::init(enum_log_type log_type_arg,
- enum cache_type io_cache_type_arg)
+ enum cache_type io_cache_type_arg,
+ bool no_auto_events_arg)
{
log_type = log_type_arg;
io_cache_type = io_cache_type_arg;
+ no_auto_events = no_auto_events_arg;
if (!inited)
{
inited=1;
(void) pthread_mutex_init(&LOCK_log,MY_MUTEX_INIT_SLOW);
(void) pthread_mutex_init(&LOCK_index, MY_MUTEX_INIT_SLOW);
+ (void) pthread_cond_init(&update_cond, 0);
}
}
@@ -160,16 +164,17 @@ void MYSQL_LOG::close_index()
}
void MYSQL_LOG::open(const char *log_name, enum_log_type log_type_arg,
- const char *new_name)
+ const char *new_name, enum cache_type io_cache_type_arg,
+ bool no_auto_events_arg)
{
MY_STAT tmp_stat;
char buff[512];
File file= -1;
bool do_magic;
-
+ int open_flags = O_CREAT | O_APPEND | O_BINARY;
if (!inited && log_type_arg == LOG_BIN && *fn_ext(log_name))
no_rotate = 1;
- init(log_type_arg);
+ init(log_type_arg,io_cache_type_arg,no_auto_events_arg);
if (!(name=my_strdup(log_name,MYF(MY_WME))))
goto err;
@@ -177,7 +182,12 @@ void MYSQL_LOG::open(const char *log_name, enum_log_type log_type_arg,
strmov(log_file_name,new_name);
else if (generate_new_name(log_file_name, name))
goto err;
-
+
+ if (io_cache_type == SEQ_READ_APPEND)
+ open_flags |= O_RDWR;
+ else
+ open_flags |= O_WRONLY;
+
if (log_type == LOG_BIN && !index_file_name[0])
fn_format(index_file_name, name, mysql_data_home, ".index", 6);
@@ -185,7 +195,7 @@ void MYSQL_LOG::open(const char *log_name, enum_log_type log_type_arg,
do_magic = ((log_type == LOG_BIN) && !my_stat(log_file_name,
&tmp_stat, MYF(0)));
- if ((file=my_open(log_file_name,O_CREAT | O_APPEND | O_WRONLY | O_BINARY,
+ if ((file=my_open(log_file_name,open_flags,
MYF(MY_WME | ME_WAITTANG))) < 0 ||
init_io_cache(&log_file, file, IO_SIZE, io_cache_type,
my_tell(file,MYF(MY_WME)), 0, MYF(MY_WME | MY_NABP)))
@@ -235,11 +245,10 @@ void MYSQL_LOG::open(const char *log_name, enum_log_type log_type_arg,
open_index(O_APPEND | O_RDWR | O_CREAT))
goto err;
- log_seq = 1;
- if (need_start_event)
+ if (need_start_event && !no_auto_events)
{
Start_log_event s;
- s.set_log_seq(0, this);
+ s.set_log_pos(this);
s.write(&log_file);
need_start_event=0;
}
@@ -264,9 +273,7 @@ err:
end_io_cache(&log_file);
x_free(name); name=0;
log_type=LOG_CLOSED;
-
return;
-
}
int MYSQL_LOG::get_current_log(LOG_INFO* linfo)
@@ -279,7 +286,8 @@ int MYSQL_LOG::get_current_log(LOG_INFO* linfo)
}
// if log_name is "" we stop at the first entry
-int MYSQL_LOG::find_first_log(LOG_INFO* linfo, const char* log_name)
+int MYSQL_LOG::find_first_log(LOG_INFO* linfo, const char* log_name,
+ bool need_mutex)
{
if (index_file < 0)
return LOG_INFO_INVALID;
@@ -290,7 +298,8 @@ int MYSQL_LOG::find_first_log(LOG_INFO* linfo, const char* log_name)
// mutex needed because we need to make sure the file pointer does not move
// from under our feet
- pthread_mutex_lock(&LOCK_index);
+ if (need_mutex)
+ pthread_mutex_lock(&LOCK_index);
if (init_io_cache(&io_cache, index_file, IO_SIZE, READ_CACHE, (my_off_t) 0,
0, MYF(MY_WME)))
{
@@ -319,14 +328,15 @@ int MYSQL_LOG::find_first_log(LOG_INFO* linfo, const char* log_name)
error = 0;
err:
- pthread_mutex_unlock(&LOCK_index);
+ if (need_mutex)
+ pthread_mutex_unlock(&LOCK_index);
end_io_cache(&io_cache);
return error;
}
-int MYSQL_LOG::find_next_log(LOG_INFO* linfo)
+int MYSQL_LOG::find_next_log(LOG_INFO* linfo, bool need_lock)
{
// mutex needed because we need to make sure the file pointer does not move
// from under our feet
@@ -335,8 +345,8 @@ int MYSQL_LOG::find_next_log(LOG_INFO* linfo)
char* fname = linfo->log_file_name;
IO_CACHE io_cache;
uint length;
-
- pthread_mutex_lock(&LOCK_index);
+ if (need_lock)
+ pthread_mutex_lock(&LOCK_index);
if (init_io_cache(&io_cache, index_file, IO_SIZE,
READ_CACHE, (my_off_t) linfo->index_file_offset, 0,
MYF(MY_WME)))
@@ -354,12 +364,126 @@ int MYSQL_LOG::find_next_log(LOG_INFO* linfo)
error = 0;
err:
- pthread_mutex_unlock(&LOCK_index);
+ if (need_lock)
+ pthread_mutex_unlock(&LOCK_index);
end_io_cache(&io_cache);
return error;
}
-
+int MYSQL_LOG::reset_logs(THD* thd)
+{
+ LOG_INFO linfo;
+ int error=0;
+ const char* save_name;
+ enum_log_type save_log_type;
+ pthread_mutex_lock(&LOCK_log);
+ if (find_first_log(&linfo,""))
+ {
+ error=1;
+ goto err;
+ }
+
+ for(;;)
+ {
+ my_delete(linfo.log_file_name, MYF(MY_WME));
+ if (find_next_log(&linfo))
+ break;
+ }
+ save_name=name;
+ name=0;
+ save_log_type=log_type;
+ close(1);
+ my_delete(index_file_name, MYF(MY_WME));
+ if (thd && !thd->slave_thread)
+ need_start_event=1;
+ open(save_name,save_log_type,0,io_cache_type,no_auto_events);
+ my_free((gptr)save_name,MYF(0));
+err:
+ pthread_mutex_unlock(&LOCK_log);
+ return error;
+}
+
+int MYSQL_LOG::purge_first_log(struct st_relay_log_info* rli)
+{
+ // pre-conditions
+ DBUG_ASSERT(is_open());
+ DBUG_ASSERT(index_file >= 0);
+ DBUG_ASSERT(rli->slave_running == 1);
+ DBUG_ASSERT(!strcmp(rli->linfo.log_file_name,rli->relay_log_name));
+ // assume that we have previously read the first log and
+ // stored it in rli->relay_log_name
+ DBUG_ASSERT(rli->linfo.index_file_offset ==
+ strlen(rli->relay_log_name) + 1);
+
+ int tmp_fd;
+
+
+ char* fname, *io_buf;
+ int error = 0;
+ if (!(fname = (char*)my_malloc(IO_SIZE+FN_REFLEN, MYF(MY_WME))))
+ return 1;
+ pthread_mutex_lock(&LOCK_index);
+ my_seek(index_file,rli->linfo.index_file_offset,
+ MY_SEEK_SET, MYF(MY_WME));
+ io_buf = fname + FN_REFLEN;
+ strxmov(fname,rli->relay_log_name,".tmp",NullS);
+
+ if ((tmp_fd = my_open(fname,O_CREAT|O_BINARY|O_RDWR, MYF(MY_WME))) < 0)
+ {
+ error = 1;
+ goto err;
+ }
+ for (;;)
+ {
+ int bytes_read;
+ bytes_read = my_read(index_file, io_buf, IO_SIZE, MYF(0));
+ if (bytes_read < 0) // error
+ {
+ error=1;
+ goto err;
+ }
+ if (!bytes_read)
+ break; // end of file
+ // otherwise, we've read something and need to write it out
+ if (my_write(tmp_fd, io_buf, bytes_read, MYF(MY_WME|MY_NABP)))
+ {
+ error=1;
+ goto err;
+ }
+ }
+err:
+ if (tmp_fd)
+ my_close(tmp_fd, MYF(MY_WME));
+ if (error)
+ my_delete(fname, MYF(0)); // do not report error if the file is not there
+ else
+ {
+ my_close(index_file, MYF(MY_WME));
+ if (my_rename(fname,index_file_name,MYF(MY_WME)) ||
+ (index_file=my_open(index_file_name,O_BINARY|O_RDWR|O_APPEND,
+ MYF(MY_WME)))<0 ||
+ my_delete(rli->relay_log_name, MYF(MY_WME)))
+ error=1;
+ if ((error=find_first_log(&rli->linfo,"",0/*no mutex*/)))
+ {
+ char buff[22];
+ sql_print_error("next log error=%d,offset=%s,log=%s",error,
+ llstr(rli->linfo.index_file_offset,buff),
+ rli->linfo.log_file_name);
+ goto err2;
+ }
+ rli->relay_log_pos = 4;
+ strnmov(rli->relay_log_name,rli->linfo.log_file_name,
+ sizeof(rli->relay_log_name));
+ }
+ // no need to free io_buf because we allocated both fname and io_buf in
+ // one malloc()
+err2:
+ pthread_mutex_unlock(&LOCK_index);
+ my_free(fname, MYF(MY_WME));
+ return error;
+}
+
int MYSQL_LOG::purge_logs(THD* thd, const char* to_log)
{
if (index_file < 0) return LOG_INFO_INVALID;
@@ -395,9 +519,8 @@ int MYSQL_LOG::purge_logs(THD* thd, const char* to_log)
goto err;
}
logs_to_keep_inited = 1;
-
- for(;;)
+ for (;;)
{
my_off_t init_purge_offset= my_b_tell(&io_cache);
if (!(fname_len=my_b_gets(&io_cache, fname, FN_REFLEN)))
@@ -409,14 +532,14 @@ int MYSQL_LOG::purge_logs(THD* thd, const char* to_log)
}
fname[--fname_len]=0; // kill \n
- if(!memcmp(fname, to_log, fname_len + 1 ))
+ if (!memcmp(fname, to_log, fname_len + 1 ))
{
found_log = 1;
purge_offset = init_purge_offset;
}
// if one of the logs before the target is in use
- if(!found_log && log_in_use(fname))
+ if (!found_log && log_in_use(fname))
{
error = LOG_INFO_IN_USE;
goto err;
@@ -432,13 +555,13 @@ int MYSQL_LOG::purge_logs(THD* thd, const char* to_log)
}
end_io_cache(&io_cache);
- if(!found_log)
+ if (!found_log)
{
error = LOG_INFO_EOF;
goto err;
}
- for(i = 0; i < logs_to_purge.elements; i++)
+ for (i = 0; i < logs_to_purge.elements; i++)
{
char* l;
get_dynamic(&logs_to_purge, (gptr)&l, i);
@@ -461,9 +584,9 @@ during log purge for write");
#else
my_close(index_file, MYF(MY_WME));
my_delete(index_file_name, MYF(MY_WME));
- if(!(index_file = my_open(index_file_name,
+ if ((index_file = my_open(index_file_name,
O_CREAT | O_BINARY | O_RDWR | O_APPEND,
- MYF(MY_WME))))
+ MYF(MY_WME)))<0)
{
sql_print_error("Could not re-open the binlog index file \
during log purge for write");
@@ -472,7 +595,7 @@ during log purge for write");
}
#endif
- for(i = 0; i < logs_to_keep.elements; i++)
+ for (i = 0; i < logs_to_keep.elements; i++)
{
char* l;
get_dynamic(&logs_to_keep, (gptr)&l, i);
@@ -490,15 +613,14 @@ during log purge for write");
err:
pthread_mutex_unlock(&LOCK_index);
- if(logs_to_purge_inited)
+ if (logs_to_purge_inited)
delete_dynamic(&logs_to_purge);
- if(logs_to_keep_inited)
+ if (logs_to_keep_inited)
delete_dynamic(&logs_to_keep);
end_io_cache(&io_cache);
return error;
}
-
// we assume that buf has at least FN_REFLEN bytes alloced
void MYSQL_LOG::make_log_name(char* buf, const char* log_ident)
{
@@ -543,30 +665,36 @@ void MYSQL_LOG::new_file(bool inside_mutex)
}
if (log_type == LOG_BIN)
{
- /*
- We log the whole file name for log file as the user may decide
- to change base names at some point.
- */
- THD* thd = current_thd;
- Rotate_log_event r(thd,new_name+dirname_length(new_name));
- r.set_log_seq(0, this);
-
- /*
- This log rotation could have been initiated by a master of
- the slave running with log-bin we set the flag on rotate
- event to prevent inifinite log rotation loop
- */
- if (thd && slave_thd && thd == slave_thd)
- r.flags |= LOG_EVENT_FORCED_ROTATE_F;
- r.write(&log_file);
- VOID(pthread_cond_broadcast(&COND_binlog_update));
+ if (!no_auto_events)
+ {
+ /*
+ We log the whole file name for log file as the user may decide
+ to change base names at some point.
+ */
+ THD* thd = current_thd;
+ Rotate_log_event r(thd,new_name+dirname_length(new_name));
+ r.set_log_pos(this);
+
+ /*
+ This log rotation could have been initiated by a master of
+ the slave running with log-bin we set the flag on rotate
+ event to prevent inifinite log rotation loop
+ */
+ if (thd && thd->slave_thread)
+ r.flags |= LOG_EVENT_FORCED_ROTATE_F;
+ r.write(&log_file);
+ }
+ // update needs to be signaled even if there is no rotate event
+ // log rotation should give the waiting thread a signal to
+ // discover EOF and move on to the next log
+ signal_update();
}
else
strmov(new_name, old_name); // Reopen old file name
}
name=0;
close();
- open(old_name, log_type, new_name);
+ open(old_name, log_type, new_name, io_cache_type, no_auto_events);
my_free(old_name,MYF(0));
last_time=query_start=0;
write_error=0;
@@ -575,6 +703,31 @@ void MYSQL_LOG::new_file(bool inside_mutex)
}
}
+bool MYSQL_LOG::appendv(const char* buf, uint len,...)
+{
+ bool error = 0;
+ va_list(args);
+ va_start(args,len);
+
+ pthread_mutex_lock(&LOCK_log);
+ do
+ {
+ if (my_b_append(&log_file,buf,len))
+ {
+ error = 1;
+ break;
+ }
+ if ((uint)my_b_append_tell(&log_file) > max_binlog_size)
+ {
+ new_file(1);
+ }
+ } while ((buf=va_arg(args,const char*)) && (len=va_arg(args,uint)));
+
+ if (!error)
+ signal_update();
+ pthread_mutex_unlock(&LOCK_log);
+ return error;
+}
bool MYSQL_LOG::write(THD *thd,enum enum_server_command command,
const char *format,...)
@@ -684,11 +837,12 @@ bool MYSQL_LOG::write(Log_event* event_info)
return 0;
}
error=1;
-
+ // no check for auto events flag here - this write method should
+ // never be called if auto-events are enabled
if (thd && thd->last_insert_id_used)
{
Intvar_log_event e(thd,(uchar)LAST_INSERT_ID_EVENT,thd->last_insert_id);
- e.set_log_seq(thd, this);
+ e.set_log_pos(this);
if (thd->server_id)
e.server_id = thd->server_id;
if (e.write(file))
@@ -697,7 +851,7 @@ bool MYSQL_LOG::write(Log_event* event_info)
if (thd && thd->insert_id_used)
{
Intvar_log_event e(thd,(uchar)INSERT_ID_EVENT,thd->last_insert_id);
- e.set_log_seq(thd, this);
+ e.set_log_pos(this);
if (thd->server_id)
e.server_id = thd->server_id;
if (e.write(file))
@@ -712,12 +866,12 @@ bool MYSQL_LOG::write(Log_event* event_info)
// just in case somebody wants it later
thd->query_length = (uint)(p - buf);
Query_log_event e(thd, buf);
- e.set_log_seq(thd, this);
+ e.set_log_pos(this);
if (e.write(file))
goto err;
thd->query_length = save_query_length; // clean up
}
- event_info->set_log_seq(thd, this);
+ event_info->set_log_pos(this);
if (event_info->write(file) ||
file == &log_file && flush_io_cache(file))
goto err;
@@ -734,7 +888,7 @@ err:
write_error=1;
}
if (file == &log_file)
- VOID(pthread_cond_broadcast(&COND_binlog_update));
+ signal_update();
}
if (should_rotate)
new_file(1); // inside mutex
@@ -765,7 +919,7 @@ bool MYSQL_LOG::write(IO_CACHE *cache)
if (is_open())
{
uint length;
-
+ //QQ: this looks like a bug - why READ_CACHE?
if (reinit_io_cache(cache, READ_CACHE, 0, 0, 0))
{
sql_print_error(ER(ER_ERROR_ON_WRITE), cache->file_name, errno);
@@ -800,7 +954,7 @@ err:
if (error)
write_error=1;
else
- VOID(pthread_cond_broadcast(&COND_binlog_update));
+ signal_update();
VOID(pthread_mutex_unlock(&LOCK_log));
@@ -930,21 +1084,37 @@ bool MYSQL_LOG::write(THD *thd,const char *query, uint query_length,
return error;
}
+void MYSQL_LOG:: wait_for_update(THD* thd)
+{
+ const char* old_msg = thd->enter_cond(&update_cond, &LOCK_log,
+ "Slave: waiting for binlog update");
+ pthread_cond_wait(&update_cond, &LOCK_log);
+ // this is not a bug - we unlock the mutex for the caller, and expect him
+ // to lock it and then not unlock it upon return. This is a rather odd
+ // way of doing things, but this is the cleanest way I could think of to
+ // solve the race deadlock caused by THD::awake() first acquiring mysys_var
+ // mutex and then the current mutex, while wait_for_update being called with
+ // the current mutex already aquired and THD::exit_cond() trying to acquire
+ // mysys_var mutex. We do need the mutex to be acquired prior to the
+ // invocation of wait_for_update in all cases, so mutex acquisition inside
+ // wait_for_update() is not an option
+ pthread_mutex_unlock(&LOCK_log);
+ thd->exit_cond(old_msg);
+}
void MYSQL_LOG::close(bool exiting)
{ // One can't set log_type here!
if (is_open())
{
- File file=log_file.file;
- if (log_type == LOG_BIN)
+ if (log_type == LOG_BIN && !no_auto_events)
{
Stop_log_event s;
- s.set_log_seq(0, this);
+ s.set_log_pos(this);
s.write(&log_file);
- VOID(pthread_cond_broadcast(&COND_binlog_update));
+ signal_update();
}
end_io_cache(&log_file);
- if (my_close(file,MYF(0)) < 0 && ! write_error)
+ if (my_close(log_file.file,MYF(0)) < 0 && ! write_error)
{
write_error=1;
sql_print_error(ER(ER_ERROR_ON_WRITE),name,errno);
diff --git a/sql/log_event.cc b/sql/log_event.cc
index 72198038a07..7eb7c57ae40 100644
--- a/sql/log_event.cc
+++ b/sql/log_event.cc
@@ -24,6 +24,8 @@
#include <my_dir.h>
#endif /* MYSQL_CLIENT */
+#include <assert.h>
+
#ifdef MYSQL_CLIENT
static void pretty_print_str(FILE* file, char* str, int len)
{
@@ -118,14 +120,14 @@ Log_event::Log_event(THD* thd_arg, uint16 flags_arg):
if (thd)
{
server_id = thd->server_id;
- log_seq = thd->log_seq;
when = thd->start_time;
+ log_pos = thd->log_pos;
}
else
{
server_id = ::server_id;
- log_seq = 0;
when = time(NULL);
+ log_pos=0;
}
}
@@ -156,12 +158,12 @@ Log_event::Log_event(const char* buf, bool old_format):
server_id = uint4korr(buf + SERVER_ID_OFFSET);
if (old_format)
{
- log_seq=0;
+ log_pos=0;
flags=0;
}
else
{
- log_seq = uint4korr(buf + LOG_SEQ_OFFSET);
+ log_pos = uint4korr(buf + LOG_POS_OFFSET);
flags = uint2korr(buf + FLAGS_OFFSET);
}
#ifndef MYSQL_CLIENT
@@ -172,13 +174,13 @@ Log_event::Log_event(const char* buf, bool old_format):
#ifndef MYSQL_CLIENT
-int Log_event::exec_event(struct st_master_info* mi)
+int Log_event::exec_event(struct st_relay_log_info* rli)
{
- if (mi)
+ if (rli)
{
- thd->log_seq = 0;
- mi->inc_pos(get_event_len(), log_seq);
- flush_master_info(mi);
+ rli->inc_pos(get_event_len(),log_pos);
+ DBUG_ASSERT(rli->sql_thd != 0);
+ flush_relay_log_info(rli);
}
return 0;
}
@@ -193,14 +195,14 @@ void Query_log_event::pack_info(String* packet)
char buf[256];
String tmp(buf, sizeof(buf));
tmp.length(0);
- if(db && db_len)
+ if (db && db_len)
{
tmp.append("use ");
tmp.append(db, db_len);
tmp.append("; ", 2);
}
- if(query && q_len)
+ if (query && q_len)
tmp.append(query, q_len);
net_store_data(packet, (char*)tmp.ptr(), tmp.length());
}
@@ -345,7 +347,7 @@ void Log_event::init_show_field_list(List<Item>* field_list)
field_list->push_back(new Item_empty_string("Pos", 20));
field_list->push_back(new Item_empty_string("Event_type", 20));
field_list->push_back(new Item_empty_string("Server_id", 20));
- field_list->push_back(new Item_empty_string("Log_seq", 20));
+ field_list->push_back(new Item_empty_string("Orig_log_pos", 20));
field_list->push_back(new Item_empty_string("Info", 20));
}
@@ -363,7 +365,7 @@ int Log_event::net_send(THD* thd, const char* log_name, my_off_t pos)
event_type = get_type_str();
net_store_data(packet, event_type, strlen(event_type));
net_store_data(packet, server_id);
- net_store_data(packet, log_seq);
+ net_store_data(packet, log_pos);
pack_info(packet);
return my_net_write(&thd->net, (char*)packet->ptr(), packet->length());
}
@@ -392,7 +394,7 @@ int Log_event::write_header(IO_CACHE* file)
long tmp=get_data_size() + LOG_EVENT_HEADER_LEN;
int4store(pos, tmp);
pos += 4;
- int4store(pos, log_seq);
+ int4store(pos, log_pos);
pos += 4;
int2store(pos, flags);
pos += 2;
@@ -456,7 +458,6 @@ int Log_event::read_log_event(IO_CACHE* file, String* packet,
#define LOCK_MUTEX
#endif
-
// allocates memory - the caller is responsible for clean-up
#ifndef MYSQL_CLIENT
Log_event* Log_event::read_log_event(IO_CACHE* file,
@@ -501,7 +502,7 @@ Log_event* Log_event::read_log_event(IO_CACHE* file, bool old_format)
}
buf[data_len] = 0;
memcpy(buf, head, header_size);
- if(my_b_read(file, (byte*) buf + header_size,
+ if (my_b_read(file, (byte*) buf + header_size,
data_len - header_size))
{
error = "read error";
@@ -511,9 +512,10 @@ Log_event* Log_event::read_log_event(IO_CACHE* file, bool old_format)
res->register_temp_buf(buf);
err:
UNLOCK_MUTEX;
- if(error)
+ if (error)
{
- sql_print_error(error);
+ sql_print_error("Error in Log_event::read_log_event(): '%s', \
+data_len=%d,event_type=%d",error,data_len,head[EVENT_TYPE_OFFSET]);
my_free(buf, MYF(MY_ALLOW_ZERO_PTR));
}
return res;
@@ -581,9 +583,11 @@ Log_event* Log_event::read_log_event(const char* buf, int event_len,
#ifdef MYSQL_CLIENT
void Log_event::print_header(FILE* file)
{
+ char llbuff[22];
fputc('#', file);
print_timestamp(file);
- fprintf(file, " server id %d ", server_id);
+ fprintf(file, " server id %d log_pos %s ", server_id,
+ llstr(log_pos,llbuff));
}
void Log_event::print_timestamp(FILE* file, time_t* ts)
@@ -1187,12 +1191,12 @@ void Load_log_event::print(FILE* file, bool short_form, char* last_db)
#ifndef MYSQL_CLIENT
-void Log_event::set_log_seq(THD* thd, MYSQL_LOG* log)
+void Log_event::set_log_pos(MYSQL_LOG* log)
{
- log_seq = (thd && thd->log_seq) ? thd->log_seq++ : log->log_seq++;
+ if (!log_pos)
+ log_pos = my_b_tell(&log->log_file);
}
-
void Load_log_event::set_fields(List<Item> &fields)
{
uint i;
@@ -1205,14 +1209,20 @@ void Load_log_event::set_fields(List<Item> &fields)
}
-Slave_log_event::Slave_log_event(THD* thd_arg,struct st_master_info* mi):
+Slave_log_event::Slave_log_event(THD* thd_arg,
+ struct st_relay_log_info* rli):
Log_event(thd_arg),mem_pool(0),master_host(0)
{
- if(!mi->inited)
+ if(!rli->inited)
return;
- pthread_mutex_lock(&mi->lock);
+
+ MASTER_INFO* mi = rli->mi;
+ // TODO: re-write this better without holding both
+ // locks at the same time
+ pthread_mutex_lock(&mi->data_lock);
+ pthread_mutex_lock(&rli->data_lock);
master_host_len = strlen(mi->host);
- master_log_len = strlen(mi->log_file_name);
+ master_log_len = strlen(rli->master_log_name);
// on OOM, just do not initialize the structure and print the error
if((mem_pool = (char*)my_malloc(get_data_size() + 1,
MYF(MY_WME))))
@@ -1220,13 +1230,14 @@ Slave_log_event::Slave_log_event(THD* thd_arg,struct st_master_info* mi):
master_host = mem_pool + SL_MASTER_HOST_OFFSET ;
memcpy(master_host, mi->host, master_host_len + 1);
master_log = master_host + master_host_len + 1;
- memcpy(master_log, mi->log_file_name, master_log_len + 1);
+ memcpy(master_log, rli->master_log_name, master_log_len + 1);
master_port = mi->port;
- master_pos = mi->pos;
+ master_pos = rli->master_log_pos;
}
else
sql_print_error("Out of memory while recording slave event");
- pthread_mutex_unlock(&mi->lock);
+ pthread_mutex_unlock(&rli->data_lock);
+ pthread_mutex_unlock(&mi->data_lock);
}
@@ -1533,7 +1544,7 @@ void Execute_load_log_event::pack_info(String* packet)
#endif
#ifndef MYSQL_CLIENT
-int Query_log_event::exec_event(struct st_master_info* mi)
+int Query_log_event::exec_event(struct st_relay_log_info* rli)
{
int expected_error,actual_error = 0;
init_sql_alloc(&thd->mem_root, 8192,0);
@@ -1553,7 +1564,7 @@ int Query_log_event::exec_event(struct st_master_info* mi)
// sanity check to make sure the master did not get a really bad
// error on the query
- if (!check_expected_error(thd, (expected_error = error_code)))
+ if (!check_expected_error(thd,rli,(expected_error = error_code)))
{
mysql_parse(thd, thd->query, q_len);
if (expected_error !=
@@ -1570,8 +1581,8 @@ int Query_log_event::exec_event(struct st_master_info* mi)
else if (expected_error == actual_error)
{
thd->query_error = 0;
- *last_slave_error = 0;
- last_slave_errno = 0;
+ *rli->last_slave_error = 0;
+ rli->last_slave_errno = 0;
}
}
else
@@ -1592,17 +1603,17 @@ int Query_log_event::exec_event(struct st_master_info* mi)
if (thd->query_error || thd->fatal_error)
{
- slave_print_error(actual_error, "error '%s' on query '%s'",
+ slave_print_error(rli,actual_error, "error '%s' on query '%s'",
actual_error ? thd->net.last_error :
"unexpected success or fatal error", query);
free_root(&thd->mem_root,0);
return 1;
}
free_root(&thd->mem_root,0);
- return Log_event::exec_event(mi);
+ return Log_event::exec_event(rli);
}
-int Load_log_event::exec_event(NET* net, struct st_master_info* mi)
+int Load_log_event::exec_event(NET* net, struct st_relay_log_info* rli)
{
init_sql_alloc(&thd->mem_root, 8192,0);
thd->db = rewrite_db((char*)db);
@@ -1625,6 +1636,7 @@ int Load_log_event::exec_event(NET* net, struct st_master_info* mi)
// the table will be opened in mysql_load
if(table_rules_on && !tables_ok(thd, &tables))
{
+ // TODO: this is a bug - this needs to be moved to the I/O thread
if (net)
skip_load_data_infile(net);
}
@@ -1632,7 +1644,7 @@ int Load_log_event::exec_event(NET* net, struct st_master_info* mi)
{
char llbuff[22];
enum enum_duplicates handle_dup = DUP_IGNORE;
- if(sql_ex.opt_flags && REPLACE_FLAG)
+ if (sql_ex.opt_flags && REPLACE_FLAG)
handle_dup = DUP_REPLACE;
sql_exchange ex((char*)fname, sql_ex.opt_flags &&
DUMPFILE_FLAG );
@@ -1663,7 +1675,7 @@ int Load_log_event::exec_event(NET* net, struct st_master_info* mi)
thd->query_error = 1;
if(thd->cuted_fields)
sql_print_error("Slave: load data infile at position %s in log \
-'%s' produced %d warning(s)", llstr(mi->pos,llbuff), RPL_LOG_NAME,
+'%s' produced %d warning(s)", llstr(rli->master_log_pos,llbuff), RPL_LOG_NAME,
thd->cuted_fields );
if(net)
net->pkt_nr = thd->net.pkt_nr;
@@ -1673,6 +1685,7 @@ int Load_log_event::exec_event(NET* net, struct st_master_info* mi)
{
// we will just ask the master to send us /dev/null if we do not
// want to load the data
+ // TODO: this a bug - needs to be done in I/O thread
if (net)
skip_load_data_infile(net);
}
@@ -1683,10 +1696,11 @@ int Load_log_event::exec_event(NET* net, struct st_master_info* mi)
if(thd->query_error)
{
int sql_error = thd->net.last_errno;
- if(!sql_error)
+ if (!sql_error)
sql_error = ER_UNKNOWN_ERROR;
- slave_print_error(sql_error, "Slave: Error '%s' running load data infile ",
+ slave_print_error(rli,sql_error,
+ "Slave: Error '%s' running load data infile ",
ER_SAFE(sql_error));
free_root(&thd->mem_root,0);
return 1;
@@ -1699,38 +1713,43 @@ int Load_log_event::exec_event(NET* net, struct st_master_info* mi)
return 1;
}
- return Log_event::exec_event(mi);
+ return Log_event::exec_event(rli);
}
-int Start_log_event::exec_event(struct st_master_info* mi)
+int Start_log_event::exec_event(struct st_relay_log_info* rli)
{
- if (!mi->old_format)
+ if (!rli->mi->old_format)
{
close_temporary_tables(thd);
cleanup_load_tmpdir();
}
- return Log_event::exec_event(mi);
+ return Log_event::exec_event(rli);
}
-int Stop_log_event::exec_event(struct st_master_info* mi)
+int Stop_log_event::exec_event(struct st_relay_log_info* rli)
{
- if (mi->pos > 4) // stop event should be ignored after rotate event
+ // do not clean up immediately after rotate event
+ if (rli->master_log_pos > 4)
{
close_temporary_tables(thd);
cleanup_load_tmpdir();
- mi->inc_pos(get_event_len(), log_seq);
- flush_master_info(mi);
}
- thd->log_seq = 0;
+ // we do not want to update master_log pos because we get a rotate event
+ // before stop, so by now master_log_name is set to the next log
+ // if we updated it, we will have incorrect master coordinates and this
+ // could give false triggers in MASTER_POS_WAIT() that we have reached
+ // the targed position when in fact we have not
+ rli->inc_pos(get_event_len(), 0);
+ flush_relay_log_info(rli);
return 0;
}
-int Rotate_log_event::exec_event(struct st_master_info* mi)
+int Rotate_log_event::exec_event(struct st_relay_log_info* rli)
{
bool rotate_binlog = 0, write_slave_event = 0;
- char* log_name = mi->log_file_name;
- pthread_mutex_lock(&mi->lock);
-
+ char* log_name = rli->master_log_name;
+ pthread_mutex_lock(&rli->data_lock);
+ // TODO: probably needs re-write
// rotate local binlog only if the name of remote has changed
if (!*log_name || !(log_name[ident_len] == 0 &&
!memcmp(log_name, new_log_ident, ident_len)))
@@ -1738,41 +1757,38 @@ int Rotate_log_event::exec_event(struct st_master_info* mi)
write_slave_event = (!(flags & LOG_EVENT_FORCED_ROTATE_F)
&& mysql_bin_log.is_open());
rotate_binlog = (*log_name && write_slave_event);
- memcpy(log_name, new_log_ident,ident_len );
+ if (ident_len >= sizeof(rli->master_log_name))
+ return 1;
+ memcpy(log_name, new_log_ident,ident_len);
log_name[ident_len] = 0;
}
- mi->pos = pos;
- mi->last_log_seq = log_seq;
-#ifndef DBUG_OFF
- if (abort_slave_event_count)
- ++events_till_abort;
-#endif
+ rli->master_log_pos = pos;
+ rli->relay_log_pos += get_event_len();
if (rotate_binlog)
{
mysql_bin_log.new_file();
- mi->last_log_seq = 0;
+ rli->master_log_pos = 4;
}
- pthread_cond_broadcast(&mi->cond);
- pthread_mutex_unlock(&mi->lock);
- flush_master_info(mi);
+ pthread_cond_broadcast(&rli->data_cond);
+ pthread_mutex_unlock(&rli->data_lock);
+ flush_relay_log_info(rli);
if (write_slave_event)
{
- Slave_log_event s(thd, mi);
+ Slave_log_event s(thd, rli);
if (s.master_host)
{
- s.set_log_seq(0, &mysql_bin_log);
+ s.set_log_pos(&mysql_bin_log);
s.server_id = ::server_id;
mysql_bin_log.write(&s);
}
}
- thd->log_seq = 0;
return 0;
}
-int Intvar_log_event::exec_event(struct st_master_info* mi)
+int Intvar_log_event::exec_event(struct st_relay_log_info* rli)
{
- switch(type)
+ switch (type)
{
case LAST_INSERT_ID_EVENT:
thd->last_insert_id_used = 1;
@@ -1782,18 +1798,18 @@ int Intvar_log_event::exec_event(struct st_master_info* mi)
thd->next_insert_id = val;
break;
}
- mi->inc_pending(get_event_len());
+ rli->inc_pending(get_event_len());
return 0;
}
-int Slave_log_event::exec_event(struct st_master_info* mi)
+int Slave_log_event::exec_event(struct st_relay_log_info* rli)
{
if(mysql_bin_log.is_open())
mysql_bin_log.write(this);
- return Log_event::exec_event(mi);
+ return Log_event::exec_event(rli);
}
-int Create_file_log_event::exec_event(struct st_master_info* mi)
+int Create_file_log_event::exec_event(struct st_relay_log_info* rli)
{
char fname_buf[FN_REFLEN+10];
char *p;
@@ -1809,7 +1825,7 @@ int Create_file_log_event::exec_event(struct st_master_info* mi)
init_io_cache(&file, fd, IO_SIZE, WRITE_CACHE, (my_off_t)0, 0,
MYF(MY_WME|MY_NABP)))
{
- slave_print_error(my_errno, "Could not open file '%s'", fname_buf);
+ slave_print_error(rli,my_errno, "Could not open file '%s'", fname_buf);
goto err;
}
@@ -1820,7 +1836,7 @@ int Create_file_log_event::exec_event(struct st_master_info* mi)
if (write_base(&file))
{
strmov(p, ".info"); // to have it right in the error message
- slave_print_error(my_errno, "Could not write to file '%s'", fname_buf);
+ slave_print_error(rli,my_errno, "Could not write to file '%s'", fname_buf);
goto err;
}
end_io_cache(&file);
@@ -1830,12 +1846,12 @@ int Create_file_log_event::exec_event(struct st_master_info* mi)
if ((fd = my_open(fname_buf, O_WRONLY|O_CREAT|O_BINARY|O_TRUNC,
MYF(MY_WME))) < 0)
{
- slave_print_error(my_errno, "Could not open file '%s'", fname_buf);
+ slave_print_error(rli,my_errno, "Could not open file '%s'", fname_buf);
goto err;
}
if (my_write(fd, (byte*) block, block_len, MYF(MY_WME+MY_NABP)))
{
- slave_print_error(my_errno, "Write to '%s' failed", fname_buf);
+ slave_print_error(rli,my_errno, "Write to '%s' failed", fname_buf);
goto err;
}
if (mysql_bin_log.is_open())
@@ -1846,10 +1862,10 @@ err:
end_io_cache(&file);
if (fd >= 0)
my_close(fd, MYF(0));
- return error ? 1 : Log_event::exec_event(mi);
+ return error ? 1 : Log_event::exec_event(rli);
}
-int Delete_file_log_event::exec_event(struct st_master_info* mi)
+int Delete_file_log_event::exec_event(struct st_relay_log_info* rli)
{
char fname[FN_REFLEN+10];
char* p;
@@ -1860,10 +1876,10 @@ int Delete_file_log_event::exec_event(struct st_master_info* mi)
(void)my_delete(fname, MYF(MY_WME));
if (mysql_bin_log.is_open())
mysql_bin_log.write(this);
- return Log_event::exec_event(mi);
+ return Log_event::exec_event(rli);
}
-int Append_block_log_event::exec_event(struct st_master_info* mi)
+int Append_block_log_event::exec_event(struct st_relay_log_info* rli)
{
char fname[FN_REFLEN+10];
char* p;
@@ -1873,12 +1889,12 @@ int Append_block_log_event::exec_event(struct st_master_info* mi)
memcpy(p, ".data", 6);
if ((fd = my_open(fname, O_WRONLY|O_APPEND|O_BINARY, MYF(MY_WME))) < 0)
{
- slave_print_error(my_errno, "Could not open file '%s'", fname);
+ slave_print_error(rli,my_errno, "Could not open file '%s'", fname);
goto err;
}
if (my_write(fd, (byte*) block, block_len, MYF(MY_WME+MY_NABP)))
{
- slave_print_error(my_errno, "Write to '%s' failed", fname);
+ slave_print_error(rli,my_errno, "Write to '%s' failed", fname);
goto err;
}
if (mysql_bin_log.is_open())
@@ -1887,10 +1903,10 @@ int Append_block_log_event::exec_event(struct st_master_info* mi)
err:
if (fd >= 0)
my_close(fd, MYF(0));
- return error ? error : Log_event::exec_event(mi);
+ return error ? error : Log_event::exec_event(rli);
}
-int Execute_load_log_event::exec_event(struct st_master_info* mi)
+int Execute_load_log_event::exec_event(struct st_relay_log_info* rli)
{
char fname[FN_REFLEN+10];
char* p;
@@ -1906,7 +1922,7 @@ int Execute_load_log_event::exec_event(struct st_master_info* mi)
init_io_cache(&file, fd, IO_SIZE, READ_CACHE, (my_off_t)0, 0,
MYF(MY_WME|MY_NABP)))
{
- slave_print_error(my_errno, "Could not open file '%s'", fname);
+ slave_print_error(rli,my_errno, "Could not open file '%s'", fname);
goto err;
}
if (!(lev = (Load_log_event*)Log_event::read_log_event(&file,
@@ -1914,7 +1930,7 @@ int Execute_load_log_event::exec_event(struct st_master_info* mi)
(bool)0))
|| lev->get_type_code() != NEW_LOAD_EVENT)
{
- slave_print_error(0, "File '%s' appears corrupted", fname);
+ slave_print_error(rli,0, "File '%s' appears corrupted", fname);
goto err;
}
// we want to disable binary logging in slave thread
@@ -1927,7 +1943,7 @@ int Execute_load_log_event::exec_event(struct st_master_info* mi)
lev->thd = thd;
if (lev->exec_event(0,0))
{
- slave_print_error(my_errno, "Failed executing load from '%s'", fname);
+ slave_print_error(rli,my_errno, "Failed executing load from '%s'", fname);
thd->options = save_options;
goto err;
}
@@ -1943,7 +1959,7 @@ err:
end_io_cache(&file);
if (fd >= 0)
my_close(fd, MYF(0));
- return error ? error : Log_event::exec_event(mi);
+ return error ? error : Log_event::exec_event(rli);
}
diff --git a/sql/log_event.h b/sql/log_event.h
index 329d748025d..686d353d8bb 100644
--- a/sql/log_event.h
+++ b/sql/log_event.h
@@ -34,7 +34,7 @@
#define LOG_READ_TOO_LARGE -7
#define LOG_EVENT_OFFSET 4
-#define BINLOG_VERSION 2
+#define BINLOG_VERSION 3
/* we could have used SERVER_VERSION_LENGTH, but this introduces an
obscure dependency - if somebody decided to change SERVER_VERSION_LENGTH
@@ -120,7 +120,7 @@ struct sql_ex_info
#define EVENT_TYPE_OFFSET 4
#define SERVER_ID_OFFSET 5
#define EVENT_LEN_OFFSET 9
-#define LOG_SEQ_OFFSET 13
+#define LOG_POS_OFFSET 13
#define FLAGS_OFFSET 17
/* start event post-header */
@@ -206,7 +206,7 @@ class THD;
extern uint32 server_id;
-struct st_master_info;
+struct st_relay_log_info;
class Log_event
{
@@ -214,7 +214,7 @@ public:
time_t when;
ulong exec_time;
uint32 server_id;
- uint32 log_seq;
+ uint32 log_pos;
uint16 flags;
int cached_event_len;
char* temp_buf;
@@ -282,11 +282,11 @@ public:
#ifndef MYSQL_CLIENT
static int read_log_event(IO_CACHE* file, String* packet,
pthread_mutex_t* log_lock);
- void set_log_seq(THD* thd, MYSQL_LOG* log);
+ void set_log_pos(MYSQL_LOG* log);
virtual void pack_info(String* packet);
int net_send(THD* thd, const char* log_name, my_off_t pos);
static void init_show_field_list(List<Item>* field_list);
- virtual int exec_event(struct st_master_info* mi);
+ virtual int exec_event(struct st_relay_log_info* rli);
virtual const char* get_db()
{
return thd ? thd->db : 0;
@@ -316,7 +316,7 @@ public:
bool using_trans=0);
const char* get_db() { return db; }
void pack_info(String* packet);
- int exec_event(struct st_master_info* mi);
+ int exec_event(struct st_relay_log_info* rli);
bool get_cache_stmt() { return cache_stmt; }
#endif
@@ -359,9 +359,9 @@ public:
ulonglong master_pos;
#ifndef MYSQL_CLIENT
- Slave_log_event(THD* thd_arg, struct st_master_info* mi);
+ Slave_log_event(THD* thd_arg, struct st_relay_log_info* rli);
void pack_info(String* packet);
- int exec_event(struct st_master_info* mi);
+ int exec_event(struct st_relay_log_info* rli);
#endif
Slave_log_event(const char* buf, int event_len);
@@ -407,11 +407,11 @@ public:
void set_fields(List<Item> &fields_arg);
void pack_info(String* packet);
const char* get_db() { return db; }
- int exec_event(struct st_master_info* mi)
+ int exec_event(struct st_relay_log_info* rli)
{
- return exec_event(thd->slave_net,mi);
+ return exec_event(thd->slave_net,rli);
}
- int exec_event(NET* net, struct st_master_info* mi);
+ int exec_event(NET* net, struct st_relay_log_info* rli);
#endif
Load_log_event(const char* buf, int event_len, bool old_format);
@@ -465,7 +465,7 @@ public:
}
#ifndef MYSQL_CLIENT
void pack_info(String* packet);
- int exec_event(struct st_master_info* mi);
+ int exec_event(struct st_relay_log_info* rli);
#endif
#ifdef MYSQL_CLIENT
void print(FILE* file, bool short_form = 0, char* last_db = 0);
@@ -491,7 +491,7 @@ public:
bool is_valid() { return 1; }
#ifndef MYSQL_CLIENT
void pack_info(String* packet);
- int exec_event(struct st_master_info* mi);
+ int exec_event(struct st_relay_log_info* rli);
#endif
#ifdef MYSQL_CLIENT
@@ -517,7 +517,7 @@ public:
void print(FILE* file, bool short_form = 0, char* last_db = 0);
#endif
#ifndef MYSQL_CLIENT
- int exec_event(struct st_master_info* mi);
+ int exec_event(struct st_relay_log_info* rli);
#endif
};
@@ -553,7 +553,7 @@ public:
#endif
#ifndef MYSQL_CLIENT
void pack_info(String* packet);
- int exec_event(struct st_master_info* mi);
+ int exec_event(struct st_relay_log_info* rli);
#endif
};
@@ -602,7 +602,7 @@ public:
#endif
#ifndef MYSQL_CLIENT
void pack_info(String* packet);
- int exec_event(struct st_master_info* mi);
+ int exec_event(struct st_relay_log_info* rli);
#endif
};
@@ -616,7 +616,7 @@ public:
#ifndef MYSQL_CLIENT
Append_block_log_event(THD* thd, char* block_arg,
uint block_len_arg);
- int exec_event(struct st_master_info* mi);
+ int exec_event(struct st_relay_log_info* rli);
#endif
Append_block_log_event(const char* buf, int event_len);
@@ -659,7 +659,7 @@ public:
#endif
#ifndef MYSQL_CLIENT
void pack_info(String* packet);
- int exec_event(struct st_master_info* mi);
+ int exec_event(struct st_relay_log_info* rli);
#endif
};
@@ -686,7 +686,7 @@ public:
#endif
#ifndef MYSQL_CLIENT
void pack_info(String* packet);
- int exec_event(struct st_master_info* mi);
+ int exec_event(struct st_relay_log_info* rli);
#endif
};
diff --git a/sql/mini_client.cc b/sql/mini_client.cc
index f75f5b09cef..a43e5710e60 100644
--- a/sql/mini_client.cc
+++ b/sql/mini_client.cc
@@ -101,7 +101,7 @@ static MYSQL_FIELD *unpack_fields(MYSQL_DATA *data,MEM_ROOT *alloc,uint fields,
my_bool default_value,
my_bool long_flag_protocol);
-static void mc_end_server(MYSQL *mysql);
+void mc_end_server(MYSQL *mysql);
static int mc_sock_connect(File s, const struct sockaddr *name, uint namelen, uint to);
static void mc_free_old_query(MYSQL *mysql);
static int mc_send_file_to_server(MYSQL *mysql, const char *filename);
@@ -206,8 +206,7 @@ HANDLE create_named_pipe(NET *net, uint connect_timeout, char **arg_host,
** Init MySQL structure or allocate one
****************************************************************************/
-MYSQL * STDCALL
-mc_mysql_init(MYSQL *mysql)
+MYSQL *mc_mysql_init(MYSQL *mysql)
{
init_client_errs();
if (!mysql)
@@ -229,7 +228,7 @@ mc_mysql_init(MYSQL *mysql)
** Shut down connection
**************************************************************************/
-static void
+void
mc_end_server(MYSQL *mysql)
{
DBUG_ENTER("mc_end_server");
@@ -351,7 +350,7 @@ static int mc_sock_connect(my_socket s, const struct sockaddr *name,
** or packet is an error message
*****************************************************************************/
-ulong STDCALL
+ulong
mc_net_safe_read(MYSQL *mysql)
{
NET *net= &mysql->net;
@@ -412,17 +411,17 @@ max_allowed_packet on this server");
}
-char * STDCALL mc_mysql_error(MYSQL *mysql)
+char * mc_mysql_error(MYSQL *mysql)
{
return (mysql)->net.last_error;
}
-int STDCALL mc_mysql_errno(MYSQL *mysql)
+int mc_mysql_errno(MYSQL *mysql)
{
return (mysql)->net.last_errno;
}
-my_bool STDCALL mc_mysql_reconnect(MYSQL *mysql)
+my_bool mc_mysql_reconnect(MYSQL *mysql)
{
MYSQL tmp_mysql;
DBUG_ENTER("mc_mysql_reconnect");
@@ -452,7 +451,7 @@ my_bool STDCALL mc_mysql_reconnect(MYSQL *mysql)
-int STDCALL
+int
mc_simple_command(MYSQL *mysql,enum enum_server_command command,
const char *arg, uint length, my_bool skipp_check)
{
@@ -505,7 +504,7 @@ mc_simple_command(MYSQL *mysql,enum enum_server_command command,
}
-MYSQL * STDCALL
+MYSQL *
mc_mysql_connect(MYSQL *mysql,const char *host, const char *user,
const char *passwd, const char *db,
uint port, const char *unix_socket,uint client_flag)
@@ -856,7 +855,7 @@ error:
** NB! Errors are not reported until you do mysql_real_connect.
**************************************************************************
*/
-int STDCALL
+int
mysql_ssl_clear(MYSQL *mysql)
{
my_free(mysql->options.ssl_key, MYF(MY_ALLOW_ZERO_PTR));
@@ -879,7 +878,7 @@ mysql_ssl_clear(MYSQL *mysql)
** If handle is alloced by mysql connect free it.
*************************************************************************/
-void STDCALL
+void
mc_mysql_close(MYSQL *mysql)
{
DBUG_ENTER("mysql_close");
@@ -910,7 +909,7 @@ mc_mysql_close(MYSQL *mysql)
DBUG_VOID_RETURN;
}
-void STDCALL mc_mysql_free_result(MYSQL_RES *result)
+void mc_mysql_free_result(MYSQL_RES *result)
{
DBUG_ENTER("mc_mysql_free_result");
DBUG_PRINT("enter",("mysql_res: %lx",result));
@@ -988,13 +987,13 @@ mc_unpack_fields(MYSQL_DATA *data,MEM_ROOT *alloc,uint fields,
DBUG_RETURN(result);
}
-int STDCALL
+int
mc_mysql_send_query(MYSQL* mysql, const char* query, uint length)
{
return mc_simple_command(mysql, COM_QUERY, query, length, 1);
}
-int STDCALL mc_mysql_read_query_result(MYSQL *mysql)
+int mc_mysql_read_query_result(MYSQL *mysql)
{
uchar *pos;
ulong field_count;
@@ -1042,7 +1041,7 @@ get_info:
DBUG_RETURN(0);
}
-int STDCALL mc_mysql_query(MYSQL *mysql, const char *query, uint length)
+int mc_mysql_query(MYSQL *mysql, const char *query, uint length)
{
DBUG_ENTER("mysql_real_query");
DBUG_PRINT("enter",("handle: %lx",mysql));
@@ -1289,17 +1288,17 @@ static int mc_read_one_row(MYSQL *mysql,uint fields,MYSQL_ROW row,
return 0;
}
-my_ulonglong STDCALL mc_mysql_num_rows(MYSQL_RES *res)
+my_ulonglong mc_mysql_num_rows(MYSQL_RES *res)
{
return res->row_count;
}
-unsigned int STDCALL mc_mysql_num_fields(MYSQL_RES *res)
+unsigned int mc_mysql_num_fields(MYSQL_RES *res)
{
return res->field_count;
}
-void STDCALL mc_mysql_data_seek(MYSQL_RES *result, my_ulonglong row)
+void mc_mysql_data_seek(MYSQL_RES *result, my_ulonglong row)
{
MYSQL_ROWS *tmp=0;
DBUG_PRINT("info",("mysql_data_seek(%ld)",(long) row));
@@ -1309,7 +1308,7 @@ void STDCALL mc_mysql_data_seek(MYSQL_RES *result, my_ulonglong row)
result->data_cursor = tmp;
}
-MYSQL_ROW STDCALL mc_mysql_fetch_row(MYSQL_RES *res)
+MYSQL_ROW mc_mysql_fetch_row(MYSQL_RES *res)
{
DBUG_ENTER("mc_mysql_fetch_row");
if (!res->data)
@@ -1344,7 +1343,7 @@ MYSQL_ROW STDCALL mc_mysql_fetch_row(MYSQL_RES *res)
}
}
-int STDCALL mc_mysql_select_db(MYSQL *mysql, const char *db)
+int mc_mysql_select_db(MYSQL *mysql, const char *db)
{
int error;
DBUG_ENTER("mysql_select_db");
@@ -1358,7 +1357,7 @@ int STDCALL mc_mysql_select_db(MYSQL *mysql, const char *db)
}
-MYSQL_RES * STDCALL mc_mysql_store_result(MYSQL *mysql)
+MYSQL_RES *mc_mysql_store_result(MYSQL *mysql)
{
MYSQL_RES *result;
DBUG_ENTER("mysql_store_result");
diff --git a/sql/mini_client.h b/sql/mini_client.h
index de78da06eec..1e17d34244e 100644
--- a/sql/mini_client.h
+++ b/sql/mini_client.h
@@ -18,41 +18,35 @@
#define _MINI_CLIENT_H
-MYSQL* STDCALL
-mc_mysql_connect(MYSQL *mysql,const char *host, const char *user,
+MYSQL* mc_mysql_connect(MYSQL *mysql,const char *host, const char *user,
const char *passwd, const char *db,
uint port, const char *unix_socket,uint client_flag);
-int STDCALL
-mc_simple_command(MYSQL *mysql,enum enum_server_command command, const char *arg,
+int mc_simple_command(MYSQL *mysql,enum enum_server_command command, const char *arg,
uint length, my_bool skipp_check);
-void STDCALL
-mc_mysql_close(MYSQL *mysql);
-
-MYSQL * STDCALL
-mc_mysql_init(MYSQL *mysql);
-
-void STDCALL
-mc_mysql_debug(const char *debug);
-
-ulong STDCALL
-mc_net_safe_read(MYSQL *mysql);
-
-char * STDCALL mc_mysql_error(MYSQL *mysql);
-int STDCALL mc_mysql_errno(MYSQL *mysql);
-my_bool STDCALL mc_mysql_reconnect(MYSQL* mysql);
-
-int STDCALL mc_mysql_send_query(MYSQL* mysql, const char* query, uint length);
-int STDCALL mc_mysql_read_query_result(MYSQL *mysql);
-int STDCALL mc_mysql_query(MYSQL *mysql, const char *query, uint length);
-MYSQL_RES * STDCALL mc_mysql_store_result(MYSQL *mysql);
-void STDCALL mc_mysql_free_result(MYSQL_RES *result);
-void STDCALL mc_mysql_data_seek(MYSQL_RES *result, my_ulonglong row);
-my_ulonglong STDCALL mc_mysql_num_rows(MYSQL_RES *res);
-unsigned int STDCALL mc_mysql_num_fields(MYSQL_RES *res);
-MYSQL_ROW STDCALL mc_mysql_fetch_row(MYSQL_RES *res);
-int STDCALL mc_mysql_select_db(MYSQL *mysql, const char *db);
+void mc_mysql_close(MYSQL *mysql);
+
+MYSQL * mc_mysql_init(MYSQL *mysql);
+
+void mc_mysql_debug(const char *debug);
+ulong mc_net_safe_read(MYSQL *mysql);
+
+char * mc_mysql_error(MYSQL *mysql);
+int mc_mysql_errno(MYSQL *mysql);
+my_bool mc_mysql_reconnect(MYSQL* mysql);
+
+int mc_mysql_send_query(MYSQL* mysql, const char* query, uint length);
+int mc_mysql_read_query_result(MYSQL *mysql);
+int mc_mysql_query(MYSQL *mysql, const char *query, uint length);
+MYSQL_RES * mc_mysql_store_result(MYSQL *mysql);
+void mc_mysql_free_result(MYSQL_RES *result);
+void mc_mysql_data_seek(MYSQL_RES *result, my_ulonglong row);
+my_ulonglong mc_mysql_num_rows(MYSQL_RES *res);
+unsigned int mc_mysql_num_fields(MYSQL_RES *res);
+MYSQL_ROW STDCALL mc_mysql_fetch_row(MYSQL_RES *res);
+int mc_mysql_select_db(MYSQL *mysql, const char *db);
+void mc_end_server(MYSQL *mysql);
#endif
diff --git a/sql/mysql_priv.h b/sql/mysql_priv.h
index e0e85145c52..12a043250a5 100644
--- a/sql/mysql_priv.h
+++ b/sql/mysql_priv.h
@@ -519,6 +519,10 @@ void sql_print_error(const char *format,...)
__attribute__ ((format (printf, 1, 2)));
bool fn_format_relative_to_data_home(my_string to, const char *name,
const char *dir, const char *extension);
+void open_log(MYSQL_LOG *log, const char *hostname,
+ const char *opt_name, const char *extension,
+ enum_log_type type, bool read_append = 0,
+ bool no_auto_events = 0);
extern uint32 server_id;
extern char *mysql_data_home,server_version[SERVER_VERSION_LENGTH],
@@ -550,9 +554,8 @@ extern pthread_mutex_t LOCK_mysql_create_db,LOCK_Acl,LOCK_open,
LOCK_thread_count,LOCK_mapped_file,LOCK_user_locks, LOCK_status,
LOCK_grant, LOCK_error_log, LOCK_delayed_insert,
LOCK_delayed_status, LOCK_delayed_create, LOCK_crypt, LOCK_timezone,
- LOCK_binlog_update, LOCK_slave, LOCK_server_id, LOCK_slave_list;
-extern pthread_cond_t COND_refresh,COND_thread_count, COND_binlog_update,
- COND_slave_stopped, COND_slave_start;
+ LOCK_server_id, LOCK_slave_list, LOCK_active_mi;
+extern pthread_cond_t COND_refresh,COND_thread_count;
extern pthread_attr_t connection_attrib;
extern bool opt_endinfo, using_udf_functions, locked_in_memory,
opt_using_transactions, use_temp_pool, mysql_embedded;
@@ -588,6 +591,7 @@ extern struct show_var_st init_vars[];
extern struct show_var_st status_vars[];
extern enum db_type default_table_type;
extern enum enum_tx_isolation default_tx_isolation;
+extern char glob_hostname[FN_REFLEN];
#ifndef __WIN__
extern pthread_t signal_thread;
diff --git a/sql/mysqld.cc b/sql/mysqld.cc
index c4af3366806..6d99db62f12 100644
--- a/sql/mysqld.cc
+++ b/sql/mysqld.cc
@@ -206,7 +206,7 @@ SHOW_COMP_OPTION have_openssl=SHOW_OPTION_NO;
SHOW_COMP_OPTION have_symlink=SHOW_OPTION_YES;
-static bool opt_skip_slave_start = 0; // If set, slave is not autostarted
+bool opt_skip_slave_start = 0; // If set, slave is not autostarted
static bool opt_do_pstack = 0;
static ulong opt_specialflag=SPECIAL_ENGLISH;
static ulong back_log,connect_timeout,concurrency;
@@ -225,8 +225,6 @@ bool opt_sql_bin_update = 0, opt_log_slave_updates = 0, opt_safe_show_db=0,
opt_safe_user_create = 0, opt_no_mix_types = 0;
FILE *bootstrap_file=0;
int segfaulted = 0; // ensure we do not enter SIGSEGV handler twice
-extern MASTER_INFO glob_mi;
-extern int init_master_info(MASTER_INFO* mi);
/*
If sql_bin_update is true, SQL_LOG_UPDATE and SQL_LOG_BIN are kept in sync,
@@ -238,7 +236,7 @@ static struct rand_struct sql_rand;
static int cleanup_done;
static char **defaults_argv,time_zone[30];
static const char *default_table_type_name;
-static char glob_hostname[FN_REFLEN];
+char glob_hostname[FN_REFLEN];
#include "sslopt-vars.h"
#ifdef HAVE_OPENSSL
@@ -274,7 +272,9 @@ volatile ulong cached_thread_count=0;
// replication parameters, if master_host is not NULL, we are a slave
my_string master_user = (char*) "test", master_password = 0, master_host=0,
- master_info_file = (char*) "master.info", master_ssl_key=0, master_ssl_cert=0;
+ master_info_file = (char*) "master.info",
+ relay_log_info_file = (char*) "relay-log.info",
+ master_ssl_key=0, master_ssl_cert=0;
my_string report_user = 0, report_password = 0, report_host=0;
const char *localhost=LOCAL_HOST;
@@ -321,6 +321,7 @@ bool mysql_embedded=1;
#endif
char *opt_bin_logname = 0; // this one needs to be seen in sql_parse.cc
+char *opt_relay_logname = 0, *opt_relaylog_index_name=0;
char server_version[SERVER_VERSION_LENGTH]=MYSQL_SERVER_VERSION;
const char *first_keyword="first";
const char **errmesg; /* Error messages */
@@ -356,11 +357,10 @@ pthread_mutex_t LOCK_mysql_create_db, LOCK_Acl, LOCK_open, LOCK_thread_count,
LOCK_error_log,
LOCK_delayed_insert, LOCK_delayed_status, LOCK_delayed_create,
LOCK_crypt, LOCK_bytes_sent, LOCK_bytes_received,
- LOCK_binlog_update, LOCK_slave, LOCK_server_id,
- LOCK_user_conn, LOCK_slave_list;
+ LOCK_server_id,
+ LOCK_user_conn, LOCK_slave_list, LOCK_active_mi;
-pthread_cond_t COND_refresh,COND_thread_count,COND_binlog_update,
- COND_slave_stopped, COND_slave_start;
+pthread_cond_t COND_refresh,COND_thread_count;
pthread_cond_t COND_thread_cache,COND_flush_thread_cache;
pthread_t signal_thread;
pthread_attr_t connection_attrib;
@@ -759,6 +759,7 @@ void clean_up(bool print_message)
my_free(allocated_mysql_tmpdir,MYF(MY_ALLOW_ZERO_PTR));
my_free(slave_load_tmpdir,MYF(MY_ALLOW_ZERO_PTR));
x_free(opt_bin_logname);
+ x_free(opt_relay_logname);
bitmap_free(&temp_pool);
free_max_user_conn();
end_slave_list();
@@ -1095,6 +1096,8 @@ void end_thread(THD *thd, bool put_in_cache)
DBUG_PRINT("info", ("sending a broadcast"))
/* Tell main we are ready */
+ // TODO: explain why we broadcast outside of the lock or
+ // fix the bug - Sasha
(void) pthread_mutex_unlock(&LOCK_thread_count);
(void) pthread_cond_broadcast(&COND_thread_count);
DBUG_PRINT("info", ("unlocked thread_count mutex"))
@@ -1266,6 +1269,7 @@ the thread stack. Please read http://www.mysql.com/doc/L/i/Linux.html\n\n",
#ifdef HAVE_STACKTRACE
if(!(test_flags & TEST_NO_STACKTRACE))
{
+ fprintf(stderr,"thd=%p\n",thd);
print_stacktrace(thd ? (gptr) thd->thread_stack : (gptr) 0,
thread_stack);
}
@@ -1594,9 +1598,10 @@ const char *load_default_groups[]= { "mysqld","server",0 };
char *libwrapName=NULL;
#endif
-static void open_log(MYSQL_LOG *log, const char *hostname,
+void open_log(MYSQL_LOG *log, const char *hostname,
const char *opt_name, const char *extension,
- enum_log_type type)
+ enum_log_type type, bool read_append,
+ bool no_auto_events)
{
char tmp[FN_REFLEN];
if (!opt_name || !opt_name[0])
@@ -1620,7 +1625,8 @@ static void open_log(MYSQL_LOG *log, const char *hostname,
opt_name=tmp;
}
}
- log->open(opt_name,type);
+ log->open(opt_name,type,0,(read_append) ? SEQ_READ_APPEND : WRITE_CACHE,
+ no_auto_events);
}
@@ -1716,19 +1722,15 @@ int main(int argc, char **argv)
(void) pthread_mutex_init(&LOCK_bytes_sent,MY_MUTEX_INIT_FAST);
(void) pthread_mutex_init(&LOCK_bytes_received,MY_MUTEX_INIT_FAST);
(void) pthread_mutex_init(&LOCK_timezone,MY_MUTEX_INIT_FAST);
- (void) pthread_mutex_init(&LOCK_binlog_update, MY_MUTEX_INIT_FAST); // QQ NOT USED
- (void) pthread_mutex_init(&LOCK_slave, MY_MUTEX_INIT_FAST);
(void) pthread_mutex_init(&LOCK_server_id, MY_MUTEX_INIT_FAST);
(void) pthread_mutex_init(&LOCK_user_conn, MY_MUTEX_INIT_FAST);
(void) pthread_mutex_init(&LOCK_rpl_status, MY_MUTEX_INIT_FAST);
+ (void) pthread_mutex_init(&LOCK_active_mi, MY_MUTEX_INIT_FAST);
(void) pthread_cond_init(&COND_thread_count,NULL);
(void) pthread_cond_init(&COND_refresh,NULL);
(void) pthread_cond_init(&COND_thread_cache,NULL);
(void) pthread_cond_init(&COND_flush_thread_cache,NULL);
(void) pthread_cond_init(&COND_manager,NULL);
- (void) pthread_cond_init(&COND_binlog_update, NULL);
- (void) pthread_cond_init(&COND_slave_stopped, NULL);
- (void) pthread_cond_init(&COND_slave_start, NULL);
(void) pthread_cond_init(&COND_rpl_status, NULL);
init_signals();
@@ -1825,20 +1827,9 @@ int main(int argc, char **argv)
LOG_NEW);
using_update_log=1;
}
-
- /*
- make sure slave thread gets started if server_id is set,
- valid master.info is present, and master_host has not been specified
- */
- if (server_id && !master_host)
- {
- char fname[FN_REFLEN+128];
- MY_STAT stat_area;
- fn_format(fname, master_info_file, mysql_data_home, "", 4+16+32);
- if (my_stat(fname, &stat_area, MYF(0)) && !init_master_info(&glob_mi))
- master_host = glob_mi.host;
- }
-
+
+ init_slave();
+
if (opt_bin_log && !server_id)
{
server_id= !master_host ? 1 : 2;
@@ -1991,17 +1982,6 @@ The server will not act as a slave.");
sql_print_error("Warning: Can't create thread to manage maintenance");
}
- // slave thread
- if (master_host)
- {
- pthread_t hThread;
- if (!opt_skip_slave_start &&
- pthread_create(&hThread, &connection_attrib, handle_slave, 0))
- sql_print_error("Warning: Can't create thread to handle slave");
- else if(opt_skip_slave_start)
- init_master_info(&glob_mi);
- }
-
printf(ER(ER_READY),my_progname,server_version,"");
fflush(stdout);
@@ -2665,7 +2645,8 @@ enum options {
OPT_REPORT_USER, OPT_REPORT_PASSWORD, OPT_REPORT_PORT,
OPT_SHOW_SLAVE_AUTH_INFO, OPT_OLD_RPL_COMPAT,
OPT_SLAVE_LOAD_TMPDIR, OPT_NO_MIX_TYPE,
- OPT_RPL_RECOVERY_RANK,OPT_INIT_RPL_ROLE
+ OPT_RPL_RECOVERY_RANK,OPT_INIT_RPL_ROLE,
+ OPT_RELAY_LOG, OPT_RELAY_LOG_INDEX, OPT_RELAY_LOG_INFO_FILE
};
static struct option long_options[] = {
@@ -2790,6 +2771,8 @@ static struct option long_options[] = {
{"report-password", required_argument, 0, (int) OPT_REPORT_PASSWORD},
{"report-port", required_argument, 0, (int) OPT_REPORT_PORT},
{"rpl-recovery-rank", required_argument, 0, (int) OPT_RPL_RECOVERY_RANK},
+ {"relay-log", required_argument, 0, (int) OPT_RELAY_LOG},
+ {"relay-log-index", required_argument, 0, (int) OPT_RELAY_LOG_INDEX},
{"safe-mode", no_argument, 0, (int) OPT_SAFE},
{"safe-show-database", no_argument, 0, (int) OPT_SAFE_SHOW_DB},
{"safe-user-create", no_argument, 0, (int) OPT_SAFE_USER_CREATE},
@@ -2813,6 +2796,8 @@ static struct option long_options[] = {
{"skip-stack-trace", no_argument, 0, (int) OPT_SKIP_STACK_TRACE},
{"skip-symlink", no_argument, 0, (int) OPT_SKIP_SYMLINKS},
{"skip-thread-priority", no_argument, 0, (int) OPT_SKIP_PRIOR},
+ {"relay-log-info-file", required_argument, 0,
+ (int) OPT_RELAY_LOG_INFO_FILE},
{"slave-load-tmpdir", required_argument, 0, (int) OPT_SLAVE_LOAD_TMPDIR},
{"socket", required_argument, 0, (int) OPT_SOCKET},
{"sql-bin-update-same", no_argument, 0, (int) OPT_SQL_BIN_UPDATE_SAME},
@@ -3151,8 +3136,8 @@ struct show_var_st status_vars[]= {
{"Select_range", (char*) &select_range_count, SHOW_LONG},
{"Select_range_check", (char*) &select_range_check_count, SHOW_LONG},
{"Select_scan", (char*) &select_scan_count, SHOW_LONG},
- {"Slave_running", (char*) &slave_running, SHOW_BOOL},
{"Slave_open_temp_tables", (char*) &slave_open_temp_tables, SHOW_LONG},
+ {"Slave_running", (char*) 0, SHOW_SLAVE_RUNNING},
{"Slow_launch_threads", (char*) &slow_launch_threads, SHOW_LONG},
{"Slow_queries", (char*) &long_query_count, SHOW_LONG},
{"Sort_merge_passes", (char*) &filesort_merge_passes, SHOW_LONG},
@@ -3548,6 +3533,14 @@ static void get_options(int argc,char **argv)
opt_update_log=1;
opt_update_logname=optarg; // Use hostname.# if null
break;
+ case (int) OPT_RELAY_LOG_INDEX:
+ opt_relaylog_index_name = optarg;
+ break;
+ case (int) OPT_RELAY_LOG:
+ x_free(opt_relay_logname);
+ if (optarg && optarg[0])
+ opt_relay_logname=my_strdup(optarg,MYF(0));
+ break;
case (int) OPT_BIN_LOG_INDEX:
opt_binlog_index_name = optarg;
break;
@@ -4007,6 +4000,9 @@ static void get_options(int argc,char **argv)
case OPT_MASTER_INFO_FILE:
master_info_file=optarg;
break;
+ case OPT_RELAY_LOG_INFO_FILE:
+ relay_log_info_file=optarg;
+ break;
case OPT_MASTER_PORT:
master_port= atoi(optarg);
break;
diff --git a/sql/repl_failsafe.cc b/sql/repl_failsafe.cc
index d846662947d..705ef58d0e7 100644
--- a/sql/repl_failsafe.cc
+++ b/sql/repl_failsafe.cc
@@ -199,7 +199,7 @@ void end_slave_list()
static int find_target_pos(LEX_MASTER_INFO* mi, IO_CACHE* log, char* errmsg)
{
- uint32 log_seq = mi->last_log_seq;
+ uint32 log_pos = mi->pos;
uint32 target_server_id = mi->server_id;
for (;;)
@@ -217,7 +217,7 @@ static int find_target_pos(LEX_MASTER_INFO* mi, IO_CACHE* log, char* errmsg)
return 1;
}
- if (ev->log_seq == log_seq && ev->server_id == target_server_id)
+ if (ev->log_pos == log_pos && ev->server_id == target_server_id)
{
delete ev;
mi->pos = my_b_tell(log);
@@ -527,7 +527,7 @@ pthread_handler_decl(handle_failsafe_rpl,arg)
const char* msg = thd->enter_cond(&COND_rpl_status,
&LOCK_rpl_status, "Waiting for request");
pthread_cond_wait(&COND_rpl_status, &LOCK_rpl_status);
- thd->proc_info="Processling request";
+ thd->proc_info="Processing request";
while (!break_req_chain)
{
switch (rpl_status)
@@ -630,10 +630,9 @@ static inline void cleanup_mysql_results(MYSQL_RES* db_res,
static inline int fetch_db_tables(THD* thd, MYSQL* mysql, const char* db,
- MYSQL_RES* table_res)
+ MYSQL_RES* table_res, MASTER_INFO* mi)
{
MYSQL_ROW row;
-
for( row = mc_mysql_fetch_row(table_res); row;
row = mc_mysql_fetch_row(table_res))
{
@@ -649,11 +648,9 @@ static inline int fetch_db_tables(THD* thd, MYSQL* mysql, const char* db,
if (!tables_ok(thd, &table))
continue;
}
-
- if ((error = fetch_nx_table(thd, db, table_name, &glob_mi, mysql)))
+ if ((error = fetch_master_table(thd, db, table_name, mi, mysql)))
return error;
}
-
return 0;
}
@@ -664,25 +661,26 @@ int load_master_data(THD* thd)
MYSQL_RES* master_status_res = 0;
bool slave_was_running = 0;
int error = 0;
-
+ const char* errmsg=0;
+ int restart_thread_mask;
mc_mysql_init(&mysql);
// we do not want anyone messing with the slave at all for the entire
// duration of the data load;
- pthread_mutex_lock(&LOCK_slave);
-
- // first, kill the slave
- if ((slave_was_running = slave_running))
+ LOCK_ACTIVE_MI;
+ lock_slave_threads(active_mi);
+ init_thread_mask(&restart_thread_mask,active_mi,0 /*not inverse*/);
+ if (restart_thread_mask &&
+ (error=terminate_slave_threads(active_mi,restart_thread_mask,
+ 1 /*skip lock*/)))
{
- abort_slave = 1;
- KICK_SLAVE;
- thd->proc_info = "waiting for slave to die";
- while (slave_running)
- pthread_cond_wait(&COND_slave_stopped, &LOCK_slave); // wait until done
+ send_error(&thd->net,error);
+ unlock_slave_threads(active_mi);
+ UNLOCK_ACTIVE_MI;
+ return 1;
}
-
-
- if (connect_to_master(thd, &mysql, &glob_mi))
+
+ if (connect_to_master(thd, &mysql, active_mi))
{
net_printf(&thd->net, error = ER_CONNECT_TO_MASTER,
mc_mysql_error(&mysql));
@@ -744,7 +742,7 @@ int load_master_data(THD* thd)
mess up and not exclude mysql database with the rules when
he actually means to - in this case, he is up for a surprise if
his priv tables get dropped and downloaded from master
- TO DO - add special option, not enabled
+ TODO - add special option, not enabled
by default, to allow inclusion of mysql database into load
data from master
*/
@@ -774,7 +772,7 @@ int load_master_data(THD* thd)
goto err;
}
- if ((error = fetch_db_tables(thd, &mysql, db, *cur_table_res)))
+ if ((error = fetch_db_tables(thd,&mysql,db,*cur_table_res,active_mi)))
{
// we do not report the error - fetch_db_tables handles it
cleanup_mysql_results(db_res, cur_table_res, table_res);
@@ -797,14 +795,15 @@ int load_master_data(THD* thd)
*/
if (row[0] && row[1])
{
- strmake(glob_mi.log_file_name, row[0], sizeof(glob_mi.log_file_name));
- glob_mi.pos = atoi(row[1]); // atoi() is ok, since offset is <= 1GB
- if (glob_mi.pos < 4)
- glob_mi.pos = 4; // don't hit the magic number
- glob_mi.pending = 0;
- flush_master_info(&glob_mi);
+ strmake(active_mi->master_log_name, row[0],
+ sizeof(active_mi->master_log_name));
+ // atoi() is ok, since offset is <= 1GB
+ active_mi->master_log_pos = atoi(row[1]);
+ if (active_mi->master_log_pos < 4)
+ active_mi->master_log_pos = 4; // don't hit the magic number
+ active_mi->rli.pending = 0;
+ flush_master_info(active_mi);
}
-
mc_mysql_free_result(master_status_res);
}
@@ -815,11 +814,35 @@ int load_master_data(THD* thd)
goto err;
}
}
+ thd->proc_info="purging old relay logs";
+ if (purge_relay_logs(&active_mi->rli,0 /* not only reset, but also reinit*/,
+ &errmsg))
+ {
+ send_error(&thd->net, 0, "Failed purging old relay logs");
+ unlock_slave_threads(active_mi);
+ UNLOCK_ACTIVE_MI;
+ return 1;
+ }
+ pthread_mutex_lock(&active_mi->rli.data_lock);
+ active_mi->rli.master_log_pos = active_mi->master_log_pos;
+ strnmov(active_mi->rli.master_log_name,active_mi->master_log_name,
+ sizeof(active_mi->rli.master_log_name));
+ pthread_cond_broadcast(&active_mi->rli.data_cond);
+ pthread_mutex_unlock(&active_mi->rli.data_lock);
+ thd->proc_info = "starting slave";
+ if (restart_thread_mask)
+ {
+ error=start_slave_threads(0 /* mutex not needed*/,
+ 1 /* wait for start*/,
+ active_mi,master_info_file,relay_log_info_file,
+ restart_thread_mask);
+ }
err:
- pthread_mutex_unlock(&LOCK_slave);
- if (slave_was_running)
- start_slave(0, 0);
+ unlock_slave_threads(active_mi);
+ UNLOCK_ACTIVE_MI;
+ thd->proc_info = 0;
+
mc_mysql_close(&mysql); // safe to call since we always do mc_mysql_init()
if (!error)
send_ok(&thd->net);
diff --git a/sql/slave.cc b/sql/slave.cc
index 700838d7cd7..e68741e7434 100644
--- a/sql/slave.cc
+++ b/sql/slave.cc
@@ -24,24 +24,23 @@
#include "repl_failsafe.h"
#include <thr_alarm.h>
#include <my_dir.h>
+#include <assert.h>
-volatile bool slave_running = 0;
+volatile bool slave_sql_running = 0, slave_io_running = 0;
char* slave_load_tmpdir = 0;
-pthread_t slave_real_id;
-MASTER_INFO glob_mi;
+MASTER_INFO main_mi;
+MASTER_INFO* active_mi;
+volatile int active_mi_in_use = 0;
HASH replicate_do_table, replicate_ignore_table;
DYNAMIC_ARRAY replicate_wild_do_table, replicate_wild_ignore_table;
bool do_table_inited = 0, ignore_table_inited = 0;
bool wild_do_table_inited = 0, wild_ignore_table_inited = 0;
bool table_rules_on = 0;
-uint32 slave_skip_counter = 0;
static TABLE* save_temporary_tables = 0;
-THD* slave_thd = 0;
// when slave thread exits, we need to remember the temporary tables so we
// can re-use them on slave start
-int last_slave_errno = 0;
-char last_slave_error[MAX_SLAVE_ERRMSG] = "";
+// TODO: move the vars below under MASTER_INFO
#ifndef DBUG_OFF
int disconnect_slave_event_count = 0, abort_slave_event_count = 0;
static int events_till_disconnect = -1;
@@ -49,15 +48,17 @@ int events_till_abort = -1;
static int stuck_count = 0;
#endif
+typedef enum { SLAVE_THD_IO, SLAVE_THD_SQL} SLAVE_THD_TYPE;
void skip_load_data_infile(NET* net);
-inline bool slave_killed(THD* thd);
-static int init_slave_thread(THD* thd);
+static inline bool slave_killed(THD* thd,MASTER_INFO* mi);
+static inline bool slave_killed(THD* thd,RELAY_LOG_INFO* rli);
+static int init_slave_thread(THD* thd, SLAVE_THD_TYPE thd_type);
static int safe_connect(THD* thd, MYSQL* mysql, MASTER_INFO* mi);
static int safe_reconnect(THD* thd, MYSQL* mysql, MASTER_INFO* mi);
static int connect_to_master(THD* thd, MYSQL* mysql, MASTER_INFO* mi,
bool reconnect);
-static int safe_sleep(THD* thd, int sec);
+static int safe_sleep(THD* thd, MASTER_INFO* mi, int sec);
static int request_table_dump(MYSQL* mysql, const char* db, const char* table);
static int create_table_from_dump(THD* thd, NET* net, const char* db,
const char* table_name);
@@ -65,6 +66,79 @@ static int check_master_version(MYSQL* mysql, MASTER_INFO* mi);
char* rewrite_db(char* db);
+void init_thread_mask(int* mask,MASTER_INFO* mi,bool inverse)
+{
+ bool set_io = mi->slave_running, set_sql = mi->rli.slave_running;
+ if (inverse)
+ {
+ /* This makes me think of the Russian idiom "I am not I, and this is
+ not my horse", which is used to deny reponsibility for
+ one's actions.
+ */
+ set_io = !set_io;
+ set_sql = !set_sql;
+ }
+ register int tmp_mask=0;
+ if (set_io)
+ tmp_mask |= SLAVE_IO;
+ if (set_sql)
+ tmp_mask |= SLAVE_SQL;
+ *mask = tmp_mask;
+}
+
+void lock_slave_threads(MASTER_INFO* mi)
+{
+ //TODO: see if we can do this without dual mutex
+ pthread_mutex_lock(&mi->run_lock);
+ pthread_mutex_lock(&mi->rli.run_lock);
+}
+
+void unlock_slave_threads(MASTER_INFO* mi)
+{
+ //TODO: see if we can do this without dual mutex
+ pthread_mutex_unlock(&mi->rli.run_lock);
+ pthread_mutex_unlock(&mi->run_lock);
+}
+
+int init_slave()
+{
+ // TODO (multi-master): replace this with list initialization
+ active_mi = &main_mi;
+
+ // TODO: the code below is a copy-paste mess - clean it up
+ /*
+ make sure slave thread gets started if server_id is set,
+ valid master.info is present, and master_host has not been specified
+ */
+ if (server_id && !master_host)
+ {
+ // TODO: re-write this to interate through the list of files
+ // for multi-master
+ char fname[FN_REFLEN+128];
+ MY_STAT stat_area;
+ fn_format(fname, master_info_file, mysql_data_home, "", 4+16+32);
+ if (my_stat(fname, &stat_area, MYF(0)) &&
+ !init_master_info(active_mi,master_info_file,relay_log_info_file))
+ master_host = active_mi->host;
+ }
+ // slave thread
+ if (master_host)
+ {
+ if (!opt_skip_slave_start && start_slave_threads(1 /* need mutex */,
+ 0 /* no wait for start*/,
+ active_mi,
+ master_info_file,
+ relay_log_info_file,
+ SLAVE_IO|SLAVE_SQL
+ ))
+ sql_print_error("Warning: Can't create threads to handle slave");
+ else if (opt_skip_slave_start)
+ if (init_master_info(active_mi, master_info_file, relay_log_info_file))
+ sql_print_error("Warning: failed to initialized master info");
+ }
+ return 0;
+}
+
static void free_table_ent(TABLE_RULE_ENT* e)
{
my_free((gptr) e, MYF(0));
@@ -77,6 +151,285 @@ static byte* get_table_key(TABLE_RULE_ENT* e, uint* len,
return (byte*)e->db;
}
+// TODO: check proper initialization of master_log_name/master_log_pos
+int init_relay_log_pos(RELAY_LOG_INFO* rli,const char* log,
+ ulonglong pos, bool need_data_lock,
+ const char** errmsg)
+{
+ if (rli->log_pos_current)
+ return 0;
+ pthread_mutex_t *log_lock=rli->relay_log.get_log_lock();
+ pthread_mutex_lock(log_lock);
+ if (need_data_lock)
+ pthread_mutex_lock(&rli->data_lock);
+
+ if (rli->cur_log_fd >= 0)
+ {
+ end_io_cache(&rli->cache_buf);
+ my_close(rli->cur_log_fd, MYF(MY_WME));
+ rli->cur_log_fd = -1;
+ }
+
+ if (!log)
+ log = rli->relay_log_name; // already inited
+ if (!pos)
+ pos = rli->relay_log_pos; // already inited
+ else
+ rli->relay_log_pos = pos;
+ if (rli->relay_log.find_first_log(&rli->linfo,log))
+ {
+ *errmsg="Could not find first log during relay log initialization";
+ goto err;
+ }
+ strnmov(rli->relay_log_name,rli->linfo.log_file_name,
+ sizeof(rli->relay_log_name));
+ // to make end_io_cache(&rli->cache_buf) safe in all cases
+ if (!rli->inited)
+ bzero((char*) &rli->cache_buf, sizeof(IO_CACHE));
+ if (rli->relay_log.is_active(rli->linfo.log_file_name))
+ {
+ if (my_b_tell((rli->cur_log=rli->relay_log.get_log_file())) == 0 &&
+ check_binlog_magic(rli->cur_log,errmsg))
+ {
+ goto err;
+ }
+ rli->cur_log_init_count=rli->cur_log->init_count;
+ }
+ else
+ {
+ if (rli->inited)
+ end_io_cache(&rli->cache_buf);
+ if (rli->cur_log_fd>=0)
+ my_close(rli->cur_log_fd,MYF(MY_WME));
+ if ((rli->cur_log_fd=open_binlog(&rli->cache_buf,
+ rli->linfo.log_file_name,errmsg)) < 0)
+ {
+ goto err;
+ }
+ rli->cur_log = &rli->cache_buf;
+ }
+ if (pos > 4)
+ my_b_seek(rli->cur_log,(off_t)pos);
+ rli->log_pos_current=1;
+err:
+ pthread_cond_broadcast(&rli->data_cond);
+ if (need_data_lock)
+ pthread_mutex_unlock(&rli->data_lock);
+ pthread_mutex_unlock(log_lock);
+ return (*errmsg) ? 1 : 0;
+}
+
+// we assume we have a run lock on rli and that the both slave thread
+// are not running
+int purge_relay_logs(RELAY_LOG_INFO* rli, bool just_reset, const char** errmsg)
+{
+ if (!rli->inited)
+ return 0; /* successfully do nothing */
+ DBUG_ASSERT(rli->slave_running == 0);
+ DBUG_ASSERT(rli->mi->slave_running == 0);
+ int error=0;
+ rli->slave_skip_counter=0;
+ pthread_mutex_lock(&rli->data_lock);
+ rli->pending=0;
+ rli->master_log_name[0]=0;
+ rli->master_log_pos=0; // 0 means uninitialized
+ if (rli->relay_log.reset_logs(rli->sql_thd) ||
+ rli->relay_log.find_first_log(&rli->linfo,""))
+ {
+ *errmsg = "Failed during log reset";
+ error=1;
+ goto err;
+ }
+ strnmov(rli->relay_log_name,rli->linfo.log_file_name,
+ sizeof(rli->relay_log_name));
+ rli->relay_log_pos=4;
+ rli->log_pos_current=0;
+ if (!just_reset)
+ error = init_relay_log_pos(rli,0,0,0/*do not need data lock*/,errmsg);
+err:
+ pthread_mutex_unlock(&rli->data_lock);
+ return error;
+}
+
+int terminate_slave_threads(MASTER_INFO* mi,int thread_mask,bool skip_lock)
+{
+ if (!mi->inited)
+ return 0; /* successfully do nothing */
+ int error,force_all = (thread_mask & SLAVE_FORCE_ALL);
+ pthread_mutex_t *sql_lock = &mi->rli.run_lock, *io_lock = &mi->run_lock;
+ pthread_mutex_t *sql_cond_lock,*io_cond_lock;
+
+ sql_cond_lock=sql_lock;
+ io_cond_lock=io_lock;
+
+ if (skip_lock)
+ {
+ sql_lock = io_lock = 0;
+ }
+ if ((thread_mask & (SLAVE_IO|SLAVE_FORCE_ALL)) && mi->slave_running)
+ {
+ mi->abort_slave=1;
+ if ((error=terminate_slave_thread(mi->io_thd,io_lock,
+ io_cond_lock,
+ &mi->stop_cond,
+ &mi->slave_running)) &&
+ !force_all)
+ return error;
+ }
+ if ((thread_mask & (SLAVE_SQL|SLAVE_FORCE_ALL)) && mi->rli.slave_running)
+ {
+ DBUG_ASSERT(mi->rli.sql_thd != 0) ;
+ mi->rli.abort_slave=1;
+ if ((error=terminate_slave_thread(mi->rli.sql_thd,sql_lock,
+ sql_cond_lock,
+ &mi->rli.stop_cond,
+ &mi->rli.slave_running)) &&
+ !force_all)
+ return error;
+ }
+ return 0;
+}
+
+int terminate_slave_thread(THD* thd, pthread_mutex_t* term_lock,
+ pthread_mutex_t *cond_lock,
+ pthread_cond_t* term_cond,
+ volatile bool* slave_running)
+{
+ if (term_lock)
+ {
+ pthread_mutex_lock(term_lock);
+ if (!*slave_running)
+ {
+ pthread_mutex_unlock(term_lock);
+ return ER_SLAVE_NOT_RUNNING;
+ }
+ }
+ DBUG_ASSERT(thd != 0);
+ KICK_SLAVE(thd);
+ while (*slave_running)
+ {
+ /* there is a small chance that slave thread might miss the first
+ alarm. To protect againts it, resend the signal until it reacts
+ */
+ struct timespec abstime;
+#ifdef HAVE_TIMESPEC_TS_SEC
+ abstime.ts_sec=time(NULL)+2;
+ abstime.ts_nsec=0;
+#elif defined(__WIN__)
+ abstime.tv_sec=time((time_t*) 0)+2;
+ abstime.tv_nsec=0;
+#else
+ struct timeval tv;
+ gettimeofday(&tv,0);
+ abstime.tv_sec=tv.tv_sec+2;
+ abstime.tv_nsec=tv.tv_usec*1000;
+#endif
+ pthread_cond_timedwait(term_cond, cond_lock, &abstime);
+ if (*slave_running)
+ KICK_SLAVE(thd);
+ }
+ if (term_lock)
+ pthread_mutex_unlock(term_lock);
+ return 0;
+}
+
+int start_slave_thread(pthread_handler h_func, pthread_mutex_t* start_lock,
+ pthread_mutex_t *cond_lock,
+ pthread_cond_t* start_cond,
+ volatile bool* slave_running,
+ MASTER_INFO* mi)
+{
+ pthread_t th;
+ DBUG_ASSERT(mi->inited);
+ if (start_lock)
+ pthread_mutex_lock(start_lock);
+ if (!server_id)
+ {
+ if (start_cond)
+ pthread_cond_broadcast(start_cond);
+ if (start_lock)
+ pthread_mutex_unlock(start_lock);
+ sql_print_error("Server id not set, will not start slave");
+ return ER_BAD_SLAVE;
+ }
+
+ if (*slave_running)
+ {
+ if (start_cond)
+ pthread_cond_broadcast(start_cond);
+ if (start_lock)
+ pthread_mutex_unlock(start_lock);
+ return ER_SLAVE_MUST_STOP;
+ }
+ if (pthread_create(&th, &connection_attrib, h_func, (void*)mi))
+ {
+ if (start_lock)
+ pthread_mutex_unlock(start_lock);
+ return ER_SLAVE_THREAD;
+ }
+ if (start_cond && cond_lock)
+ {
+ THD* thd = current_thd;
+ while (!*slave_running)
+ {
+ const char* old_msg = thd->enter_cond(start_cond,cond_lock,
+ "Waiting for slave thread to start");
+ pthread_cond_wait(start_cond,cond_lock);
+ thd->exit_cond(old_msg);
+ // TODO: in a very rare case of init_slave_thread failing, it is
+ // possible that we can get stuck here since slave_running will not
+ // be set. We need to change slave_running to int and have -1 as
+ // error code
+ if (thd->killed)
+ {
+ pthread_mutex_unlock(cond_lock);
+ return ER_SERVER_SHUTDOWN;
+ }
+ }
+ }
+ if (start_lock)
+ pthread_mutex_unlock(start_lock);
+ return 0;
+}
+/* SLAVE_FORCE_ALL is not implemented here on purpose since it does not make
+ sense to do that for starting a slave - we always care if it actually
+ started the threads that were not previously running
+*/
+int start_slave_threads(bool need_slave_mutex, bool wait_for_start,
+ MASTER_INFO* mi, const char* master_info_fname,
+ const char* slave_info_fname, int thread_mask)
+{
+ pthread_mutex_t *lock_io=0,*lock_sql=0,*lock_cond_io=0,*lock_cond_sql=0;
+ pthread_cond_t* cond_io=0,*cond_sql=0;
+ int error=0;
+
+ if (need_slave_mutex)
+ {
+ lock_io = &mi->run_lock;
+ lock_sql = &mi->rli.run_lock;
+ }
+ if (wait_for_start)
+ {
+ cond_io = &mi->start_cond;
+ cond_sql = &mi->rli.start_cond;
+ lock_cond_io = &mi->run_lock;
+ lock_cond_sql = &mi->rli.run_lock;
+ }
+ if (init_master_info(mi,master_info_fname,slave_info_fname))
+ return ER_MASTER_INFO;
+
+ if ((thread_mask & SLAVE_IO) &&
+ (error=start_slave_thread(handle_slave_io,lock_io,lock_cond_io,
+ cond_io,&mi->slave_running,
+ mi)))
+ return error;
+ if ((thread_mask & SLAVE_SQL) &&
+ (error=start_slave_thread(handle_slave_sql,lock_sql,lock_cond_sql,
+ cond_sql,
+ &mi->rli.slave_running,mi)))
+ return error;
+ return 0;
+}
void init_table_rule_hash(HASH* h, bool* h_inited)
{
@@ -98,11 +451,11 @@ static TABLE_RULE_ENT* find_wild(DYNAMIC_ARRAY *a, const char* key, int len)
uint i;
const char* key_end = key + len;
- for(i = 0; i < a->elements; i++)
+ for (i = 0; i < a->elements; i++)
{
TABLE_RULE_ENT* e ;
get_dynamic(a, (gptr)&e, i);
- if(!wild_case_compare(key, key_end, (const char*)e->db,
+ if (!wild_case_compare(key, key_end, (const char*)e->db,
(const char*)(e->db + e->key_len),'\\'))
return e;
}
@@ -126,7 +479,7 @@ int tables_ok(THD* thd, TABLE_LIST* tables)
if (hash_search(&replicate_do_table, (byte*) hash_key, len))
return 1;
}
- if (ignore_table_inited) // if there are any do's
+ if (ignore_table_inited) // if there are any ignores
{
if (hash_search(&replicate_ignore_table, (byte*) hash_key, len))
return 0;
@@ -191,44 +544,52 @@ static void free_string_array(DYNAMIC_ARRAY *a)
delete_dynamic(a);
}
+static int end_slave_on_walk(MASTER_INFO* mi, gptr /*unused*/)
+{
+ end_master_info(mi);
+ return 0;
+}
+
void end_slave()
{
- pthread_mutex_lock(&LOCK_slave);
- if (slave_running)
- {
- abort_slave = 1;
- thr_alarm_kill(slave_real_id);
-#ifdef SIGNAL_WITH_VIO_CLOSE
- slave_thd->close_active_vio();
-#endif
- while (slave_running)
- pthread_cond_wait(&COND_slave_stopped, &LOCK_slave);
- }
- pthread_mutex_unlock(&LOCK_slave);
-
- end_master_info(&glob_mi);
- if(do_table_inited)
+ // TODO: replace the line below with
+ // list_walk(&master_list, (list_walk_action)end_slave_on_walk,0);
+ // once multi-master code is ready
+ terminate_slave_threads(active_mi,SLAVE_FORCE_ALL);
+ end_master_info(active_mi);
+ if (do_table_inited)
hash_free(&replicate_do_table);
- if(ignore_table_inited)
+ if (ignore_table_inited)
hash_free(&replicate_ignore_table);
- if(wild_do_table_inited)
+ if (wild_do_table_inited)
free_string_array(&replicate_wild_do_table);
- if(wild_ignore_table_inited)
+ if (wild_ignore_table_inited)
free_string_array(&replicate_wild_ignore_table);
}
-inline bool slave_killed(THD* thd)
+static inline bool slave_killed(THD* thd, MASTER_INFO* mi)
{
- return abort_slave || abort_loop || thd->killed;
+ DBUG_ASSERT(mi->io_thd == thd);
+ DBUG_ASSERT(mi->slave_running == 1); // tracking buffer overrun
+ return mi->abort_slave || abort_loop || thd->killed;
}
-void slave_print_error(int err_code, const char* msg, ...)
+static inline bool slave_killed(THD* thd, RELAY_LOG_INFO* rli)
+{
+ DBUG_ASSERT(rli->sql_thd == thd);
+ DBUG_ASSERT(rli->slave_running == 1);// tracking buffer overrun
+ return rli->abort_slave || abort_loop || thd->killed;
+}
+
+void slave_print_error(RELAY_LOG_INFO* rli, int err_code, const char* msg, ...)
{
va_list args;
va_start(args,msg);
- my_vsnprintf(last_slave_error, sizeof(last_slave_error), msg, args);
- sql_print_error("Slave: %s, error_code=%d", last_slave_error, err_code);
- last_slave_errno = err_code;
+ my_vsnprintf(rli->last_slave_error,
+ sizeof(rli->last_slave_error), msg, args);
+ sql_print_error("Slave: %s, error_code=%d", rli->last_slave_error,
+ err_code);
+ rli->last_slave_errno = err_code;
}
void skip_load_data_infile(NET* net)
@@ -476,16 +837,16 @@ err:
return error;
}
-int fetch_nx_table(THD* thd, const char* db_name, const char* table_name,
+int fetch_master_table(THD* thd, const char* db_name, const char* table_name,
MASTER_INFO* mi, MYSQL* mysql)
{
int error = 1;
- int nx_errno = 0;
+ int fetch_errno = 0;
bool called_connected = (mysql != NULL);
if (!called_connected && !(mysql = mc_mysql_init(NULL)))
{
- sql_print_error("fetch_nx_table: Error in mysql_init()");
- nx_errno = ER_GET_ERRNO;
+ sql_print_error("fetch_master_table: Error in mysql_init()");
+ fetch_errno = ER_GET_ERRNO;
goto err;
}
@@ -495,17 +856,17 @@ int fetch_nx_table(THD* thd, const char* db_name, const char* table_name,
{
sql_print_error("Could not connect to master while fetching table\
'%-64s.%-64s'", db_name, table_name);
- nx_errno = ER_CONNECT_TO_MASTER;
+ fetch_errno = ER_CONNECT_TO_MASTER;
goto err;
}
}
- if (slave_killed(thd))
+ if (thd->killed)
goto err;
if (request_table_dump(mysql, db_name, table_name))
{
- nx_errno = ER_GET_ERRNO;
- sql_print_error("fetch_nx_table: failed on table dump request ");
+ fetch_errno = ER_GET_ERRNO;
+ sql_print_error("fetch_master_table: failed on table dump request ");
goto err;
}
@@ -513,24 +874,25 @@ int fetch_nx_table(THD* thd, const char* db_name, const char* table_name,
table_name))
{
// create_table_from_dump will have sent the error alread
- sql_print_error("fetch_nx_table: failed on create table ");
+ sql_print_error("fetch_master_table: failed on create table ");
goto err;
}
-
error = 0;
-
err:
if (mysql && !called_connected)
mc_mysql_close(mysql);
- if (nx_errno && thd->net.vio)
- send_error(&thd->net, nx_errno, "Error in fetch_nx_table");
+ if (fetch_errno && thd->net.vio)
+ send_error(&thd->net, fetch_errno, "Error in fetch_master_table");
thd->net.no_send_ok = 0; // Clear up garbage after create_table_from_dump
return error;
}
void end_master_info(MASTER_INFO* mi)
{
- if(mi->fd >= 0)
+ if (!mi->inited)
+ return;
+ end_relay_log_info(&mi->rli);
+ if (mi->fd >= 0)
{
end_io_cache(&mi->file);
(void)my_close(mi->fd, MYF(MY_WME));
@@ -539,21 +901,136 @@ void end_master_info(MASTER_INFO* mi)
mi->inited = 0;
}
-int init_master_info(MASTER_INFO* mi)
+int init_relay_log_info(RELAY_LOG_INFO* rli, const char* info_fname)
+{
+ if (rli->inited)
+ return 0;
+ MY_STAT stat_area;
+ char fname[FN_REFLEN+128];
+ int info_fd;
+ const char* msg = 0;
+ int error = 0;
+
+ fn_format(fname, info_fname,
+ mysql_data_home, "", 4+32);
+ pthread_mutex_lock(&rli->data_lock);
+ info_fd = rli->info_fd;
+ rli->pending = 0;
+ rli->cur_log_fd = -1;
+ rli->slave_skip_counter=0;
+ rli->log_pos_current=0;
+ // TODO: make this work with multi-master
+ if (!opt_relay_logname)
+ {
+ char tmp[FN_REFLEN];
+ /* TODO: The following should be using fn_format(); We just need to
+ first change fn_format() to cut the file name if it's too long.
+ */
+ strmake(tmp,glob_hostname,FN_REFLEN-5);
+ strmov(strcend(tmp,'.'),"-relay-bin");
+ opt_relay_logname=my_strdup(tmp,MYF(MY_WME));
+ }
+ rli->relay_log.set_index_file_name(opt_relaylog_index_name);
+ open_log(&rli->relay_log, glob_hostname, opt_relay_logname, "-relay-bin",
+ LOG_BIN, 1 /* read_append cache */,
+ 1 /* no auto events*/);
+
+ /* if file does not exist */
+ if (!my_stat(fname, &stat_area, MYF(0)))
+ {
+ // if someone removed the file from underneath our feet, just close
+ // the old descriptor and re-create the old file
+ if (info_fd >= 0)
+ my_close(info_fd, MYF(MY_WME));
+ if ((info_fd = my_open(fname, O_CREAT|O_RDWR|O_BINARY, MYF(MY_WME))) < 0
+ || init_io_cache(&rli->info_file, info_fd, IO_SIZE*2, READ_CACHE, 0L,0,
+ MYF(MY_WME)))
+ {
+ if(info_fd >= 0)
+ my_close(info_fd, MYF(0));
+ rli->info_fd=-1;
+ pthread_mutex_unlock(&rli->data_lock);
+ return 1;
+ }
+ if (init_relay_log_pos(rli,"",4,0/*no data mutex*/,&msg))
+ goto err;
+ rli->master_log_pos = 0; // uninitialized
+ rli->info_fd = info_fd;
+ }
+ else // file exists
+ {
+ if(info_fd >= 0)
+ reinit_io_cache(&rli->info_file, READ_CACHE, 0L,0,0);
+ else if((info_fd = my_open(fname, O_RDWR|O_BINARY, MYF(MY_WME))) < 0
+ || init_io_cache(&rli->info_file, info_fd,
+ IO_SIZE*2, READ_CACHE, 0L,
+ 0, MYF(MY_WME)))
+ {
+ if (info_fd >= 0)
+ my_close(info_fd, MYF(0));
+ rli->info_fd=-1;
+ pthread_mutex_unlock(&rli->data_lock);
+ return 1;
+ }
+
+ rli->info_fd = info_fd;
+ if (init_strvar_from_file(rli->relay_log_name,
+ sizeof(rli->relay_log_name), &rli->info_file,
+ (char*)"") ||
+ init_intvar_from_file((int*)&rli->relay_log_pos,
+ &rli->info_file, 4) ||
+ init_strvar_from_file(rli->master_log_name,
+ sizeof(rli->master_log_name), &rli->info_file,
+ (char*)"") ||
+ init_intvar_from_file((int*)&rli->master_log_pos,
+ &rli->info_file, 0))
+ {
+ msg="Error reading slave log configuration";
+ goto err;
+ }
+ if (init_relay_log_pos(rli,0 /*log already inited*/,
+ 0 /*pos already inited*/,
+ 0 /* no data lock*/,
+ &msg))
+ goto err;
+ }
+ DBUG_ASSERT(rli->relay_log_pos >= 4);
+ DBUG_ASSERT(my_b_tell(rli->cur_log) == rli->relay_log_pos);
+ rli->inited = 1;
+ // now change the cache from READ to WRITE - must do this
+ // before flush_relay_log_info
+ reinit_io_cache(&rli->info_file, WRITE_CACHE,0L,0,1);
+ error=test(flush_relay_log_info(rli));
+ pthread_mutex_unlock(&rli->data_lock);
+ return error;
+
+err:
+ sql_print_error(msg);
+ end_io_cache(&rli->info_file);
+ my_close(info_fd, MYF(0));
+ rli->info_fd=-1;
+ pthread_mutex_unlock(&rli->data_lock);
+ return 1;
+}
+
+int init_master_info(MASTER_INFO* mi, const char* master_info_fname,
+ const char* slave_info_fname)
{
if (mi->inited)
return 0;
+ if (init_relay_log_info(&mi->rli, slave_info_fname))
+ return 1;
+ mi->rli.mi = mi;
int fd,length,error;
MY_STAT stat_area;
char fname[FN_REFLEN+128];
const char *msg;
- fn_format(fname, master_info_file, mysql_data_home, "", 4+32);
+ fn_format(fname, master_info_fname, mysql_data_home, "", 4+32);
// we need a mutex while we are changing master info parameters to
// keep other threads from reading bogus info
- pthread_mutex_lock(&mi->lock);
- mi->pending = 0;
+ pthread_mutex_lock(&mi->data_lock);
fd = mi->fd;
// we do not want any messages if the file does not exist
@@ -569,11 +1046,13 @@ int init_master_info(MASTER_INFO* mi)
{
if(fd >= 0)
my_close(fd, MYF(0));
- pthread_mutex_unlock(&mi->lock);
+ mi->fd=-1;
+ end_relay_log_info(&mi->rli);
+ pthread_mutex_unlock(&mi->data_lock);
return 1;
}
- mi->log_file_name[0] = 0;
- mi->pos = 4; // skip magic number
+ mi->master_log_name[0] = 0;
+ mi->master_log_pos = 4; // skip magic number
mi->fd = fd;
if (master_host)
@@ -595,28 +1074,19 @@ int init_master_info(MASTER_INFO* mi)
{
if(fd >= 0)
my_close(fd, MYF(0));
- pthread_mutex_unlock(&mi->lock);
+ mi->fd=-1;
+ end_relay_log_info(&mi->rli);
+ pthread_mutex_unlock(&mi->data_lock);
return 1;
}
-
- if ((length=my_b_gets(&mi->file, mi->log_file_name,
- sizeof(mi->log_file_name))) < 1)
- {
- msg="Error reading log file name from master info file ";
- goto error;
- }
-
- mi->log_file_name[length-1]= 0; // kill \n
- /* Reuse fname buffer */
- if(!my_b_gets(&mi->file, fname, sizeof(fname)))
- {
- msg="Error reading log file position from master info file";
- goto error;
- }
- mi->pos = strtoull(fname,(char**) 0, 10);
mi->fd = fd;
- if(init_strvar_from_file(mi->host, sizeof(mi->host), &mi->file,
+ if (init_strvar_from_file(mi->master_log_name,
+ sizeof(mi->master_log_name), &mi->file,
+ (char*)"") ||
+ init_intvar_from_file((int*)&mi->master_log_pos, &mi->file, 4)
+ ||
+ init_strvar_from_file(mi->host, sizeof(mi->host), &mi->file,
master_host) ||
init_strvar_from_file(mi->user, sizeof(mi->user), &mi->file,
master_user) ||
@@ -624,12 +1094,11 @@ int init_master_info(MASTER_INFO* mi)
master_password) ||
init_intvar_from_file((int*)&mi->port, &mi->file, master_port) ||
init_intvar_from_file((int*)&mi->connect_retry, &mi->file,
- master_connect_retry) ||
- init_intvar_from_file((int*)&mi->last_log_seq, &mi->file, 0)
+ master_connect_retry)
)
{
msg="Error reading master configuration";
- goto error;
+ goto err;
}
}
@@ -638,14 +1107,17 @@ int init_master_info(MASTER_INFO* mi)
// before flush_master_info
reinit_io_cache(&mi->file, WRITE_CACHE,0L,0,1);
error=test(flush_master_info(mi));
- pthread_mutex_unlock(&mi->lock);
+ pthread_mutex_unlock(&mi->data_lock);
return error;
-error:
+err:
sql_print_error(msg);
end_io_cache(&mi->file);
+ end_relay_log_info(&mi->rli);
+ DBUG_ASSERT(fd>=0);
my_close(fd, MYF(0));
- pthread_mutex_unlock(&mi->lock);
+ mi->fd=-1;
+ pthread_mutex_unlock(&mi->data_lock);
return 1;
}
@@ -654,14 +1126,14 @@ int register_slave_on_master(MYSQL* mysql)
String packet;
char buf[4];
- if(!report_host)
+ if (!report_host)
return 0;
int4store(buf, server_id);
packet.append(buf, 4);
net_store_data(&packet, report_host);
- if(report_user)
+ if (report_user)
net_store_data(&packet, report_user);
else
packet.append((char)0);
@@ -678,7 +1150,7 @@ int register_slave_on_master(MYSQL* mysql)
int4store(buf, 0); /* tell the master will fill in master_id */
packet.append(buf, 4);
- if(mc_simple_command(mysql, COM_REGISTER_SLAVE, (char*)packet.ptr(),
+ if (mc_simple_command(mysql, COM_REGISTER_SLAVE, (char*)packet.ptr(),
packet.length(), 0))
{
sql_print_error("Error on COM_REGISTER_SLAVE: '%s'",
@@ -689,52 +1161,61 @@ int register_slave_on_master(MYSQL* mysql)
return 0;
}
-
-int show_master_info(THD* thd)
+int show_master_info(THD* thd, MASTER_INFO* mi)
{
+ // TODO: fix this for multi-master
DBUG_ENTER("show_master_info");
List<Item> field_list;
field_list.push_back(new Item_empty_string("Master_Host",
- sizeof(glob_mi.host)));
+ sizeof(mi->host)));
field_list.push_back(new Item_empty_string("Master_User",
- sizeof(glob_mi.user)));
+ sizeof(mi->user)));
field_list.push_back(new Item_empty_string("Master_Port", 6));
field_list.push_back(new Item_empty_string("Connect_retry", 6));
- field_list.push_back( new Item_empty_string("Log_File",
+ field_list.push_back(new Item_empty_string("Master_Log_File",
FN_REFLEN));
- field_list.push_back(new Item_empty_string("Pos", 12));
- field_list.push_back(new Item_empty_string("Slave_Running", 3));
+ field_list.push_back(new Item_empty_string("Read_Master_Log_Pos", 12));
+ field_list.push_back(new Item_empty_string("Relay_Log_File",
+ FN_REFLEN));
+ field_list.push_back(new Item_empty_string("Relay_Log_Pos", 12));
+ field_list.push_back(new Item_empty_string("Relay_Master_Log_File",
+ FN_REFLEN));
+ field_list.push_back(new Item_empty_string("Slave_IO_Running", 3));
+ field_list.push_back(new Item_empty_string("Slave_SQL_Running", 3));
field_list.push_back(new Item_empty_string("Replicate_do_db", 20));
field_list.push_back(new Item_empty_string("Replicate_ignore_db", 20));
field_list.push_back(new Item_empty_string("Last_errno", 4));
field_list.push_back(new Item_empty_string("Last_error", 20));
field_list.push_back(new Item_empty_string("Skip_counter", 12));
- field_list.push_back(new Item_empty_string("Last_log_seq", 12));
+ field_list.push_back(new Item_empty_string("Exec_master_log_pos", 12));
if(send_fields(thd, field_list, 1))
DBUG_RETURN(-1);
String* packet = &thd->packet;
- uint32 last_log_seq;
packet->length(0);
- pthread_mutex_lock(&glob_mi.lock);
- net_store_data(packet, glob_mi.host);
- net_store_data(packet, glob_mi.user);
- net_store_data(packet, (uint32) glob_mi.port);
- net_store_data(packet, (uint32) glob_mi.connect_retry);
- net_store_data(packet, glob_mi.log_file_name);
- net_store_data(packet, (longlong) glob_mi.pos);
- last_log_seq = glob_mi.last_log_seq;
- pthread_mutex_unlock(&glob_mi.lock);
- pthread_mutex_lock(&LOCK_slave); // QQ; This is not needed
- net_store_data(packet, slave_running ? "Yes":"No");
- pthread_mutex_unlock(&LOCK_slave); // QQ; This is not needed
+ pthread_mutex_lock(&mi->data_lock);
+ pthread_mutex_lock(&mi->rli.data_lock);
+ net_store_data(packet, mi->host);
+ net_store_data(packet, mi->user);
+ net_store_data(packet, (uint32) mi->port);
+ net_store_data(packet, (uint32) mi->connect_retry);
+ net_store_data(packet, mi->master_log_name);
+ net_store_data(packet, (longlong) mi->master_log_pos);
+ net_store_data(packet, mi->rli.relay_log_name +
+ dirname_length(mi->rli.relay_log_name));
+ net_store_data(packet, (longlong) mi->rli.relay_log_pos);
+ net_store_data(packet, mi->rli.master_log_name);
+ net_store_data(packet, mi->slave_running ? "Yes":"No");
+ net_store_data(packet, mi->rli.slave_running ? "Yes":"No");
net_store_data(packet, &replicate_do_db);
net_store_data(packet, &replicate_ignore_db);
- net_store_data(packet, (uint32)last_slave_errno);
- net_store_data(packet, last_slave_error);
- net_store_data(packet, slave_skip_counter);
- net_store_data(packet, last_log_seq);
+ net_store_data(packet, (uint32)mi->rli.last_slave_errno);
+ net_store_data(packet, mi->rli.last_slave_error);
+ net_store_data(packet, mi->rli.slave_skip_counter);
+ net_store_data(packet, (longlong)mi->rli.master_log_pos);
+ pthread_mutex_unlock(&mi->rli.data_lock);
+ pthread_mutex_unlock(&mi->data_lock);
if (my_net_write(&thd->net, (char*)thd->packet.ptr(), packet->length()))
DBUG_RETURN(-1);
@@ -747,58 +1228,64 @@ int flush_master_info(MASTER_INFO* mi)
{
IO_CACHE* file = &mi->file;
char lbuf[22];
- char lbuf1[22];
my_b_seek(file, 0L);
my_b_printf(file, "%s\n%s\n%s\n%s\n%s\n%d\n%d\n%d\n",
- mi->log_file_name, llstr(mi->pos, lbuf), mi->host, mi->user,
- mi->password, mi->port, mi->connect_retry,
- llstr(mi->last_log_seq, lbuf1));
+ mi->master_log_name, llstr(mi->master_log_pos, lbuf),
+ mi->host, mi->user,
+ mi->password, mi->port, mi->connect_retry
+ );
flush_io_cache(file);
return 0;
}
-int st_master_info::wait_for_pos(THD* thd, String* log_name, ulonglong log_pos)
+/* TODO: the code below needs to be re-written almost from scratch
+ Main issue is how to find out if we have reached a certain position
+ in the master log my knowing the offset in the relay log.
+ */
+int st_relay_log_info::wait_for_pos(THD* thd, String* log_name,
+ ulonglong log_pos)
{
if (!inited) return -1;
- bool pos_reached;
+ bool pos_reached = 0;
int event_count = 0;
- pthread_mutex_lock(&lock);
- while(!thd->killed)
+ pthread_mutex_lock(&data_lock);
+ while (!thd->killed)
{
int cmp_result;
- if (*log_file_name)
+ DBUG_ASSERT(*master_log_name || master_log_pos == 0);
+ if (*master_log_name)
{
/*
We should use dirname_length() here when we have a version of
this that doesn't modify the argument */
- char *basename = strrchr(log_file_name, FN_LIBCHAR);
+ char *basename = strrchr(master_log_name, FN_LIBCHAR);
if (basename)
++basename;
else
- basename = log_file_name;
+ basename = master_log_name;
cmp_result = strncmp(basename, log_name->ptr(),
log_name->length());
}
else
cmp_result = 0;
- pos_reached = ((!cmp_result && pos >= log_pos) || cmp_result > 0);
+ pos_reached = ((!cmp_result && master_log_pos >= log_pos) ||
+ cmp_result > 0);
if (pos_reached || thd->killed)
break;
- const char* msg = thd->enter_cond(&cond, &lock,
+ const char* msg = thd->enter_cond(&data_cond, &data_lock,
"Waiting for master update");
- pthread_cond_wait(&cond, &lock);
+ pthread_cond_wait(&data_cond, &data_lock);
thd->exit_cond(msg);
event_count++;
}
- pthread_mutex_unlock(&lock);
+ pthread_mutex_unlock(&data_lock);
return thd->killed ? -1 : event_count;
}
-
-static int init_slave_thread(THD* thd)
+static int init_slave_thread(THD* thd, SLAVE_THD_TYPE thd_type)
{
DBUG_ENTER("init_slave_thread");
thd->system_thread = thd->bootstrap = 1;
@@ -812,7 +1299,7 @@ static int init_slave_thread(THD* thd)
thd->options = (((opt_log_slave_updates) ? OPTION_BIN_LOG:0) | OPTION_AUTO_IS_NULL) ;
thd->system_thread = 1;
thd->client_capabilities = CLIENT_LOCAL_FILES;
- slave_real_id=thd->real_id=pthread_self();
+ thd->real_id=pthread_self();
pthread_mutex_lock(&LOCK_thread_count);
thd->thread_id = thread_id++;
pthread_mutex_unlock(&LOCK_thread_count);
@@ -822,7 +1309,6 @@ static int init_slave_thread(THD* thd)
my_pthread_setspecific_ptr(THR_MALLOC, &thd->mem_root) ||
my_pthread_setspecific_ptr(THR_NET, &thd->net))
{
- close_connection(&thd->net,ER_OUT_OF_RESOURCES); // is this needed?
end_thread(thd,0);
DBUG_RETURN(-1);
}
@@ -839,14 +1325,21 @@ static int init_slave_thread(THD* thd)
if (thd->max_join_size == (ulong) ~0L)
thd->options |= OPTION_BIG_SELECTS;
- thd->proc_info="Waiting for master update";
+ if (thd_type == SLAVE_THD_SQL)
+ {
+ thd->proc_info = "Waiting for the next event in slave queue";
+ }
+ else
+ {
+ thd->proc_info="Waiting for master update";
+ }
thd->version=refresh_version;
thd->set_time();
DBUG_RETURN(0);
}
-static int safe_sleep(THD* thd, int sec)
+static int safe_sleep(THD* thd, MASTER_INFO* mi, int sec)
{
thr_alarm_t alarmed;
thr_alarm_init(&alarmed);
@@ -869,21 +1362,20 @@ static int safe_sleep(THD* thd, int sec)
if (thr_alarm_in_use(&alarmed))
thr_end_alarm(&alarmed);
- if (slave_killed(thd))
+ if (slave_killed(thd,mi))
return 1;
start_time=time((time_t*) 0);
}
return 0;
}
-
static int request_dump(MYSQL* mysql, MASTER_INFO* mi)
{
char buf[FN_REFLEN + 10];
int len;
int binlog_flags = 0; // for now
- char* logname = mi->log_file_name;
- int4store(buf, mi->pos);
+ char* logname = mi->master_log_name;
+ int4store(buf, mi->master_log_pos);
int2store(buf + 4, binlog_flags);
int4store(buf + 6, server_id);
len = (uint) strlen(logname);
@@ -929,7 +1421,6 @@ command");
return 0;
}
-
static ulong read_event(MYSQL* mysql, MASTER_INFO *mi)
{
ulong len = packet_error;
@@ -944,13 +1435,13 @@ static ulong read_event(MYSQL* mysql, MASTER_INFO *mi)
return packet_error;
#endif
- while (!abort_loop && !abort_slave && len == packet_error &&
+ while (!abort_loop && !mi->abort_slave && len == packet_error &&
read_errno == EINTR )
{
len = mc_net_safe_read(mysql);
read_errno = errno;
}
- if (abort_loop || abort_slave)
+ if (abort_loop || mi->abort_slave)
return packet_error;
if (len == packet_error || (long) len < 1)
{
@@ -973,65 +1464,74 @@ server_errno=%d)",
return len - 1;
}
-int check_expected_error(THD* thd, int expected_error)
+int check_expected_error(THD* thd, RELAY_LOG_INFO* rli, int expected_error)
{
- switch(expected_error)
+ switch (expected_error)
{
case ER_NET_READ_ERROR:
case ER_NET_ERROR_ON_WRITE:
case ER_SERVER_SHUTDOWN:
case ER_NEW_ABORTING_CONNECTION:
- my_snprintf(last_slave_error, sizeof(last_slave_error),
+ my_snprintf(rli->last_slave_error, sizeof(rli->last_slave_error),
"Slave: query '%s' partially completed on the master \
and was aborted. There is a chance that your master is inconsistent at this \
point. If you are sure that your master is ok, run this query manually on the\
slave and then restart the slave with SET SQL_SLAVE_SKIP_COUNTER=1;\
SLAVE START;", thd->query);
- last_slave_errno = expected_error;
- sql_print_error("%s",last_slave_error);
+ rli->last_slave_errno = expected_error;
+ sql_print_error("%s",rli->last_slave_error);
return 1;
default:
return 0;
}
}
-static int exec_event(THD* thd, NET* net, MASTER_INFO* mi, int event_len)
+static int exec_relay_log_event(THD* thd, RELAY_LOG_INFO* rli)
{
const char *error_msg;
- Log_event * ev = Log_event::read_log_event((const char*)net->read_pos + 1,
- event_len, &error_msg,
- mi->old_format);
+ DBUG_ASSERT(rli->sql_thd==thd);
+ Log_event * ev = next_event(rli);
+ DBUG_ASSERT(rli->sql_thd==thd);
+ if (slave_killed(thd,rli))
+ return 1;
if (ev)
{
int type_code = ev->get_type_code();
int exec_res;
+ pthread_mutex_lock(&rli->data_lock);
if (ev->server_id == ::server_id ||
- (slave_skip_counter && type_code != ROTATE_EVENT))
+ (rli->slave_skip_counter && type_code != ROTATE_EVENT))
{
- if(type_code == LOAD_EVENT)
- skip_load_data_infile(net);
-
- mi->inc_pos(event_len, ev->log_seq);
- flush_master_info(mi);
- if(slave_skip_counter && /* protect against common user error of
+ /*
+ TODO: I/O thread must handle skipping file delivery for
+ old load data infile events
+ */
+ /* TODO: I/O thread should not even log events with the same server id */
+ rli->inc_pos(ev->get_event_len(),
+ type_code != STOP_EVENT ? ev->log_pos : 0,
+ 1/* skip lock*/);
+ flush_relay_log_info(rli);
+ if (rli->slave_skip_counter && /* protect against common user error of
setting the counter to 1 instead of 2
while recovering from an failed
auto-increment insert */
- !(type_code == INTVAR_EVENT &&
- slave_skip_counter == 1))
- --slave_skip_counter;
+ !((type_code == INTVAR_EVENT || type_code == STOP_EVENT) &&
+ rli->slave_skip_counter == 1))
+ --rli->slave_skip_counter;
+ pthread_mutex_unlock(&rli->data_lock);
delete ev;
return 0; // avoid infinite update loops
}
+ pthread_mutex_unlock(&rli->data_lock);
thd->server_id = ev->server_id; // use the original server id for logging
thd->set_time(); // time the query
- if(!thd->log_seq)
- thd->log_seq = ev->log_seq;
if (!ev->when)
ev->when = time(NULL);
ev->thd = thd;
- exec_res = ev->exec_event(mi);
+ thd->log_pos = ev->log_pos;
+ exec_res = ev->exec_event(rli);
+ DBUG_ASSERT(rli->sql_thd==thd);
delete ev;
return exec_res;
}
@@ -1044,167 +1544,148 @@ This may also be a network problem, or just a bug in the master or slave code.\
return 1;
}
}
-
-// slave thread
-pthread_handler_decl(handle_slave,arg __attribute__((unused)))
+/* slave I/O thread */
+pthread_handler_decl(handle_slave_io,arg)
{
#ifndef DBUG_OFF
slave_begin:
#endif
THD *thd; // needs to be first for thread_stack
MYSQL *mysql = NULL ;
+ MASTER_INFO* mi = (MASTER_INFO*)arg;
char llbuff[22];
-
- pthread_mutex_lock(&LOCK_slave);
- if (!server_id)
- {
- pthread_cond_broadcast(&COND_slave_start);
- pthread_mutex_unlock(&LOCK_slave);
- sql_print_error("Server id not set, will not start slave");
- pthread_exit((void*)1);
- }
+ bool retried_once = 0;
+ ulonglong last_failed_pos = 0; // TODO: see if last_failed_pos is needed
+ DBUG_ASSERT(mi->inited);
- if(slave_running)
- {
- pthread_cond_broadcast(&COND_slave_start);
- pthread_mutex_unlock(&LOCK_slave);
- pthread_exit((void*)1); // safety just in case
- }
- slave_running = 1;
- abort_slave = 0;
+ pthread_mutex_lock(&mi->run_lock);
#ifndef DBUG_OFF
- events_till_abort = abort_slave_event_count;
+ mi->events_till_abort = abort_slave_event_count;
#endif
- pthread_cond_broadcast(&COND_slave_start);
- pthread_mutex_unlock(&LOCK_slave);
-
- // int error = 1;
- bool retried_once = 0;
- ulonglong last_failed_pos = 0;
// needs to call my_thread_init(), otherwise we get a coredump in DBUG_ stuff
my_thread_init();
- slave_thd = thd = new THD; // note that contructor of THD uses DBUG_ !
- DBUG_ENTER("handle_slave");
+ thd = new THD; // note that contructor of THD uses DBUG_ !
+ DBUG_ENTER("handle_slave_io");
pthread_detach_this_thread();
- if (init_slave_thread(thd) || init_master_info(&glob_mi))
+ if (init_slave_thread(thd, SLAVE_THD_IO))
{
- sql_print_error("Failed during slave thread initialization");
+ pthread_cond_broadcast(&mi->start_cond);
+ pthread_mutex_unlock(&mi->run_lock);
+ sql_print_error("Failed during slave I/O thread initialization");
goto err;
}
+ mi->io_thd = thd;
thd->thread_stack = (char*)&thd; // remember where our stack is
- thd->temporary_tables = save_temporary_tables; // restore temp tables
threads.append(thd);
- glob_mi.pending = 0; //this should always be set to 0 when the slave thread
- // is started
+ mi->slave_running = 1;
+ mi->abort_slave = 0;
+ pthread_cond_broadcast(&mi->start_cond);
+ pthread_mutex_unlock(&mi->run_lock);
DBUG_PRINT("info",("master info: log_file_name=%s, position=%s",
- glob_mi.log_file_name, llstr(glob_mi.pos,llbuff)));
-
+ mi->master_log_name, llstr(mi->master_log_pos,llbuff)));
if (!(mysql = mc_mysql_init(NULL)))
{
- sql_print_error("Slave thread: error in mc_mysql_init()");
+ sql_print_error("Slave I/O thread: error in mc_mysql_init()");
goto err;
}
thd->proc_info = "connecting to master";
#ifndef DBUG_OFF
- sql_print_error("Slave thread initialized");
+ sql_print_error("Slave I/O thread initialized");
#endif
// we can get killed during safe_connect
- if (!safe_connect(thd, mysql, &glob_mi))
- sql_print_error("Slave: connected to master '%s@%s:%d',\
- replication started in log '%s' at position %s", glob_mi.user,
- glob_mi.host, glob_mi.port,
- RPL_LOG_NAME,
- llstr(glob_mi.pos,llbuff));
+ if (!safe_connect(thd, mysql, mi))
+ sql_print_error("Slave I/O thread: connected to master '%s@%s:%d',\
+ replication started in log '%s' at position %s", mi->user,
+ mi->host, mi->port,
+ IO_RPL_LOG_NAME,
+ llstr(mi->master_log_pos,llbuff));
else
{
- sql_print_error("Slave thread killed while connecting to master");
+ sql_print_error("Slave I/O thread killed while connecting to master");
goto err;
}
connected:
thd->slave_net = &mysql->net;
- // register ourselves with the master
- // if fails, this is not fatal - we just print the error message and go
- // on with life
thd->proc_info = "Checking master version";
- if (check_master_version(mysql, &glob_mi))
+ if (check_master_version(mysql, mi))
{
goto err;
}
- if (!glob_mi.old_format)
+ if (!mi->old_format)
{
+ // register ourselves with the master
+ // if fails, this is not fatal - we just print the error message and go
+ // on with life
thd->proc_info = "Registering slave on master";
if (register_slave_on_master(mysql) || update_slave_list(mysql))
goto err;
}
- while (!slave_killed(thd))
+ while (!slave_killed(thd,mi))
{
thd->proc_info = "Requesting binlog dump";
- if(request_dump(mysql, &glob_mi))
+ if (request_dump(mysql, mi))
{
sql_print_error("Failed on request_dump()");
- if(slave_killed(thd))
+ if(slave_killed(thd,mi))
{
- sql_print_error("Slave thread killed while requesting master \
+ sql_print_error("Slave I/O thread killed while requesting master \
dump");
goto err;
}
thd->proc_info = "Waiiting to reconnect after a failed dump request";
- if(mysql->net.vio)
- vio_close(mysql->net.vio);
+ mc_end_server(mysql);
// first time retry immediately, assuming that we can recover
// right away - if first time fails, sleep between re-tries
// hopefuly the admin can fix the problem sometime
- if(retried_once)
- safe_sleep(thd, glob_mi.connect_retry);
+ if (retried_once)
+ safe_sleep(thd, mi, mi->connect_retry);
else
retried_once = 1;
- if(slave_killed(thd))
+ if (slave_killed(thd,mi))
{
- sql_print_error("Slave thread killed while retrying master \
+ sql_print_error("Slave I/O thread killed while retrying master \
dump");
goto err;
}
thd->proc_info = "Reconnecting after a failed dump request";
- last_failed_pos=glob_mi.pos;
- sql_print_error("Slave: failed dump request, reconnecting to \
-try again, log '%s' at postion %s", RPL_LOG_NAME,
- llstr(last_failed_pos,llbuff));
- if(safe_reconnect(thd, mysql, &glob_mi) || slave_killed(thd))
+ sql_print_error("Slave I/O thread: failed dump request, \
+reconnecting to try again, log '%s' at postion %s", IO_RPL_LOG_NAME,
+ llstr(mi->master_log_pos,llbuff));
+ if (safe_reconnect(thd, mysql, mi) || slave_killed(thd,mi))
{
- sql_print_error("Slave thread killed during or after reconnect");
+ sql_print_error("Slave I/O thread killed during or \
+after reconnect");
goto err;
}
goto connected;
}
-
- while(!slave_killed(thd))
+ while (!slave_killed(thd,mi))
{
thd->proc_info = "Reading master update";
- ulong event_len = read_event(mysql, &glob_mi);
- if(slave_killed(thd))
+ ulong event_len = read_event(mysql, mi);
+ if (slave_killed(thd,mi))
{
- sql_print_error("Slave thread killed while reading event");
+ sql_print_error("Slave I/O thread killed while reading event");
goto err;
}
-
if (event_len == packet_error)
{
- if(mc_mysql_errno(mysql) == ER_NET_PACKET_TOO_LARGE)
+ if (mc_mysql_errno(mysql) == ER_NET_PACKET_TOO_LARGE)
{
sql_print_error("Log entry on master is longer than \
max_allowed_packet on slave. Slave thread will be aborted. If the entry is \
@@ -1214,99 +1695,72 @@ max_allowed_packet. The current value is %ld", max_allowed_packet);
}
thd->proc_info = "Waiting to reconnect after a failed read";
- if(mysql->net.vio)
- vio_close(mysql->net.vio);
- if(retried_once) // punish repeat offender with sleep
- safe_sleep(thd, glob_mi.connect_retry);
+ mc_end_server(mysql);
+ if (retried_once) // punish repeat offender with sleep
+ safe_sleep(thd,mi,mi->connect_retry);
else
retried_once = 1;
- if(slave_killed(thd))
+ if (slave_killed(thd,mi))
{
- sql_print_error("Slave thread killed while waiting to \
+ sql_print_error("Slave I/O thread killed while waiting to \
reconnect after a failed read");
goto err;
}
thd->proc_info = "Reconnecting after a failed read";
- last_failed_pos= glob_mi.pos;
- sql_print_error("Slave: Failed reading log event, \
-reconnecting to retry, log '%s' position %s", RPL_LOG_NAME,
- llstr(last_failed_pos, llbuff));
- if(safe_reconnect(thd, mysql, &glob_mi) || slave_killed(thd))
+ sql_print_error("Slave I/O thread: Failed reading log event, \
+reconnecting to retry, log '%s' position %s", IO_RPL_LOG_NAME,
+ llstr(mi->master_log_pos, llbuff));
+ if (safe_reconnect(thd, mysql, mi) || slave_killed(thd,mi))
{
- sql_print_error("Slave thread killed during or after a \
+ sql_print_error("Slave I/O thread killed during or after a \
reconnect done to recover from failed read");
goto err;
}
-
goto connected;
} // if(event_len == packet_error)
- thd->proc_info = "Processing master log event";
- if(exec_event(thd, &mysql->net, &glob_mi, event_len))
- {
- sql_print_error("\
-Error running query, slave aborted. Fix the problem, and re-start \
-the slave thread with \"mysqladmin start-slave\". We stopped at log \
-'%s' position %s",
- RPL_LOG_NAME, llstr(glob_mi.pos, llbuff));
- goto err;
- // there was an error running the query
- // abort the slave thread, when the problem is fixed, the user
- // should restart the slave with mysqladmin start-slave
- }
+ thd->proc_info = "Queueing event from master";
+ if (queue_event(mi,(const char*)mysql->net.read_pos + 1,
+ (uint)event_len))
+ {
+ sql_print_error("Slave I/O thread could not queue event \
+from master");
+ goto err;
+ }
+ // TODO: check debugging abort code
#ifndef DBUG_OFF
- if(abort_slave_event_count && !--events_till_abort)
+ if (abort_slave_event_count && !--events_till_abort)
{
- sql_print_error("Slave: debugging abort");
+ sql_print_error("Slave I/O thread: debugging abort");
goto err;
}
#endif
-
- // successful exec with offset advance,
- // the slave repents and his sins are forgiven!
- if(glob_mi.pos > last_failed_pos)
- {
- retried_once = 0;
-#ifndef DBUG_OFF
- stuck_count = 0;
-#endif
- }
-#ifndef DBUG_OFF
- else
- {
- // show a little mercy, allow slave to read one more event
- // before cutting him off - otherwise he gets stuck
- // on Intvar events, since they do not advance the offset
- // immediately
- if (++stuck_count > 2)
- events_till_disconnect++;
- }
-#endif
- } // while(!slave_killed(thd)) - read/exec loop
- } // while(!slave_killed(thd)) - slave loop
+ } // while(!slave_killed(thd,mi)) - read/exec loop
+ } // while(!slave_killed(thd,mi)) - slave loop
// error = 0;
err:
- // print the current replication position
- sql_print_error("Slave thread exiting, replication stopped in log '%s' at \
-position %s",
- RPL_LOG_NAME, llstr(glob_mi.pos,llbuff));
+ // print the current replication position
+ sql_print_error("Slave I/O thread exiting, read up to log '%s', position %s",
+ IO_RPL_LOG_NAME, llstr(mi->master_log_pos,llbuff));
thd->query = thd->db = 0; // extra safety
if(mysql)
mc_mysql_close(mysql);
thd->proc_info = "Waiting for slave mutex on exit";
- pthread_mutex_lock(&LOCK_slave);
- slave_running = 0;
+ pthread_mutex_lock(&mi->run_lock);
+ mi->slave_running = 0;
+ mi->io_thd = 0;
+ // TODO: make rpl_status part of MASTER_INFO
change_rpl_status(RPL_ACTIVE_SLAVE,RPL_IDLE_SLAVE);
- abort_slave = 0;
- save_temporary_tables = thd->temporary_tables;
- thd->temporary_tables = 0; // remove tempation from destructor to close them
- pthread_cond_broadcast(&COND_slave_stopped); // tell the world we are done
- pthread_mutex_unlock(&LOCK_slave);
+ mi->abort_slave = 0; // TODO: check if this is needed
+ DBUG_ASSERT(thd->net.buff != 0);
net_end(&thd->net); // destructor will not free it, because we are weird
- slave_thd = 0;
+ pthread_mutex_lock(&LOCK_thread_count);
delete thd;
+ pthread_mutex_unlock(&LOCK_thread_count);
+ pthread_cond_broadcast(&mi->stop_cond); // tell the world we are done
+ pthread_mutex_unlock(&mi->run_lock);
my_thread_end();
#ifndef DBUG_OFF
if(abort_slave_event_count && !events_till_abort)
@@ -1316,9 +1770,185 @@ position %s",
DBUG_RETURN(0); // Can't return anything here
}
+/* slave SQL logic thread */
-/* try to connect until successful or slave killed */
+pthread_handler_decl(handle_slave_sql,arg)
+{
+#ifndef DBUG_OFF
+ slave_begin:
+#endif
+ THD *thd; /* needs to be first for thread_stack */
+ MYSQL *mysql = NULL ;
+ bool retried_once = 0;
+ ulonglong last_failed_pos = 0; // TODO: see if this can be removed
+ char llbuff[22],llbuff1[22];
+ RELAY_LOG_INFO* rli = &((MASTER_INFO*)arg)->rli;
+ const char* errmsg=0;
+ DBUG_ASSERT(rli->inited);
+ pthread_mutex_lock(&rli->run_lock);
+ DBUG_ASSERT(!rli->slave_running);
+#ifndef DBUG_OFF
+ rli->events_till_abort = abort_slave_event_count;
+#endif
+
+
+ // needs to call my_thread_init(), otherwise we get a coredump in DBUG_ stuff
+ my_thread_init();
+ thd = new THD; // note that contructor of THD uses DBUG_ !
+ DBUG_ENTER("handle_slave_sql");
+
+ pthread_detach_this_thread();
+ if (init_slave_thread(thd, SLAVE_THD_SQL))
+ {
+ // TODO: this is currently broken - slave start and change master
+ // will be stuck if we fail here
+ pthread_cond_broadcast(&rli->start_cond);
+ pthread_mutex_unlock(&rli->run_lock);
+ sql_print_error("Failed during slave thread initialization");
+ goto err;
+ }
+ thd->thread_stack = (char*)&thd; // remember where our stack is
+ thd->temporary_tables = rli->save_temporary_tables; // restore temp tables
+ threads.append(thd);
+ rli->sql_thd = thd;
+ rli->slave_running = 1;
+ rli->abort_slave = 0;
+ pthread_cond_broadcast(&rli->start_cond);
+ pthread_mutex_unlock(&rli->run_lock);
+ rli->pending = 0; //this should always be set to 0 when the slave thread
+ // is started
+ if (init_relay_log_pos(rli,0,0,1/*need data lock*/,&errmsg))
+ {
+ sql_print_error("Error initializing relay log position: %s",
+ errmsg);
+ goto err;
+ }
+ DBUG_ASSERT(rli->relay_log_pos >= 4);
+ DBUG_ASSERT(my_b_tell(rli->cur_log) == rli->relay_log_pos);
+
+ DBUG_PRINT("info",("master info: log_file_name=%s, position=%s",
+ rli->master_log_name, llstr(rli->master_log_pos,llbuff)));
+ DBUG_ASSERT(rli->sql_thd == thd);
+ sql_print_error("Slave SQL thread initialized, starting replication in \
+log '%s' at position %s,relay log: name='%s',pos='%s'", RPL_LOG_NAME,
+ llstr(rli->master_log_pos,llbuff),rli->relay_log_name,
+ llstr(rli->relay_log_pos,llbuff1));
+ while (!slave_killed(thd,rli))
+ {
+ thd->proc_info = "Processing master log event";
+ DBUG_ASSERT(rli->sql_thd == thd);
+ if (exec_relay_log_event(thd,rli))
+ {
+ // do not scare the user if SQL thread was simply killed or stopped
+ if (!slave_killed(thd,rli))
+ sql_print_error("\
+Error running query, slave SQL thread aborted. Fix the problem, and restart \
+the slave SQL thread with \"mysqladmin start-slave\". We stopped at log \
+'%s' position %s",
+ RPL_LOG_NAME, llstr(rli->master_log_pos, llbuff));
+ goto err;
+ }
+ } // while(!slave_killed(thd,rli)) - read/exec loop
+
+ // error = 0;
+ err:
+ // print the current replication position
+ sql_print_error("Slave SQL thread exiting, replication stopped in log \
+ '%s' at position %s",
+ RPL_LOG_NAME, llstr(rli->master_log_pos,llbuff));
+ thd->query = thd->db = 0; // extra safety
+ thd->proc_info = "Waiting for slave mutex on exit";
+ pthread_mutex_lock(&rli->run_lock);
+ DBUG_ASSERT(rli->slave_running == 1); // tracking buffer overrun
+ rli->slave_running = 0;
+ rli->save_temporary_tables = thd->temporary_tables;
+ //TODO: see if we can do this conditionally in next_event() instead
+ // to avoid unneeded position re-init
+ rli->log_pos_current=0;
+ thd->temporary_tables = 0; // remove tempation from destructor to close them
+ DBUG_ASSERT(thd->net.buff != 0);
+ net_end(&thd->net); // destructor will not free it, because we are weird
+ DBUG_ASSERT(rli->sql_thd == thd);
+ rli->sql_thd = 0;
+ pthread_mutex_lock(&LOCK_thread_count);
+ delete thd;
+ pthread_mutex_unlock(&LOCK_thread_count);
+ pthread_cond_broadcast(&rli->stop_cond);
+ // tell the world we are done
+ pthread_mutex_unlock(&rli->run_lock);
+ my_thread_end();
+#ifndef DBUG_OFF // TODO: reconsider the code below
+ if (abort_slave_event_count && !rli->events_till_abort)
+ goto slave_begin;
+#endif
+ pthread_exit(0);
+ DBUG_RETURN(0); // Can't return anything here
+}
+int queue_event(MASTER_INFO* mi,const char* buf,uint event_len)
+{
+ int error;
+ bool inc_pos = 1;
+ if (mi->old_format)
+ return 1; // TODO: deal with old format
+
+ switch (buf[EVENT_TYPE_OFFSET])
+ {
+ case ROTATE_EVENT:
+ {
+ Rotate_log_event rev(buf,event_len,0);
+ if (!rev.is_valid())
+ return 1;
+ DBUG_ASSERT(rev.ident_len<sizeof(mi->master_log_name));
+ memcpy(mi->master_log_name,rev.new_log_ident,
+ rev.ident_len);
+ mi->master_log_name[rev.ident_len] = 0;
+ mi->master_log_pos = rev.pos;
+ inc_pos = 0;
+#ifndef DBUG_OFF
+ /* if we do not do this, we will be getting the first
+ rotate event forever, so
+ we need to not disconnect after one
+ */
+ if (disconnect_slave_event_count)
+ events_till_disconnect++;
+#endif
+ break;
+ }
+ default:
+ break;
+ }
+
+ if (!(error = mi->rli.relay_log.appendv(buf,event_len,0)))
+ {
+ if (inc_pos)
+ mi->master_log_pos += event_len;
+ }
+ return error;
+}
+
+void end_relay_log_info(RELAY_LOG_INFO* rli)
+{
+ if (!rli->inited)
+ return;
+ if (rli->info_fd >= 0)
+ {
+ end_io_cache(&rli->info_file);
+ (void)my_close(rli->info_fd, MYF(MY_WME));
+ rli->info_fd = -1;
+ }
+ if (rli->cur_log_fd >= 0)
+ {
+ end_io_cache(&rli->cache_buf);
+ (void)my_close(rli->cur_log_fd, MYF(MY_WME));
+ rli->cur_log_fd = -1;
+ }
+ rli->inited = 0;
+ rli->log_pos_current=0;
+ rli->relay_log.close(1);
+}
+
+/* try to connect until successful or slave killed */
static int safe_connect(THD* thd, MYSQL* mysql, MASTER_INFO* mi)
{
return connect_to_master(thd, mysql, mi, 0);
@@ -1328,7 +1958,6 @@ static int safe_connect(THD* thd, MYSQL* mysql, MASTER_INFO* mi)
Try to connect until successful or slave killed or we have retried
master_retry_count times
*/
-
static int connect_to_master(THD* thd, MYSQL* mysql, MASTER_INFO* mi,
bool reconnect)
{
@@ -1337,15 +1966,10 @@ static int connect_to_master(THD* thd, MYSQL* mysql, MASTER_INFO* mi,
ulong err_count=0;
char llbuff[22];
- /*
- If we lost connection after reading a state set event
- we will be re-reading it, so pending needs to be cleared
- */
- mi->pending = 0;
#ifndef DBUG_OFF
events_till_disconnect = disconnect_slave_event_count;
#endif
- while (!(slave_was_killed = slave_killed(thd)) &&
+ while (!(slave_was_killed = slave_killed(thd,mi)) &&
(reconnect ? mc_mysql_reconnect(mysql) :
!mc_mysql_connect(mysql, mi->host, mi->user, mi->password, 0,
mi->port, 0, 0)))
@@ -1353,12 +1977,13 @@ static int connect_to_master(THD* thd, MYSQL* mysql, MASTER_INFO* mi,
/* Don't repeat last error */
if (mc_mysql_errno(mysql) != last_errno)
{
- sql_print_error("Slave thread: error connecting to master: \
-%s, last_errno=%d, retry in %d sec",
+ sql_print_error("Slave I/O thread: error connecting to master \
+'%s@%s:%d': \
+%s, last_errno=%d, retry in %d sec",mi->user,mi->host,mi->port,
mc_mysql_error(mysql), last_errno=mc_mysql_errno(mysql),
mi->connect_retry);
}
- safe_sleep(thd, mi->connect_retry);
+ safe_sleep(thd,mi,mi->connect_retry);
/* by default we try forever. The reason is that failure will trigger
master election, so if the user did not set master_retry_count we
do not want to have electioin triggered on the first failure to
@@ -1377,10 +2002,10 @@ static int connect_to_master(THD* thd, MYSQL* mysql, MASTER_INFO* mi,
{
if (reconnect)
sql_print_error("Slave: connected to master '%s@%s:%d',\
-replication resumed in log '%s' at position %s", glob_mi.user,
- glob_mi.host, glob_mi.port,
- RPL_LOG_NAME,
- llstr(glob_mi.pos,llbuff));
+replication resumed in log '%s' at position %s", mi->user,
+ mi->host, mi->port,
+ IO_RPL_LOG_NAME,
+ llstr(mi->master_log_pos,llbuff));
else
{
change_rpl_status(RPL_IDLE_SLAVE,RPL_ACTIVE_SLAVE);
@@ -1405,6 +2030,175 @@ static int safe_reconnect(THD* thd, MYSQL* mysql, MASTER_INFO* mi)
return connect_to_master(thd, mysql, mi, 1);
}
+int flush_relay_log_info(RELAY_LOG_INFO* rli)
+{
+ IO_CACHE* file = &rli->info_file;
+ char lbuf[22],lbuf1[22];
+
+ my_b_seek(file, 0L);
+ my_b_printf(file, "%s\n%s\n%s\n%s\n",
+ rli->relay_log_name, llstr(rli->relay_log_pos, lbuf),
+ rli->master_log_name, llstr(rli->master_log_pos, lbuf1)
+ );
+ flush_io_cache(file);
+ flush_io_cache(rli->cur_log);
+ return 0;
+}
+
+IO_CACHE* reopen_relay_log(RELAY_LOG_INFO* rli, const char** errmsg)
+{
+ DBUG_ASSERT(rli->cur_log != &rli->cache_buf);
+ IO_CACHE* cur_log = rli->cur_log=&rli->cache_buf;
+ DBUG_ASSERT(rli->cur_log_fd == -1);
+ if ((rli->cur_log_fd=open_binlog(cur_log,rli->relay_log_name,
+ errmsg))<0)
+ return 0;
+ my_b_seek(cur_log,rli->relay_log_pos);
+ return cur_log;
+}
+
+Log_event* next_event(RELAY_LOG_INFO* rli)
+{
+ Log_event* ev;
+ IO_CACHE* cur_log = rli->cur_log;
+ pthread_mutex_t *log_lock = rli->relay_log.get_log_lock();
+ const char* errmsg=0;
+ THD* thd = rli->sql_thd;
+ bool was_killed;
+ DBUG_ASSERT(thd != 0);
+
+ // For most operations we need to protect rli members with data_lock,
+ // so we will hold it for the most of the loop below
+ // However, we will release it whenever it is worth the hassle,
+ // and in the cases when we go into a pthread_cond_wait() with the
+ // non-data_lock mutex
+ pthread_mutex_lock(&rli->data_lock);
+
+ for (;!(was_killed=slave_killed(thd,rli));)
+ {
+ // we can have two kinds of log reading:
+ // hot_log - rli->cur_log points at the IO_CACHE of relay_log, which
+ // is actively being updated by the I/O thread. We need to be careful
+ // in this case and make sure that we are not looking at a stale log that
+ // has already been rotated. If it has been, we reopen the log
+ // the other case is much simpler - we just have a read only log that
+ // nobody else will be updating.
+ bool hot_log;
+ if ((hot_log = (cur_log != &rli->cache_buf)))
+ {
+ DBUG_ASSERT(rli->cur_log_fd == -1); // foreign descriptor
+ pthread_mutex_lock(log_lock);
+ // reading cur_log->init_count here is safe because the log will only
+ // be rotated when we hold relay_log.LOCK_log
+ if (cur_log->init_count != rli->cur_log_init_count)
+ {
+ if (!(cur_log=reopen_relay_log(rli,&errmsg)))
+ {
+ pthread_mutex_unlock(log_lock);
+ goto err;
+ }
+ pthread_mutex_unlock(log_lock);
+ hot_log=0;
+ }
+ }
+ DBUG_ASSERT(my_b_tell(cur_log) >= 4);
+ DBUG_ASSERT(my_b_tell(cur_log) == rli->relay_log_pos + rli->pending);
+ if ((ev=Log_event::read_log_event(cur_log,0,rli->mi->old_format)))
+ {
+ DBUG_ASSERT(thd==rli->sql_thd);
+ if (hot_log)
+ pthread_mutex_unlock(log_lock);
+ pthread_mutex_unlock(&rli->data_lock);
+ return ev;
+ }
+ DBUG_ASSERT(thd==rli->sql_thd);
+ if (!cur_log->error) /* EOF */
+ {
+ // on a hot log, EOF means that there are no more updates to
+ // process and we must block until I/O thread adds some and
+ // signals us to continue
+ if (hot_log)
+ {
+ DBUG_ASSERT(cur_log->init_count == rli->cur_log_init_count);
+ //we can, and should release data_lock while we are waiting for
+ // update. If we do not, show slave status will block
+ pthread_mutex_unlock(&rli->data_lock);
+
+ // IMPORTANT: note that wait_for_update will unlock LOCK_log, but
+ // expects the caller to lock it
+ rli->relay_log.wait_for_update(rli->sql_thd);
+
+ // re-acquire data lock since we released it earlier
+ pthread_mutex_lock(&rli->data_lock);
+ continue;
+ }
+ // if the log was not hot, we need to move to the next log in
+ // sequence. The next log could be hot or cold, we deal with both
+ // cases separately after doing some common initialization
+ else
+ {
+ end_io_cache(cur_log);
+ DBUG_ASSERT(rli->cur_log_fd >= 0);
+ my_close(rli->cur_log_fd, MYF(MY_WME));
+ rli->cur_log_fd = -1;
+ int error;
+
+ // purge_first_log will properly set up relay log coordinates in rli
+ if (rli->relay_log.purge_first_log(rli))
+ {
+ errmsg = "Error purging processed log";
+ goto err;
+ }
+
+ // next log is hot
+ if (rli->relay_log.is_active(rli->linfo.log_file_name))
+ {
+#ifdef EXTRA_DEBUG
+ sql_print_error("next log '%s' is currently active",
+ rli->linfo.log_file_name);
+#endif
+ rli->cur_log = cur_log = rli->relay_log.get_log_file();
+ rli->cur_log_init_count = cur_log->init_count;
+ DBUG_ASSERT(rli->cur_log_fd == -1);
+
+ // read pointer has to be at the start since we are the only
+ // reader
+ if (check_binlog_magic(cur_log,&errmsg))
+ goto err;
+ continue;
+ }
+ // if we get here, the log was not hot, so we will have to
+ // open it ourselves
+#ifdef EXTRA_DEBUG
+ sql_print_error("next log '%s' is not active",
+ rli->linfo.log_file_name);
+#endif
+ // open_binlog() will check the magic header
+ if ((rli->cur_log_fd=open_binlog(cur_log,rli->linfo.log_file_name,
+ &errmsg))<0)
+ goto err;
+ }
+ }
+ else // read failed with a non-EOF error
+ {
+ // TODO: come up with something better to handle this error
+ sql_print_error("Slave SQL thread: I/O error reading \
+event(errno=%d,cur_log->error=%d)",
+ my_errno,cur_log->error);
+ // no need to hog the mutex while we sleep
+ pthread_mutex_unlock(&rli->data_lock);
+ safe_sleep(rli->sql_thd,rli->mi,1);
+ pthread_mutex_lock(&rli->data_lock);
+ }
+ }
+ if (!errmsg && was_killed)
+ errmsg = "slave SQL thread was killed";
+err:
+ pthread_mutex_unlock(&rli->data_lock);
+ sql_print_error("Error reading relay log event: %s", errmsg);
+ return 0;
+}
+
#ifdef __GNUC__
template class I_List_iterator<i_string>;
diff --git a/sql/slave.h b/sql/slave.h
index a5bc4d61309..9ad5c75a556 100644
--- a/sql/slave.h
+++ b/sql/slave.h
@@ -2,67 +2,265 @@
#define SLAVE_H
#include "mysql.h"
+#include "my_list.h"
#define SLAVE_NET_TIMEOUT 3600
+#define MAX_SLAVE_ERRMSG 1024
+
+/*
+ The replication is accomplished by starting two threads - I/O
+ thread, and SQL thread. I/O thread is associated with its
+ MASTER_INFO struct, so MASTER_INFO can be viewed as I/O thread
+ descriptor. SQL thread is associated with RELAY_LOG_INFO struct.
+
+ I/O thread reads maintains a connection to the master, and reads log
+ events from the master as they arrive, queueing them by writing them
+ out into the temporary slave binary log (relay log). The SQL thread,
+ in turn, reads the slave binary log executing each event.
+
+ Relay log is needed to be able to handle situations when there is a large
+ backlog of unprocessed events from the master (eg. one particular update
+ takes a day to finish), and to be able to restart the slave server without
+ having to re-read the master updates.
+ */
extern ulong slave_net_timeout, master_retry_count;
extern char* slave_load_tmpdir;
+extern my_string master_info_file,relay_log_info_file;
+extern my_string opt_relay_logname, opt_relaylog_index_name;
+extern bool opt_skip_slave_start;
+struct st_master_info;
+
+#define LOCK_ACTIVE_MI { pthread_mutex_lock(&LOCK_active_mi); \
+ ++active_mi_in_use; \
+ pthread_mutex_unlock(&LOCK_active_mi);}
+
+#define UNLOCK_ACTIVE_MI { pthread_mutex_lock(&LOCK_active_mi); \
+ --active_mi_in_use; \
+ pthread_mutex_unlock(&LOCK_active_mi); }
+
+/*
+ st_relay_log_info contains information on the current relay log and
+ relay log offset, and master log name and log sequence corresponding to the
+ last update. Additionally, misc information specific to the SQL thread is
+ included.
+
+ st_relay_log_info is initialized from the slave.info file if such exists.
+ Otherwise, data members are intialized with defaults. The initialization is
+ done with init_relay_log_info() call.
+
+ The format of slave.info file:
+
+ relay_log_name
+ relay_log_pos
+ master_log_name
+ master_log_pos
+
+ To clean up, call end_relay_log_info()
+ */
+typedef struct st_relay_log_info
+{
+ // info_fd - file descriptor of the info file. set only during
+ // initialization or clean up - safe to read anytime
+ // cur_log_fd - file descriptor of the current read relay log, protected by
+ // data_lock
+ File info_fd,cur_log_fd;
+
+ // IO_CACHE of the info file - set only during init or end, safe to read
+ // anytime
+ IO_CACHE info_file;
+
+ // name of current read relay log - protected by data_lock
+ char relay_log_name[FN_REFLEN];
+
+ // master log name corresponding to current read position - protected by
+ // data lock
+ char master_log_name[FN_REFLEN];
+
+ // original log position of last processed event - protected by data_lock
+ volatile uint32 master_log_pos;
+ // when we restart slave thread we need to have access to the previously
+ // created temporary tables. Modified only on init/end and by the SQL
+ // thread, read only by SQL thread, need no mutex
+ TABLE* save_temporary_tables;
+
+ // relay_log_pos - current offset in the relay log - protected by data_lock
+ // pending - in some cases we do not increment offset immediately after
+ // processing an event, because the following event needs to be processed
+ // atomically together with this one ( so far, there is only one type of
+ // such event - Intvar_event that sets auto_increment value). However, once
+ // both events have been processed, we need to increment by the cumulative
+ // offset. pending stored the extra offset to be added to the position.
+ ulonglong relay_log_pos,pending;
+
+ // standard lock acquistion order to avoid deadlocks:
+ // run_lock, data_lock, relay_log.LOCK_log,relay_log.LOCK_index
+ pthread_mutex_t data_lock,run_lock;
+
+ // start_cond is broadcast when SQL thread is started
+ // stop_cond - when stopped
+ // data_cond - when data protected by data_lock changes
+ pthread_cond_t start_cond,stop_cond,data_cond;
+
+ // if not set, the value of other members of the structure are undefined
+ bool inited;
+
+ // parent master info structure
+ struct st_master_info *mi;
+
+ // protected with internal locks
+ // must get data_lock when resetting the logs
+ MYSQL_LOG relay_log;
+ LOG_INFO linfo;
+ IO_CACHE cache_buf,*cur_log;
+
+ /* needed to deal properly with cur_log getting closed and re-opened with
+ a different log under our feet
+ */
+ int cur_log_init_count;
+
+ volatile bool abort_slave, slave_running;
+// needed for problems when slave stops and
+// we want to restart it skipping one or more events in the master log that
+// have caused errors, and have been manually applied by DBA already
+ volatile uint32 slave_skip_counter;
+#ifndef DBUG_OFF
+ int events_till_abort;
+#endif
+ int last_slave_errno;
+ char last_slave_error[MAX_SLAVE_ERRMSG];
+ THD* sql_thd;
+ bool log_pos_current;
+
+ st_relay_log_info():info_fd(-1),cur_log_fd(-1),inited(0),
+ cur_log_init_count(0),
+ log_pos_current(0)
+ {
+ relay_log_name[0] = master_log_name[0] = 0;
+ pthread_mutex_init(&run_lock, MY_MUTEX_INIT_FAST);
+ pthread_mutex_init(&data_lock, MY_MUTEX_INIT_FAST);
+ pthread_cond_init(&data_cond, NULL);
+ pthread_cond_init(&start_cond, NULL);
+ pthread_cond_init(&stop_cond, NULL);
+ }
+ ~st_relay_log_info()
+ {
+ pthread_mutex_destroy(&run_lock);
+ pthread_mutex_destroy(&data_lock);
+ pthread_cond_destroy(&data_cond);
+ pthread_cond_destroy(&start_cond);
+ pthread_cond_destroy(&stop_cond);
+ }
+ inline void inc_pending(ulonglong val)
+ {
+ pending += val;
+ }
+ // TODO: this probably needs to be fixed
+ inline void inc_pos(ulonglong val, uint32 log_pos, bool skip_lock=0)
+ {
+ if (!skip_lock)
+ pthread_mutex_lock(&data_lock);
+ relay_log_pos += val+pending;
+ pending = 0;
+ if (log_pos)
+ master_log_pos = log_pos+val;
+ pthread_cond_broadcast(&data_cond);
+ if (!skip_lock)
+ pthread_mutex_unlock(&data_lock);
+ }
+ // thread safe read of position - not needed if we are in the slave thread,
+ // but required otherwise
+ inline void read_pos(ulonglong& var)
+ {
+ pthread_mutex_lock(&data_lock);
+ var = relay_log_pos;
+ pthread_mutex_unlock(&data_lock);
+ }
+
+ int wait_for_pos(THD* thd, String* log_name, ulonglong log_pos);
+} RELAY_LOG_INFO;
+
+// repopen_relay_log() is called when we notice that the current "hot" log
+// got rotated under our feet
+IO_CACHE* reopen_relay_log(RELAY_LOG_INFO* rli, const char** errmsg);
+Log_event* next_event(RELAY_LOG_INFO* rli);
+
+/* st_master_info contains information about how to connect to a master,
+ current master log name, and current log offset, as well as misc
+ control variables
+
+ st_master_info is initialized once from the master.info file if such
+ exists. Otherwise, data members corresponding to master.info fields are
+ initialized with defaults specified by master-* options. The initialization
+ is done through init_master_info() call.
+
+ The format of master.info file:
+
+ log_name
+ log_pos
+ master_host
+ master_user
+ master_pass
+ master_port
+ master_connect_retry
+
+ To write out the contents of master.info file to disk ( needed every
+ time we read and queue data from the master ), a call to
+ flush_master_info() is required.
+
+ To clean up, call end_master_info()
+
+*/
+
typedef struct st_master_info
{
- char log_file_name[FN_REFLEN];
- ulonglong pos,pending;
- File fd; // we keep the file open, so we need to remember the file pointer
+ char master_log_name[FN_REFLEN];
+
+ ulonglong master_log_pos;
+ File fd;
IO_CACHE file;
+
// the variables below are needed because we can change masters on the fly
char host[HOSTNAME_LENGTH+1];
char user[USERNAME_LENGTH+1];
char password[HASH_PASSWORD_LENGTH+1];
uint port;
uint connect_retry;
- uint32 last_log_seq; // log sequence number of last processed event
- pthread_mutex_t lock;
- pthread_cond_t cond;
+ pthread_mutex_t data_lock,run_lock;
+ pthread_cond_t data_cond,start_cond,stop_cond;
bool inited;
bool old_format; /* master binlog is in 3.23 format */
+ RELAY_LOG_INFO rli;
+#ifndef DBUG_OFF
+ int events_till_abort;
+#endif
+ volatile bool abort_slave, slave_running;
+ THD* io_thd;
- st_master_info():pending(0),fd(-1),last_log_seq(0),inited(0),
- old_format(0)
+ st_master_info():fd(-1),inited(0),
+ old_format(0),io_thd(0)
{
host[0] = 0; user[0] = 0; password[0] = 0;
- pthread_mutex_init(&lock, MY_MUTEX_INIT_FAST);
- pthread_cond_init(&cond, NULL);
+ pthread_mutex_init(&run_lock, MY_MUTEX_INIT_FAST);
+ pthread_mutex_init(&data_lock, MY_MUTEX_INIT_FAST);
+ pthread_cond_init(&data_cond, NULL);
+ pthread_cond_init(&start_cond, NULL);
+ pthread_cond_init(&stop_cond, NULL);
}
~st_master_info()
{
- pthread_mutex_destroy(&lock);
- pthread_cond_destroy(&cond);
- }
- inline void inc_pending(ulonglong val)
- {
- pending += val;
- }
- inline void inc_pos(ulonglong val, uint32 log_seq)
- {
- pthread_mutex_lock(&lock);
- pos += val + pending;
- pending = 0;
- last_log_seq = log_seq;
- pthread_cond_broadcast(&cond);
- pthread_mutex_unlock(&lock);
- }
- // thread safe read of position - not needed if we are in the slave thread,
- // but required otherwise
- inline void read_pos(ulonglong& var)
- {
- pthread_mutex_lock(&lock);
- var = pos;
- pthread_mutex_unlock(&lock);
+ pthread_mutex_destroy(&run_lock);
+ pthread_mutex_destroy(&data_lock);
+ pthread_cond_destroy(&data_cond);
+ pthread_cond_destroy(&start_cond);
+ pthread_cond_destroy(&stop_cond);
}
- int wait_for_pos(THD* thd, String* log_name, ulonglong log_pos);
} MASTER_INFO;
+int queue_event(MASTER_INFO* mi,const char* buf,uint event_len);
+
typedef struct st_table_rule_ent
{
char* db;
@@ -74,22 +272,52 @@ typedef struct st_table_rule_ent
#define TABLE_RULE_ARR_SIZE 16
#define MAX_SLAVE_ERRMSG 1024
-#define RPL_LOG_NAME (glob_mi.log_file_name[0] ? glob_mi.log_file_name :\
+#define RPL_LOG_NAME (rli->master_log_name[0] ? rli->master_log_name :\
+ "FIRST")
+#define IO_RPL_LOG_NAME (mi->master_log_name[0] ? mi->master_log_name :\
"FIRST")
+/* masks for start/stop operations on io and sql slave threads */
+#define SLAVE_IO 1
+#define SLAVE_SQL 2
+#define SLAVE_FORCE_ALL 4 /* if this is set, if first gives an
+ error, second will be tried. Otherwise,
+ if first fails, we fail
+ */
+int init_slave();
int flush_master_info(MASTER_INFO* mi);
+int flush_relay_log_info(RELAY_LOG_INFO* rli);
int register_slave_on_master(MYSQL* mysql);
+int terminate_slave_threads(MASTER_INFO* mi, int thread_mask,
+ bool skip_lock = 0);
+int terminate_slave_thread(THD* thd, pthread_mutex_t* term_mutex,
+ pthread_mutex_t* cond_lock,
+ pthread_cond_t* term_cond,
+ volatile bool* slave_running);
+int start_slave_threads(bool need_slave_mutex, bool wait_for_start,
+ MASTER_INFO* mi, const char* master_info_fname,
+ const char* slave_info_fname, int thread_mask);
+/* cond_lock is usually same as start_lock. It is needed for the case when
+ start_lock is 0 which happens if start_slave_thread() is called already
+ inside the start_lock section, but at the same time we want a
+ pthread_cond_wait() on start_cond,start_lock
+*/
+int start_slave_thread(pthread_handler h_func, pthread_mutex_t* start_lock,
+ pthread_mutex_t *cond_lock,
+ pthread_cond_t* start_cond,
+ volatile bool* slave_running,
+ MASTER_INFO* mi);
int mysql_table_dump(THD* thd, const char* db,
const char* tbl_name, int fd = -1);
// if fd is -1, dump to NET
-int fetch_nx_table(THD* thd, const char* db_name, const char* table_name,
+int fetch_master_table(THD* thd, const char* db_name, const char* table_name,
MASTER_INFO* mi, MYSQL* mysql);
// retrieve non-exitent table from master
-int show_master_info(THD* thd);
+int show_master_info(THD* thd, MASTER_INFO* mi);
int show_binlog_info(THD* thd);
int tables_ok(THD* thd, TABLE_LIST* tables);
@@ -105,29 +333,31 @@ int add_wild_table_rule(DYNAMIC_ARRAY* a, const char* table_spec);
void init_table_rule_hash(HASH* h, bool* h_inited);
void init_table_rule_array(DYNAMIC_ARRAY* a, bool* a_inited);
char* rewrite_db(char* db);
-int check_expected_error(THD* thd, int error_code);
+int check_expected_error(THD* thd, RELAY_LOG_INFO* rli, int error_code);
void skip_load_data_infile(NET* net);
-void slave_print_error(int err_code, const char* msg, ...);
+void slave_print_error(RELAY_LOG_INFO* rli,int err_code, const char* msg, ...);
void end_slave(); // clean up
-int init_master_info(MASTER_INFO* mi);
+int init_master_info(MASTER_INFO* mi, const char* master_info_fname,
+ const char* slave_info_fname);
void end_master_info(MASTER_INFO* mi);
-extern bool opt_log_slave_updates ;
-pthread_handler_decl(handle_slave,arg);
-extern bool volatile abort_loop, abort_slave, slave_running;
-extern uint32 slave_skip_counter;
-// needed for problems when slave stops and
-// we want to restart it skipping one or more events in the master log that
-// have caused errors, and have been manually applied by DBA already
+int init_relay_log_info(RELAY_LOG_INFO* rli, const char* info_fname);
+void end_relay_log_info(RELAY_LOG_INFO* rli);
+void lock_slave_threads(MASTER_INFO* mi);
+void unlock_slave_threads(MASTER_INFO* mi);
+void init_thread_mask(int* mask,MASTER_INFO* mi,bool inverse);
+int init_relay_log_pos(RELAY_LOG_INFO* rli,const char* log,ulonglong pos,
+ bool need_data_lock, const char** errmsg);
-extern int last_slave_errno;
-#ifndef DBUG_OFF
-extern int events_till_abort;
-#endif
-extern char last_slave_error[MAX_SLAVE_ERRMSG];
-extern pthread_t slave_real_id;
-extern THD* slave_thd;
-extern MASTER_INFO glob_mi;
+int purge_relay_logs(RELAY_LOG_INFO* rli,bool just_reset,const char** errmsg);
+
+extern bool opt_log_slave_updates ;
+pthread_handler_decl(handle_slave_io,arg);
+pthread_handler_decl(handle_slave_sql,arg);
+extern bool volatile abort_loop;
+extern MASTER_INFO main_mi, *active_mi; // active_mi for multi-master
+extern volatile int active_mi_in_use;
+extern LIST master_list;
extern HASH replicate_do_table, replicate_ignore_table;
extern DYNAMIC_ARRAY replicate_wild_do_table, replicate_wild_ignore_table;
extern bool do_table_inited, ignore_table_inited,
@@ -141,7 +371,8 @@ extern int disconnect_slave_event_count, abort_slave_event_count ;
// the master variables are defaults read from my.cnf or command line
extern uint master_port, master_connect_retry, report_port;
extern my_string master_user, master_password, master_host,
- master_info_file, report_user, report_host, report_password;
+ master_info_file, relay_log_info_file, report_user, report_host,
+ report_password;
extern I_List<i_string> replicate_do_db, replicate_ignore_db;
extern I_List<i_string_pair> replicate_rewrite_db;
diff --git a/sql/sql_class.cc b/sql/sql_class.cc
index d76461b6faf..44a1dbc1720 100644
--- a/sql/sql_class.cc
+++ b/sql/sql_class.cc
@@ -98,7 +98,6 @@ THD::THD():user_time(0),fatal_error(0),last_insert_id_used(0),
current_linfo = 0;
slave_thread = 0;
slave_proxy_id = 0;
- log_seq = 0;
file_id = 0;
cond_count=0;
convert_set=0;
@@ -119,6 +118,7 @@ THD::THD():user_time(0),fatal_error(0),last_insert_id_used(0),
where="field list";
server_id = ::server_id;
slave_net = 0;
+ log_pos = 0;
server_status=SERVER_STATUS_AUTOCOMMIT;
update_lock_default= low_priority_updates ? TL_WRITE_LOW_PRIORITY : TL_WRITE;
options=thd_startup_options;
@@ -216,10 +216,11 @@ THD::~THD()
DBUG_VOID_RETURN;
}
-void THD::prepare_to_die()
+void THD::awake(bool prepare_to_die)
{
+ if (prepare_to_die)
+ killed = 1;
thr_alarm_kill(real_id);
- killed = 1;
#ifdef SIGNAL_WITH_VIO_CLOSE
close_active_vio();
#endif
@@ -228,6 +229,10 @@ void THD::prepare_to_die()
pthread_mutex_lock(&mysys_var->mutex);
if (!system_thread) // Don't abort locks
mysys_var->abort=1;
+ // this broadcast could be up in the air if the victim thread
+ // exits the cond in the time between read and broadcast, but that is
+ // ok since all we want to do is to make the victim thread get out
+ // of waiting on current_cond
if (mysys_var->current_cond)
{
pthread_mutex_lock(mysys_var->current_mutex);
diff --git a/sql/sql_class.h b/sql/sql_class.h
index 698a90c1a28..87d19381b78 100644
--- a/sql/sql_class.h
+++ b/sql/sql_class.h
@@ -21,6 +21,8 @@
#pragma interface /* gcc class implementation */
#endif
+// TODO: create log.h and move all the log header stuff there
+
class Query_log_event;
class Load_log_event;
class Slave_log_event;
@@ -40,6 +42,8 @@ enum enum_log_type { LOG_CLOSED, LOG_NORMAL, LOG_NEW, LOG_BIN };
#define LOG_INFO_FATAL -7
#define LOG_INFO_IN_USE -8
+struct st_relay_log_info;
+
typedef struct st_log_info
{
char log_file_name[FN_REFLEN];
@@ -64,8 +68,6 @@ class MYSQL_LOG {
char time_buff[20],db[NAME_LEN+1];
char log_file_name[FN_REFLEN],index_file_name[FN_REFLEN];
bool write_error,inited;
- uint32 log_seq; // current event sequence number
- // needed this for binlog
uint file_id; // current file sequence number for load data infile
// binary logging
bool no_rotate; // for binlog - if log name can never change
@@ -74,36 +76,52 @@ class MYSQL_LOG {
// purging
enum cache_type io_cache_type;
bool need_start_event;
+ pthread_cond_t update_cond;
+ bool no_auto_events; // for relay binlog
friend class Log_event;
public:
MYSQL_LOG();
~MYSQL_LOG();
pthread_mutex_t* get_log_lock() { return &LOCK_log; }
+ IO_CACHE* get_log_file() { return &log_file; }
+ void signal_update() { pthread_cond_broadcast(&update_cond);}
+ void wait_for_update(THD* thd);
void set_need_start_event() { need_start_event = 1; }
void set_index_file_name(const char* index_file_name = 0);
void init(enum_log_type log_type_arg,
- enum cache_type io_cache_type_arg = WRITE_CACHE);
+ enum cache_type io_cache_type_arg = WRITE_CACHE,
+ bool no_auto_events_arg = 0);
void open(const char *log_name,enum_log_type log_type,
- const char *new_name=0);
+ const char *new_name, enum cache_type io_cache_type_arg,
+ bool no_auto_events_arg);
void new_file(bool inside_mutex = 0);
bool open_index(int options);
void close_index();
- bool write(THD *thd, enum enum_server_command command,const char *format,...);
+ bool write(THD *thd, enum enum_server_command command,
+ const char *format,...);
bool write(THD *thd, const char *query, uint query_length,
time_t query_start=0);
bool write(Log_event* event_info); // binary log write
bool write(IO_CACHE *cache);
+
+ //v stands for vector
+ //invoked as appendv(buf1,len1,buf2,len2,...,bufn,lenn,0)
+ bool appendv(const char* buf,uint len,...);
+
int generate_new_name(char *new_name,const char *old_name);
void make_log_name(char* buf, const char* log_ident);
bool is_active(const char* log_file_name);
int purge_logs(THD* thd, const char* to_log);
+ int purge_first_log(struct st_relay_log_info* rli);
+ int reset_logs(THD* thd);
void close(bool exiting = 0); // if we are exiting, we also want to close the
// index file
// iterating through the log index file
- int find_first_log(LOG_INFO* linfo, const char* log_name);
- int find_next_log(LOG_INFO* linfo);
+ int find_first_log(LOG_INFO* linfo, const char* log_name,
+ bool need_mutex=1);
+ int find_next_log(LOG_INFO* linfo, bool need_mutex=1);
int get_current_log(LOG_INFO* linfo);
uint next_file_id();
@@ -226,33 +244,72 @@ public:
};
-/****************************************************************************
-** every connection is handled by a thread with a THD
-****************************************************************************/
-
class delayed_insert;
+/* For each client connection we create a separate thread with THD serving as
+ a thread/connection descriptor */
+
class THD :public ilink {
public:
- NET net;
- LEX lex;
- MEM_ROOT mem_root;
- HASH user_vars;
- String packet; /* Room for 1 row */
- struct sockaddr_in remote;
- struct rand_struct rand;
+ NET net; // client connection descriptor
+ LEX lex; // parse tree descriptor
+ MEM_ROOT mem_root; // memory allocation pool
+ HASH user_vars; // hash for user variables
+ String packet; // dynamic string buffer used for network I/O
+ struct sockaddr_in remote; // client socket address
+ struct rand_struct rand; // used for authentication
+
+ /* query points to the current query,
+ thread_stack is a pointer to the stack frame of handle_one_connection(),
+ which is called first in the thread for handling a client
+ */
char *query,*thread_stack;
+ /*
+ host - host of the client
+ user - user of the client, set to NULL until the user has been read from
+ the connection
+ priv_user - not sure why we have it, but it is set to "boot" when we run
+ with --bootstrap
+ db - currently selected database
+ ip - client IP
+ */
+
char *host,*user,*priv_user,*db,*ip;
+ /* proc_info points to a string that will show in the Info column of
+ SHOW PROCESSLIST output
+ host_or_ip points to host if host is available, otherwise points to ip
+ */
const char *proc_info, *host_or_ip;
+
+ /*
+ client_capabilities has flags describing what the client can do
+ sql_mode determines if certain non-standard SQL behaviour should be
+ enabled
+ max_packet_length - supposed to be maximum packet length the client
+ can handle, but it currently appears to be assigned but never used
+ except for one debugging statement
+ */
uint client_capabilities,sql_mode,max_packet_length;
+
+ /*
+ master_access - privillege descriptor mask for system threads
+ db_access - privillege descriptor mask for regular threads
+ */
uint master_access,db_access;
+
+ /*
+ open_tables - list of regular tables in use by this thread
+ temporary_tables - list of temp tables in use by this thread
+ handler_tables - list of tables that were opened with HANDLER OPEN
+ and are still in use by this thread
+ */
TABLE *open_tables,*temporary_tables, *handler_tables;
+ // TODO: document the variables below
MYSQL_LOCK *lock,*locked_tables;
ULL *ull;
struct st_my_thread_var *mysys_var;
enum enum_server_command command;
uint32 server_id;
- uint32 log_seq;
uint32 file_id; // for LOAD DATA INFILE
const char *where;
time_t start_time,time_after_lock,user_time;
@@ -293,15 +350,18 @@ public:
bool system_thread,in_lock_tables,global_read_lock;
bool query_error, bootstrap, cleanup_done;
bool volatile killed;
- LOG_INFO* current_linfo;
+
// if we do a purge of binary logs, log index info of the threads
// that are currently reading it needs to be adjusted. To do that
// each thread that is using LOG_INFO needs to adjust the pointer to it
-
- ulong slave_proxy_id; // in slave thread we need to know in behalf of which
+ LOG_INFO* current_linfo;
+
+ // in slave thread we need to know in behalf of which
// thread the query is being run to replicate temp tables properly
-
+ ulong slave_proxy_id;
+
NET* slave_net; // network connection from slave to master
+ uint32 log_pos;
THD();
~THD();
@@ -331,7 +391,7 @@ public:
pthread_mutex_unlock(&active_vio_lock);
}
#endif
- void prepare_to_die();
+ void awake(bool prepare_to_die);
inline const char* enter_cond(pthread_cond_t *cond, pthread_mutex_t* mutex,
const char* msg)
{
diff --git a/sql/sql_lex.h b/sql/sql_lex.h
index 9f9fe6c79b3..a796eca58f9 100644
--- a/sql/sql_lex.h
+++ b/sql/sql_lex.h
@@ -94,7 +94,6 @@ typedef struct st_lex_master_info
{
char* host, *user, *password,*log_file_name;
uint port, connect_retry;
- ulong last_log_seq;
ulonglong pos;
ulong server_id;
} LEX_MASTER_INFO;
diff --git a/sql/sql_parse.cc b/sql/sql_parse.cc
index 96185a174b5..a459ca0d602 100644
--- a/sql/sql_parse.cc
+++ b/sql/sql_parse.cc
@@ -1220,14 +1220,18 @@ mysql_execute_command(void)
{
if (check_access(thd, PROCESS_ACL, any_db))
goto error;
- res = change_master(thd);
+ LOCK_ACTIVE_MI;
+ res = change_master(thd,active_mi);
+ UNLOCK_ACTIVE_MI;
break;
}
case SQLCOM_SHOW_SLAVE_STAT:
{
if (check_process_priv(thd))
goto error;
- res = show_master_info(thd);
+ LOCK_ACTIVE_MI;
+ res = show_master_info(thd,active_mi);
+ UNLOCK_ACTIVE_MI;
break;
}
case SQLCOM_SHOW_MASTER_STAT:
@@ -1245,7 +1249,7 @@ mysql_execute_command(void)
break;
case SQLCOM_LOAD_MASTER_TABLE:
-
+ {
if (!tables->db)
tables->db=thd->db;
if (check_access(thd,CREATE_ACL,tables->db,&tables->grant.privilege))
@@ -1265,12 +1269,16 @@ mysql_execute_command(void)
net_printf(&thd->net,ER_WRONG_TABLE_NAME,tables->name);
break;
}
-
- if (fetch_nx_table(thd, tables->db, tables->real_name, &glob_mi, 0))
- break; // fetch_nx_table did send the error to the client
- send_ok(&thd->net);
+ LOCK_ACTIVE_MI;
+ // fetch_master_table will send the error to the client on failure
+ if (!fetch_master_table(thd, tables->db, tables->real_name,
+ active_mi, 0))
+ {
+ send_ok(&thd->net);
+ }
+ UNLOCK_ACTIVE_MI;
break;
-
+ }
case SQLCOM_CREATE_TABLE:
if (!tables->db)
tables->db=thd->db;
@@ -1368,12 +1376,19 @@ mysql_execute_command(void)
break;
case SQLCOM_SLAVE_START:
- start_slave(thd);
+ {
+ LOCK_ACTIVE_MI;
+ start_slave(thd,active_mi,1 /* net report*/);
+ UNLOCK_ACTIVE_MI;
break;
+ }
case SQLCOM_SLAVE_STOP:
- stop_slave(thd);
+ {
+ LOCK_ACTIVE_MI;
+ stop_slave(thd,active_mi,1/* net report*/);
+ UNLOCK_ACTIVE_MI;
break;
-
+ }
case SQLCOM_ALTER_TABLE:
#if defined(DONT_ALLOW_SHOW_COMMANDS)
send_error(&thd->net,ER_NOT_ALLOWED_COMMAND); /* purecov: inspected */
@@ -2967,6 +2982,7 @@ bool reload_acl_and_cache(THD *thd, uint options, TABLE_LIST *tables)
bool result=0;
select_errors=0; /* Write if more errors */
+ // TODO: figure out what's up with the commented out line below
// mysql_log.flush(); // Flush log
if (options & REFRESH_GRANT)
{
@@ -2998,10 +3014,15 @@ bool reload_acl_and_cache(THD *thd, uint options, TABLE_LIST *tables)
if (options & REFRESH_THREADS)
flush_thread_cache();
if (options & REFRESH_MASTER)
- reset_master();
+ if (reset_master(thd))
+ result=1;
if (options & REFRESH_SLAVE)
- reset_slave();
-
+ {
+ LOCK_ACTIVE_MI;
+ if (reset_slave(active_mi))
+ result=1;
+ UNLOCK_ACTIVE_MI;
+ }
return result;
}
@@ -3019,7 +3040,7 @@ void kill_one_thread(THD *thd, ulong id)
if ((thd->master_access & PROCESS_ACL) ||
!strcmp(thd->user,tmp->user))
{
- tmp->prepare_to_die();
+ tmp->awake(1 /*prepare to die*/);
error=0;
}
else
diff --git a/sql/sql_repl.cc b/sql/sql_repl.cc
index 6c738ba36b4..146490c7b87 100644
--- a/sql/sql_repl.cc
+++ b/sql/sql_repl.cc
@@ -23,9 +23,9 @@
#include "mini_client.h"
#include <thr_alarm.h>
#include <my_dir.h>
+#include <assert.h>
extern const char* any_db;
-extern pthread_handler_decl(handle_slave,arg);
#ifndef DBUG_OFF
int max_binlog_dump_events = 0; // unlimited
@@ -33,6 +33,26 @@ bool opt_sporadic_binlog_dump_fail = 0;
static int binlog_dump_count = 0;
#endif
+int check_binlog_magic(IO_CACHE* log, const char** errmsg)
+{
+ char magic[4];
+ DBUG_ASSERT(my_b_tell(log) == 0);
+
+ if (my_b_read(log, (byte*) magic, sizeof(magic)))
+ {
+ *errmsg = "I/O error reading the header from the binary log";
+ sql_print_error("%s, errno=%d, io cache code=%d", *errmsg, my_errno,
+ log->error);
+ return 1;
+ }
+ if (memcmp(magic, BINLOG_MAGIC, sizeof(magic)))
+ {
+ *errmsg = "Binlog has bad magic number; It's not a binary log file that can be used by this version of MySQL";
+ return 1;
+ }
+ return 0;
+}
+
static int fake_rotate_event(NET* net, String* packet, char* log_file_name,
const char**errmsg)
{
@@ -46,7 +66,10 @@ static int fake_rotate_event(NET* net, String* packet, char* log_file_name,
int4store(header + SERVER_ID_OFFSET, server_id);
int4store(header + EVENT_LEN_OFFSET, event_len);
int2store(header + FLAGS_OFFSET, 0);
- int4store(header + LOG_SEQ_OFFSET, 0);
+
+ // TODO: check what problems this may cause and fix them
+ int4store(header + LOG_POS_OFFSET, 0);
+
packet->append(header, sizeof(header));
/* We need to split the next statement because of problem with cxx */
int4store(buf,4); // tell slave to skip magic number
@@ -133,7 +156,6 @@ File open_binlog(IO_CACHE *log, const char *log_file_name,
const char **errmsg)
{
File file;
- char magic[4];
if ((file = my_open(log_file_name, O_RDONLY | O_BINARY, MYF(MY_WME))) < 0 ||
init_io_cache(log, file, IO_SIZE*2, READ_CACHE, 0, 0,
@@ -142,19 +164,8 @@ File open_binlog(IO_CACHE *log, const char *log_file_name,
*errmsg = "Could not open log file"; // This will not be sent
goto err;
}
-
- if (my_b_read(log, (byte*) magic, sizeof(magic)))
- {
- *errmsg = "I/O error reading the header from the binary log";
- sql_print_error("%s, errno=%d, io cache code=%d", *errmsg, my_errno,
- log->error);
+ if (check_binlog_magic(log,errmsg))
goto err;
- }
- if (memcmp(magic, BINLOG_MAGIC, sizeof(magic)))
- {
- *errmsg = "Binlog has bad magic number; It's not a binary log file that can be used by this version of MySQL";
- goto err;
- }
return file;
err:
@@ -366,7 +377,8 @@ impossible position";
packet->length(0);
packet->append("\0",1);
}
-
+ // TODO: now that we are logging the offset, check to make sure
+ // the recorded offset and the actual match
if (error != LOG_READ_EOF)
{
switch(error) {
@@ -410,13 +422,6 @@ impossible position";
// to signal us
{
log.error=0;
-
- // tell the kill thread how to wake us up
- thd->mysys_var->current_mutex = log_lock;
- thd->mysys_var->current_cond = &COND_binlog_update;
- const char* proc_info = thd->proc_info;
- thd->proc_info = "Slave connection: waiting for binlog update";
-
bool read_packet = 0, fatal_error = 0;
#ifndef DBUG_OFF
@@ -431,32 +436,30 @@ impossible position";
// no one will update the log while we are reading
// now, but we'll be quick and just read one record
pthread_mutex_lock(log_lock);
- switch (Log_event::read_log_event(&log, packet, (pthread_mutex_t*) 0))
+ switch (Log_event::read_log_event(&log, packet, (pthread_mutex_t*)0))
{
case 0:
+ pthread_mutex_unlock(log_lock);
read_packet = 1;
// we read successfully, so we'll need to send it to the
// slave
break;
case LOG_READ_EOF:
- DBUG_PRINT("wait",("waiting for data on binary log"));
+ DBUG_PRINT("wait",("waiting for data in binary log"));
+ // wait_for_update unlocks the log lock - needed to avoid race
if (!thd->killed)
- pthread_cond_wait(&COND_binlog_update, log_lock);
+ mysql_bin_log.wait_for_update(thd);
+ else
+ pthread_mutex_unlock(log_lock);
DBUG_PRINT("wait",("binary log received update"));
break;
default:
+ pthread_mutex_unlock(log_lock);
fatal_error = 1;
break;
}
- pthread_mutex_unlock(log_lock);
-
- pthread_mutex_lock(&thd->mysys_var->mutex);
- thd->mysys_var->current_mutex= 0;
- thd->mysys_var->current_cond= 0;
- thd->proc_info= proc_info;
- pthread_mutex_unlock(&thd->mysys_var->mutex);
-
+
if (read_packet)
{
thd->proc_info = "sending update to slave";
@@ -548,39 +551,37 @@ impossible position";
DBUG_VOID_RETURN;
}
-int start_slave(THD* thd , bool net_report)
+int start_slave(THD* thd , MASTER_INFO* mi, bool net_report)
{
int slave_errno = 0;
if (!thd) thd = current_thd;
NET* net = &thd->net;
-
+ int thread_mask;
+
if (check_access(thd, PROCESS_ACL, any_db))
return 1;
- pthread_mutex_lock(&LOCK_slave);
- if (!slave_running)
+ lock_slave_threads(mi); // this allows us to cleanly read slave_running
+ init_thread_mask(&thread_mask,mi,1 /* inverse */);
+ if (thread_mask)
{
- if (init_master_info(&glob_mi))
- slave_errno = ER_MASTER_INFO;
- else if (server_id_supplied && *glob_mi.host)
- {
- pthread_t hThread;
- if (pthread_create(&hThread, &connection_attrib, handle_slave, 0))
- {
- slave_errno = ER_SLAVE_THREAD;
- }
- while (!slave_running) // slave might already be running by now
- pthread_cond_wait(&COND_slave_start, &LOCK_slave);
- }
+ if (server_id_supplied && (!mi->inited || (mi->inited && *mi->host)))
+ slave_errno = start_slave_threads(0 /*no mutex */,
+ 1 /* wait for start */,
+ mi,
+ master_info_file,relay_log_info_file,
+ thread_mask);
else
slave_errno = ER_BAD_SLAVE;
}
else
slave_errno = ER_SLAVE_MUST_STOP;
-
- pthread_mutex_unlock(&LOCK_slave);
+
+ unlock_slave_threads(mi);
+
if (slave_errno)
{
- if (net_report) send_error(net, slave_errno);
+ if (net_report)
+ send_error(net, slave_errno);
return 1;
}
else if (net_report)
@@ -589,8 +590,7 @@ int start_slave(THD* thd , bool net_report)
return 0;
}
-
-int stop_slave(THD* thd, bool net_report )
+int stop_slave(THD* thd, MASTER_INFO* mi, bool net_report )
{
int slave_errno = 0;
if (!thd) thd = current_thd;
@@ -598,43 +598,14 @@ int stop_slave(THD* thd, bool net_report )
if (check_access(thd, PROCESS_ACL, any_db))
return 1;
-
- pthread_mutex_lock(&LOCK_slave);
- if (slave_running)
- {
- abort_slave = 1;
- KICK_SLAVE;
- // do not abort the slave in the middle of a query, so we do not set
- // thd->killed for the slave thread
- thd->proc_info = "waiting for slave to die";
- while(slave_running)
- {
- /* there is a small chance that slave thread might miss the first
- alarm. To protect againts it, resend the signal until it reacts
- */
-
- struct timespec abstime;
-#ifdef HAVE_TIMESPEC_TS_SEC
- abstime.ts_sec=time(NULL)+2;
- abstime.ts_nsec=0;
-#elif defined(__WIN__)
- abstime.tv_sec=time((time_t*) 0)+2;
- abstime.tv_nsec=0;
-#else
- struct timeval tv;
- gettimeofday(&tv,0);
- abstime.tv_sec=tv.tv_sec+2;
- abstime.tv_nsec=tv.tv_usec*1000;
-#endif
- pthread_cond_timedwait(&COND_slave_stopped, &LOCK_slave, &abstime);
- if (slave_running)
- KICK_SLAVE;
- }
- }
- else
- slave_errno = ER_SLAVE_NOT_RUNNING;
-
- pthread_mutex_unlock(&LOCK_slave);
+ thd->proc_info = "Killing slave";
+ int thread_mask;
+ lock_slave_threads(mi);
+ init_thread_mask(&thread_mask,mi,0 /* not inverse*/);
+ slave_errno = (thread_mask) ?
+ terminate_slave_threads(mi,thread_mask,
+ 1 /*skip lock */) : ER_SLAVE_NOT_RUNNING;
+ unlock_slave_threads(mi);
thd->proc_info = 0;
if (slave_errno)
@@ -649,31 +620,43 @@ int stop_slave(THD* thd, bool net_report )
return 0;
}
-
-void reset_slave()
+int reset_slave(MASTER_INFO* mi)
{
MY_STAT stat_area;
char fname[FN_REFLEN];
- bool slave_was_running ;
-
- pthread_mutex_lock(&LOCK_slave);
- if ((slave_was_running = slave_running))
+ int restart_thread_mask = 0,error=0;
+ const char* errmsg=0;
+
+ lock_slave_threads(mi);
+ init_thread_mask(&restart_thread_mask,mi,0 /* not inverse */);
+ if ((error=terminate_slave_threads(mi,restart_thread_mask,1 /*skip lock*/))
+ || (error=purge_relay_logs(&mi->rli,1 /*just reset*/,&errmsg)))
+ goto err;
+
+ end_master_info(mi);
+ fn_format(fname, master_info_file, mysql_data_home, "", 4+32);
+ if (my_stat(fname, &stat_area, MYF(0)) && my_delete(fname, MYF(MY_WME)))
{
- pthread_mutex_unlock(&LOCK_slave);
- stop_slave(0,0);
+ error=1;
+ goto err;
}
- else
- pthread_mutex_unlock(&LOCK_slave);
-
- end_master_info(&glob_mi);
- fn_format(fname, master_info_file, mysql_data_home, "", 4+32);
+ fn_format(fname, relay_log_info_file, mysql_data_home, "", 4+32);
if (my_stat(fname, &stat_area, MYF(0)) && my_delete(fname, MYF(MY_WME)))
- return;
- if (slave_was_running)
- start_slave(0,0);
+ {
+ error=1;
+ goto err;
+ }
+ if (restart_thread_mask)
+ error=start_slave_threads(0 /* mutex not needed*/,
+ 1 /* wait for start*/,
+ mi,master_info_file,relay_log_info_file,
+ restart_thread_mask);
+ // TODO: fix error messages so they get to the client
+err:
+ unlock_slave_threads(mi);
+ return error;
}
-
void kill_zombie_dump_threads(uint32 slave_server_id)
{
pthread_mutex_lock(&LOCK_thread_count);
@@ -692,119 +675,114 @@ void kill_zombie_dump_threads(uint32 slave_server_id)
make safe_mutex complain and abort.
We just to do kill the thread ourselves.
*/
-
- thr_alarm_kill(tmp->real_id);
- tmp->killed = 1;
- tmp->mysys_var->abort = 1;
- pthread_mutex_lock(&tmp->mysys_var->mutex);
- if (tmp->mysys_var->current_cond)
- {
- pthread_mutex_lock(tmp->mysys_var->current_mutex);
- pthread_cond_broadcast(tmp->mysys_var->current_cond);
- pthread_mutex_unlock(tmp->mysys_var->current_mutex);
- }
- pthread_mutex_unlock(&tmp->mysys_var->mutex);
+ tmp->awake(1/*prepare to die*/);
}
}
pthread_mutex_unlock(&LOCK_thread_count);
}
-int change_master(THD* thd)
+int change_master(THD* thd, MASTER_INFO* mi)
{
- bool slave_was_running;
+ int error=0,restart_thread_mask;
+ const char* errmsg=0;
+
// kill slave thread
- pthread_mutex_lock(&LOCK_slave);
- if ((slave_was_running = slave_running))
+ lock_slave_threads(mi);
+ init_thread_mask(&restart_thread_mask,mi,0 /*not inverse*/);
+ if (restart_thread_mask &&
+ (error=terminate_slave_threads(mi,
+ restart_thread_mask,
+ 1 /*skip lock*/)))
{
- abort_slave = 1;
- KICK_SLAVE;
- thd->proc_info = "waiting for slave to die";
- while (slave_running)
- pthread_cond_wait(&COND_slave_stopped, &LOCK_slave); // wait until done
+ send_error(&thd->net,error);
+ unlock_slave_threads(mi);
+ return 1;
}
- pthread_mutex_unlock(&LOCK_slave);
thd->proc_info = "changing master";
LEX_MASTER_INFO* lex_mi = &thd->lex.mi;
-
- if (init_master_info(&glob_mi))
+ // TODO: see if needs re-write
+ if (init_master_info(mi,master_info_file,relay_log_info_file))
{
send_error(&thd->net, 0, "Could not initialize master info");
+ unlock_slave_threads(mi);
return 1;
}
- pthread_mutex_lock(&glob_mi.lock);
+ pthread_mutex_lock(&mi->data_lock);
if ((lex_mi->host || lex_mi->port) && !lex_mi->log_file_name && !lex_mi->pos)
{
// if we change host or port, we must reset the postion
- glob_mi.log_file_name[0] = 0;
- glob_mi.pos = 4; // skip magic number
- glob_mi.pending = 0;
+ mi->master_log_name[0] = 0;
+ mi->master_log_pos = 4; // skip magic number
+ mi->rli.pending = 0;
}
if (lex_mi->log_file_name)
- strmake(glob_mi.log_file_name, lex_mi->log_file_name,
- sizeof(glob_mi.log_file_name));
+ strmake(mi->master_log_name, lex_mi->log_file_name,
+ sizeof(mi->master_log_name));
if (lex_mi->pos)
{
- glob_mi.pos = lex_mi->pos;
- glob_mi.pending = 0;
+ mi->master_log_pos = lex_mi->pos;
+ mi->rli.pending = 0;
}
if (lex_mi->host)
- strmake(glob_mi.host, lex_mi->host, sizeof(glob_mi.host));
+ strmake(mi->host, lex_mi->host, sizeof(mi->host));
if (lex_mi->user)
- strmake(glob_mi.user, lex_mi->user, sizeof(glob_mi.user));
+ strmake(mi->user, lex_mi->user, sizeof(mi->user));
if (lex_mi->password)
- strmake(glob_mi.password, lex_mi->password, sizeof(glob_mi.password));
+ strmake(mi->password, lex_mi->password, sizeof(mi->password));
if (lex_mi->port)
- glob_mi.port = lex_mi->port;
+ mi->port = lex_mi->port;
if (lex_mi->connect_retry)
- glob_mi.connect_retry = lex_mi->connect_retry;
+ mi->connect_retry = lex_mi->connect_retry;
+
+ flush_master_info(mi);
+ pthread_mutex_unlock(&mi->data_lock);
+ thd->proc_info="purging old relay logs";
+ if (purge_relay_logs(&mi->rli,0 /* not only reset, but also reinit*/,
+ &errmsg))
+ {
+ send_error(&thd->net, 0, "Failed purging old relay logs");
+ unlock_slave_threads(mi);
+ return 1;
+ }
+ pthread_mutex_lock(&mi->rli.data_lock);
+ mi->rli.master_log_pos = mi->master_log_pos;
+ strnmov(mi->rli.master_log_name,mi->master_log_name,
+ sizeof(mi->rli.master_log_name));
+ if (!mi->rli.master_log_name[0]) // uninitialized case
+ mi->rli.master_log_pos=0;
+ pthread_cond_broadcast(&mi->rli.data_cond);
+ pthread_mutex_unlock(&mi->rli.data_lock);
- flush_master_info(&glob_mi);
- pthread_mutex_unlock(&glob_mi.lock);
thd->proc_info = "starting slave";
- if (slave_was_running)
- start_slave(0,0);
+ if (restart_thread_mask)
+ error=start_slave_threads(0 /* mutex not needed*/,
+ 1 /* wait for start*/,
+ mi,master_info_file,relay_log_info_file,
+ restart_thread_mask);
+err:
+ unlock_slave_threads(mi);
thd->proc_info = 0;
-
- send_ok(&thd->net);
+ if (error)
+ send_error(&thd->net,error);
+ else
+ send_ok(&thd->net);
return 0;
}
-
-void reset_master()
+int reset_master(THD* thd)
{
if (!mysql_bin_log.is_open())
{
my_error(ER_FLUSH_MASTER_BINLOG_CLOSED, MYF(ME_BELL+ME_WAITTANG));
- return;
- }
-
- LOG_INFO linfo;
- pthread_mutex_t* log_lock = mysql_bin_log.get_log_lock();
- pthread_mutex_lock(log_lock);
- if (mysql_bin_log.find_first_log(&linfo, ""))
- {
- pthread_mutex_unlock(log_lock);
- return;
- }
-
- for(;;)
- {
- my_delete(linfo.log_file_name, MYF(MY_WME));
- if (mysql_bin_log.find_next_log(&linfo))
- break;
+ return 1;
}
- mysql_bin_log.close(1); // exiting close
- my_delete(mysql_bin_log.get_index_fname(), MYF(MY_WME));
- mysql_bin_log.set_need_start_event();
- mysql_bin_log.open(opt_bin_logname,LOG_BIN);
- pthread_mutex_unlock(log_lock);
+ return mysql_bin_log.reset_logs(thd);
}
-
int cmp_master_pos(const char* log_file_name1, ulonglong log_pos1,
const char* log_file_name2, ulonglong log_pos2)
{
@@ -891,6 +869,7 @@ int show_binlog_events(THD* thd)
if (event_count < limit_end && log.error)
{
errmsg = "Wrong offset or I/O error";
+ pthread_mutex_unlock(mysql_bin_log.get_log_lock());
goto err;
}
diff --git a/sql/sql_repl.h b/sql/sql_repl.h
index 4b9f741dde7..360fd50a1e3 100644
--- a/sql/sql_repl.h
+++ b/sql/sql_repl.h
@@ -26,30 +26,26 @@ extern int max_binlog_dump_events;
extern bool opt_sporadic_binlog_dump_fail;
#endif
-#ifdef SIGNAL_WITH_VIO_CLOSE
-#define KICK_SLAVE { slave_thd->close_active_vio(); \
- thr_alarm_kill(slave_real_id); }
-#else
-#define KICK_SLAVE thr_alarm_kill(slave_real_id);
-#endif
+#define KICK_SLAVE(thd) thd->awake(0 /* do not prepare to die*/);
File open_binlog(IO_CACHE *log, const char *log_file_name,
const char **errmsg);
-int start_slave(THD* thd = 0, bool net_report = 1);
-int stop_slave(THD* thd = 0, bool net_report = 1);
-int change_master(THD* thd);
+int start_slave(THD* thd, MASTER_INFO* mi, bool net_report);
+int stop_slave(THD* thd, MASTER_INFO* mi, bool net_report);
+int change_master(THD* thd, MASTER_INFO* mi);
int show_binlog_events(THD* thd);
int cmp_master_pos(const char* log_file_name1, ulonglong log_pos1,
const char* log_file_name2, ulonglong log_pos2);
-void reset_slave();
-void reset_master();
+int reset_slave(MASTER_INFO* mi);
+int reset_master(THD* thd);
int purge_master_logs(THD* thd, const char* to_log);
bool log_in_use(const char* log_name);
void adjust_linfo_offsets(my_off_t purge_offset);
int show_binlogs(THD* thd);
extern int init_master_info(MASTER_INFO* mi);
void kill_zombie_dump_threads(uint32 slave_server_id);
+int check_binlog_magic(IO_CACHE* log, const char** errmsg);
typedef struct st_load_file_info
{
diff --git a/sql/sql_show.cc b/sql/sql_show.cc
index 19c3d89caaf..1dac8033ad7 100644
--- a/sql/sql_show.cc
+++ b/sql/sql_show.cc
@@ -1168,6 +1168,15 @@ int mysqld_show(THD *thd, const char *wild, show_var_st *variables)
case SHOW_RPL_STATUS:
net_store_data(&packet2, rpl_status_type[(int)rpl_status]);
break;
+ case SHOW_SLAVE_RUNNING:
+ {
+ LOCK_ACTIVE_MI;
+ net_store_data(&packet2, (active_mi->slave_running &&
+ active_mi->rli.slave_running)
+ ? "ON" : "OFF");
+ UNLOCK_ACTIVE_MI;
+ break;
+ }
case SHOW_OPENTABLES:
net_store_data(&packet2,(uint32) cached_tables());
break;
diff --git a/sql/sql_yacc.yy b/sql/sql_yacc.yy
index 01ba468175e..2b62ee03575 100644
--- a/sql/sql_yacc.yy
+++ b/sql/sql_yacc.yy
@@ -2453,15 +2453,14 @@ show_param:
YYABORT;
}
| NEW_SYM MASTER_SYM FOR_SYM SLAVE WITH MASTER_LOG_FILE_SYM EQ
- TEXT_STRING AND MASTER_LOG_POS_SYM EQ ulonglong_num AND
- MASTER_LOG_SEQ_SYM EQ ULONG_NUM AND MASTER_SERVER_ID_SYM EQ
+ TEXT_STRING AND MASTER_LOG_POS_SYM EQ ulonglong_num
+ AND MASTER_SERVER_ID_SYM EQ
ULONG_NUM
{
Lex->sql_command = SQLCOM_SHOW_NEW_MASTER;
Lex->mi.log_file_name = $8.str;
Lex->mi.pos = $12;
- Lex->mi.last_log_seq = $16;
- Lex->mi.server_id = $20;
+ Lex->mi.server_id = $16;
}
| MASTER_SYM LOGS_SYM
{
@@ -3083,12 +3082,18 @@ option_value:
}
| SQL_SLAVE_SKIP_COUNTER equal ULONG_NUM
{
- pthread_mutex_lock(&LOCK_slave);
- if (slave_running)
+ LOCK_ACTIVE_MI;
+ pthread_mutex_lock(&active_mi->rli.run_lock);
+ if (active_mi->rli.slave_running)
send_error(&current_thd->net, ER_SLAVE_MUST_STOP);
else
- slave_skip_counter = $3;
- pthread_mutex_unlock(&LOCK_slave);
+ {
+ pthread_mutex_lock(&active_mi->rli.data_lock);
+ active_mi->rli.slave_skip_counter = $3;
+ pthread_mutex_unlock(&active_mi->rli.data_lock);
+ }
+ pthread_mutex_unlock(&active_mi->rli.run_lock);
+ UNLOCK_ACTIVE_MI;
}
| ident equal DEFAULT
{
diff --git a/sql/stacktrace.c b/sql/stacktrace.c
index bef53c71a42..dd7db5548c1 100644
--- a/sql/stacktrace.c
+++ b/sql/stacktrace.c
@@ -123,7 +123,7 @@ terribly wrong...\n");
}
#endif /* __alpha__ */
- if (!stack_bottom)
+ if (!stack_bottom || (gptr) stack_bottom > (gptr) &fp)
{
ulong tmp= min(0x10000,thread_stack);
/* Assume that the stack starts at the previous even 65K */
diff --git a/sql/structs.h b/sql/structs.h
index 780057061c3..e76e628ddda 100644
--- a/sql/structs.h
+++ b/sql/structs.h
@@ -140,7 +140,7 @@ enum SHOW_TYPE { SHOW_LONG,SHOW_CHAR,SHOW_INT,SHOW_CHAR_PTR,SHOW_BOOL,
,SHOW_SSL_CTX_SESS_TIMEOUTS, SHOW_SSL_CTX_SESS_CACHE_FULL
,SHOW_SSL_GET_CIPHER_LIST
#endif /* HAVE_OPENSSL */
- ,SHOW_RPL_STATUS
+ ,SHOW_RPL_STATUS, SHOW_SLAVE_RUNNING
};
enum SHOW_COMP_OPTION { SHOW_OPTION_YES, SHOW_OPTION_NO, SHOW_OPTION_DISABLED};
diff --git a/support-files/build-tags b/support-files/build-tags
new file mode 100755
index 00000000000..d5f9fbf5100
--- /dev/null
+++ b/support-files/build-tags
@@ -0,0 +1,9 @@
+#! /bin/sh
+
+rm -f TAGS
+filter='\.cc$\|\.c$\|\.h$\|\.yy$'
+files=`bk -r sfiles -gU | grep $filter `
+for f in $files ;
+do
+ etags -o TAGS --append $f
+done